From fd18b80f8dc95267618876c8acbcc4ed9a8f1091 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 10:15:07 +0300 Subject: [PATCH 1/4] feat(server): spawn task sooner in listenloop --- docs/examples/cors_server.jl | 6 ++-- src/Servers.jl | 69 ++++++++++++++++++++++++------------ src/WebSockets.jl | 2 +- 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/docs/examples/cors_server.jl b/docs/examples/cors_server.jl index 2275b3619..ad6765cbc 100644 --- a/docs/examples/cors_server.jl +++ b/docs/examples/cors_server.jl @@ -39,7 +39,7 @@ const CORS_RES_HEADERS = ["Access-Control-Allow-Origin" => "*"] #= JSONMiddleware minimizes code by automatically converting the request body to JSON to pass to the other service functions automatically. JSONMiddleware -recieves the body of the response from the other service funtions and sends +receives the body of the response from the other service funtions and sends back a success response code =# function JSONMiddleware(handler) @@ -65,9 +65,9 @@ function JSONMiddleware(handler) end #= CorsMiddleware: handles preflight request with the OPTIONS flag -If a request was recieved with the correct headers, then a response will be +If a request was received with the correct headers, then a response will be sent back with a 200 code, if the correct headers were not specified in the request, -then a CORS error will be recieved on the client side +then a CORS error will be received on the client side Since each request passes throught the CORS Handler, then if the request is not a preflight request, it will simply go to the JSONMiddleware to be passed to the diff --git a/src/Servers.jl b/src/Servers.jl index 6afb16590..89fdb4b64 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,6 +13,7 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig +using ConcurrentUtilities: Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str @@ -83,10 +84,19 @@ accept(s::Listener{SSLConfig}) = getsslcontext(Sockets.accept(s.server), s.ssl) function getsslcontext(tcp, sslconfig) try + handshake_done = Ref{Bool}(false) ssl = MbedTLS.SSLContext() MbedTLS.setup!(ssl, sslconfig) MbedTLS.associate!(ssl, tcp) - MbedTLS.handshake!(ssl) + handshake_task = @async begin + MbedTLS.handshake!(ssl) + handshake_done[] = true + end + timedwait(5.0) do + handshake_done[] || istaskdone(handshake_task) + end + !istaskdone(handshake_task) && wait(handshake_task) + handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT)) return ssl catch e @try Base.IOError close(tcp) @@ -363,31 +373,46 @@ Accepts new tcp connections and spawns async tasks to handle them." function listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) sem = Base.Semaphore(max_connections) + ssl = Lockable(listener.ssl) + connections = Lockable(conns) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) while isopen(listener) try Base.acquire(sem) - io = accept(listener) - if io === nothing - @warnv 1 "unable to accept new connection" - continue - elseif !tcpisvalid(io) - @warnv 1 "!tcpisvalid: $io" - close(io) - continue - end - conn = Connection(io) - conn.state = IDLE - push!(conns, conn) - conn.host, conn.port = listener.hostname, listener.hostport - @async try - handle_connection(f, conn, listener, readtimeout, access_log) - finally - # handle_connection is in charge of closing the underlying io - delete!(conns, conn) - Base.release(sem) - end + io = Sockets.accept(listener.server) + Threads.@spawn begin + local conn = nothing + isssl = !isnothing(listener.ssl) + try + if io === nothing + @warnv 1 "unable to accept new connection" + return + end + if isssl + io = lock(ssl) do ssl + return getsslcontext(io, ssl) + end + end + if !tcpisvalid(io) + close(io) + return + end + conn = Connection(io) + conn.state = IDLE + lock(connections) do conns + push!(conns, conn) + end + conn.host, conn.port = listener.hostname, listener.hostport + handle_connection(f, conn, listener, readtimeout, access_log) + finally + # handle_connection is in charge of closing the underlying io, but it may not get there + !isnothing(conn) && lock(connections) do conns + delete!(conns, conn) + end + Base.release(sem) + end + end # Task.@spawn catch e if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" @@ -442,7 +467,7 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) request.response.status = 200 try - # invokelatest becuase the perf is negligible, but this makes live-editing handlers more Revise friendly + # invokelatest because the perf is negligible, but this makes live-editing handlers more Revise friendly @debugv 1 "invoking handler" Base.invokelatest(f, http) # If `startwrite()` was never called, throw an error so we send a 500 and log this diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 56b6933a9..2fbf6a07c 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -587,7 +587,7 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "") ws.readclosed = true end end - # we either recieved the responding CLOSE frame and readclosed was set + # we either received the responding CLOSE frame and readclosed was set # or there was an error/timeout reading it; in any case, readclosed should be closed now @assert ws.readclosed # if we're the server, it's our job to close the underlying socket From 48dc4b43e5e922301e9b636bde0d2c83d0e5087b Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 13:37:46 +0300 Subject: [PATCH 2/4] fix(servers): use @async instead of Threads.@spawn --- src/Servers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Servers.jl b/src/Servers.jl index 89fdb4b64..e0e98618e 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -381,7 +381,7 @@ function listenloop(f, listener, conns, tcpisvalid, try Base.acquire(sem) io = Sockets.accept(listener.server) - Threads.@spawn begin + @async begin local conn = nothing isssl = !isnothing(listener.ssl) try From 46d09bb60eaa6a1f7fd5ad4f3364150a6f306c0b Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Thu, 7 Sep 2023 05:50:16 +0300 Subject: [PATCH 3/4] remove locks --- src/Servers.jl | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/src/Servers.jl b/src/Servers.jl index e0e98618e..84235e405 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,7 +13,6 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig -using ConcurrentUtilities: Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str @@ -373,8 +372,6 @@ Accepts new tcp connections and spawns async tasks to handle them." function listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) sem = Base.Semaphore(max_connections) - ssl = Lockable(listener.ssl) - connections = Lockable(conns) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) while isopen(listener) @@ -383,16 +380,13 @@ function listenloop(f, listener, conns, tcpisvalid, io = Sockets.accept(listener.server) @async begin local conn = nothing - isssl = !isnothing(listener.ssl) try if io === nothing @warnv 1 "unable to accept new connection" return end - if isssl - io = lock(ssl) do ssl - return getsslcontext(io, ssl) - end + if !isnothing(listener.ssl) + io = getsslcontext(io, ssl) end if !tcpisvalid(io) close(io) @@ -400,16 +394,12 @@ function listenloop(f, listener, conns, tcpisvalid, end conn = Connection(io) conn.state = IDLE - lock(connections) do conns - push!(conns, conn) - end + push!(conns, conn) conn.host, conn.port = listener.hostname, listener.hostport handle_connection(f, conn, listener, readtimeout, access_log) finally # handle_connection is in charge of closing the underlying io, but it may not get there - !isnothing(conn) && lock(connections) do conns - delete!(conns, conn) - end + !isnothing(conn) && delete!(conns, conn) Base.release(sem) end end # Task.@spawn From 4df902d871a8ba6ba33ef14ce8040a6b0c458c08 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Thu, 7 Sep 2023 06:38:53 +0300 Subject: [PATCH 4/4] fix: invert async logic for timeout, reduce to 2 sec and remove lock --- src/Servers.jl | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Servers.jl b/src/Servers.jl index 84235e405..49c930b56 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -87,15 +87,18 @@ function getsslcontext(tcp, sslconfig) ssl = MbedTLS.SSLContext() MbedTLS.setup!(ssl, sslconfig) MbedTLS.associate!(ssl, tcp) - handshake_task = @async begin - MbedTLS.handshake!(ssl) - handshake_done[] = true - end - timedwait(5.0) do - handshake_done[] || istaskdone(handshake_task) + thistask = current_task() + # this task is meant to be super small while the handshake remains on the main task + @async begin + timedwait(2.0) do + handshake_done[] || istaskdone(thistask) + end + if !handshake_done[] && !istaskdone(thistask) + Base.throwto(thistask, Base.IOError("SSL handshake timed out", Base.ETIMEDOUT)) + end end - !istaskdone(handshake_task) && wait(handshake_task) - handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT)) + MbedTLS.handshake!(ssl) + handshake_done[] = true return ssl catch e @try Base.IOError close(tcp) @@ -386,7 +389,7 @@ function listenloop(f, listener, conns, tcpisvalid, return end if !isnothing(listener.ssl) - io = getsslcontext(io, ssl) + io = getsslcontext(io, listener.ssl) end if !tcpisvalid(io) close(io)