From 9e2c7f1c3ab77b8ca0503d90d7c023984ded5526 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:03:18 +0300 Subject: [PATCH] fix(nonblocking): apply suggestions from code review --- src/asyncresults.jl | 19 ++++++++++----- src/connections.jl | 56 ++++++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index 5710d7bd..9977b2a2 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -84,8 +84,10 @@ function _consume(jl_conn::Connection) # this is important? # https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266 # if we used non-blocking connections we would need to check for `1` as well - # See flush(jl_conn::Connection) in connections.jl - flush(jl_conn) + # See _flush(jl_conn::Connection) in connections.jl + if !_flush(jl_conn) + error(LOGGER, Errors.PQConnectionError(jl_conn)) + end async_result = jl_conn.async_result result_ptrs = Ptr{libpq_c.PGresult}[] @@ -291,10 +293,10 @@ end function _async_submit(jl_conn::Connection, query::AbstractString) send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query) - if isnonblocking(jl_conn) == 0 + if isnonblocking(jl_conn) return send_status == 1 else - return flush(jl_conn) + return _flush(jl_conn) end end @@ -317,6 +319,11 @@ function _async_submit( zeros(Cint, num_params), # all parameters in text format Cint(binary_format), # return result in text or binary format ) - # send_status must be 1, if nonblock, we also want to flush - return send_status == 1 && (isnonblocking(jl_conn) == 0 || flush(jl_conn)) + # send_status must be 1 + # if nonblock, we also want to _flush + if isnonblocking(jl_conn) + return send_status == 1 && _flush(jl_conn) + else + return send_status == 1 + end end diff --git a/src/connections.jl b/src/connections.jl index 689109d7..093aedd9 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -267,6 +267,7 @@ function Connection( throw_error::Bool=true, connect_timeout::Real=0, options::Dict{String, String}=CONNECTION_OPTION_DEFAULTS, + nonblocking::Bool=false, kwargs... ) if options === CONNECTION_OPTION_DEFAULTS @@ -300,7 +301,7 @@ function Connection( ) # If password needed and not entered, prompt the user - if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 + connection = if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 push!(keywords, "password") user = unsafe_string(libpq_c.PQuser(jl_conn.conn)) # close this connection; will open another one below with the user-provided password @@ -309,7 +310,7 @@ function Connection( pass = Base.getpass(prompt) push!(values, read(pass, String)) Base.shred!(pass) - return handle_new_connection( + handle_new_connection( Connection( _connect_nonblocking(keywords, values, false; timeout=connect_timeout); kwargs... @@ -317,11 +318,20 @@ function Connection( throw_error=throw_error, ) else - return handle_new_connection( + handle_new_connection( jl_conn; throw_error=throw_error, ) end + + if nonblocking + success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblock)) == 0 + if !success + close(connection) + error(LOGGER, "Could not provide a non-blocking connection") + end + end + return connection end # AbstractLock primitives: @@ -791,9 +801,11 @@ end socket(jl_conn::Connection) = socket(jl_conn.conn) """ + isnonblocking(jl_conn::Connection) + Sets the nonblocking connection status of the PG connections. While async_execute is non-blocking on the receiving side, -the sending side is still nonblockign without this +the sending side is still nonblocking without this Returns true on success, false on failure https://www.postgresql.org/docs/current/libpq-async.html @@ -808,28 +820,37 @@ Returns true if the connection is set to non-blocking, false otherwise https://www.postgresql.org/docs/current/libpq-async.html """ -function isnonblocking(jl_conn) +function isnonblocking(jl_conn::Connection) return libpq_c.PQisnonblocking(jl_conn.conn) == 1 end """ -Do the flush dance described in the libpq docs. Required when the + _flush(jl_conn::Connection) + +Do the _flush dance described in the libpq docs. Required when the connections are set to nonblocking and we want do send queries/data without blocking. -https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush +https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFLUSH """ -function flush(jl_conn) - watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes +function _flush(jl_conn::Connection) + local watcher = nothing + if isnonblocking(jl_conn) + watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes + end try - while true # Iterators.repeated(true) # would make me more comfotable I think + while true flushstatus = libpq_c.PQflush(jl_conn.conn) # 0 indicates success - flushstatus == 0 && return true + if flushstatus == 0 + return true # -1 indicates error - flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn)) - # Could not send all data without blocking, need to wait FD - flushstatus == 1 && begin + elseif flushstatus < 0 + return false + # 1 indicates that we could not send all data without blocking, + elseif flushstatus == 1 + # need to wait FD + # Only applicable when the connection is in nonblocking mode wait(watcher) # Wait for the watcher # If it becomes write-ready, call PQflush again. if watcher.mask.writable @@ -838,15 +859,12 @@ function flush(jl_conn) if watcher.mask.readable # if the stream is readable, we have to consume data from the server first. success = libpq_c.PQconsumeInput(jl_conn.conn) == 1 - !success && error(LOGGER, Errors.PQConnectionError(jl_conn)) + !success && return false end end end - catch - # We don't want to manage anything here - rethrow() finally # Just close the watcher - close(watcher) + !isnothing(watcher) && close(watcher) end end