From 0721ea54b2cfb068c3ecdfca76db92ae325407de Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 3 Sep 2023 15:18:46 +0300 Subject: [PATCH 01/12] feat(nonblocking) implement non blocking mode for connections --- src/asyncresults.jl | 27 ++++++++++++------- src/connections.jl | 63 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index 47f5cf62..dd2b1789 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -84,9 +84,8 @@ function _consume(jl_conn::Connection) # this is important? # https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266 # if we used non-blocking connections we would need to check for `1` as well - if libpq_c.PQflush(jl_conn.conn) < 0 - error(LOGGER, Errors.PQConnectionError(jl_conn)) - end + # See flush(jl_conn::Connection) in connections.jl + flush(jl_conn) async_result = jl_conn.async_result result_ptrs = Ptr{libpq_c.PGresult}[] @@ -231,7 +230,7 @@ end function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...) async_result = _async_execute(jl_conn; kwargs...) do jl_conn - _async_submit(jl_conn.conn, query) + _async_submit(jl_conn, query) end return async_result @@ -254,7 +253,7 @@ function async_execute( async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn GC.@preserve string_params _async_submit( - jl_conn.conn, query, pointer_params; binary_format=binary_format + jl_conn, query, pointer_params; binary_format=binary_format ) end @@ -289,16 +288,22 @@ function _async_execute( return async_result end -function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString) - return libpq_c.PQsendQuery(conn_ptr, query) == 1 +function _async_submit(jl_conn::Connection, query::AbstractString) + send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query) + if isnonblocking(jl_conn) == 0 + return send_status == 1 + else + return flush(jl_conn) + end end function _async_submit( - conn_ptr::Ptr{libpq_c.PGconn}, + jl_conn::Connection, query::AbstractString, parameters::Vector{Ptr{UInt8}}; binary_format::Bool=false, ) + conn_ptr::Ptr{libpq_c.PGconn} = jl_conn.conn num_params = length(parameters) send_status = libpq_c.PQsendQueryParams( @@ -312,5 +317,9 @@ function _async_submit( Cint(binary_format), # return result in text or binary format ) - return send_status == 1 + if isnonblocking(jl_conn) == 0 + return send_status == 1 + else + return flush(jl_conn) + end end diff --git a/src/connections.jl b/src/connections.jl index 91372430..689109d7 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -672,7 +672,7 @@ end """ ConnectionOption(pq_opt::libpq_c.PQconninfoOption) -> ConnectionOption -Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`. +Construct a `ConnectionOption` from a `libpq_c.PQconninfoOption`. """ function ConnectionOption(pq_opt::libpq_c.PQconninfoOption) return ConnectionOption( @@ -789,3 +789,64 @@ function socket(conn::Ptr{libpq_c.PGconn}) end socket(jl_conn::Connection) = socket(jl_conn.conn) + +""" +Sets the nonblocking connection status of the PG connections. +While async_execute is non-blocking on the receiving side, +the sending side is still nonblockign without this +Returns true on success, false on failure + +https://www.postgresql.org/docs/current/libpq-async.html +""" +function setnonblocking(jl_conn::Connection; nonblock=true) + return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblock)) == 0 +end + +""" +Checks whether the connection is non-blocking. +Returns true if the connection is set to non-blocking, false otherwise + +https://www.postgresql.org/docs/current/libpq-async.html +""" +function isnonblocking(jl_conn) + return libpq_c.PQisnonblocking(jl_conn.conn) == 1 +end + +""" +Do the flush dance described in the libpq docs. Required when the +connections are set to nonblocking and we want do send queries/data +without blocking. + +https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush +""" +function flush(jl_conn) + watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes + try + while true # Iterators.repeated(true) # would make me more comfotable I think + flushstatus = libpq_c.PQflush(jl_conn.conn) + # 0 indicates success + flushstatus == 0 && return true + # -1 indicates error + flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn)) + # Could not send all data without blocking, need to wait FD + flushstatus == 1 && begin + wait(watcher) # Wait for the watcher + # If it becomes write-ready, call PQflush again. + if watcher.mask.writable + continue # Call PGflush again, to send more data + end + if watcher.mask.readable + # if the stream is readable, we have to consume data from the server first. + success = libpq_c.PQconsumeInput(jl_conn.conn) == 1 + !success && error(LOGGER, Errors.PQConnectionError(jl_conn)) + end + end + end + catch + # We don't want to manage anything here + rethrow() + finally + # Just close the watcher + close(watcher) + end +end From 4b6c77c03c3c2f66419a9653b40779e7e1c29df7 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Mon, 4 Sep 2023 10:30:09 +0300 Subject: [PATCH 02/12] fix: always check send_status == 1 in _async_submit --- src/asyncresults.jl | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index dd2b1789..b2a0c5ba 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -316,10 +316,6 @@ function _async_submit( zeros(Cint, num_params), # all parameters in text format Cint(binary_format), # return result in text or binary format ) - - if isnonblocking(jl_conn) == 0 - return send_status == 1 - else - return flush(jl_conn) - end + # send_status must be 1, if nonblock, we also want to flush + return send_status == 1 && (isnonblocking(jl_conn) == 0 || flush(jl_conn)) end From 385b6810bea07f4f36e3238a7d9bfa46f2fac2c3 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Wed, 6 Sep 2023 23:24:06 +0300 Subject: [PATCH 03/12] test(LibPQ): run the async result test with both types of connections, and option to run all with any type --- src/asyncresults.jl | 3 ++- test/runtests.jl | 51 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index b2a0c5ba..5710d7bd 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -251,7 +251,8 @@ function async_execute( string_params = string_parameters(parameters) pointer_params = parameter_pointers(string_params) - async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn + async_result = + _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn GC.@preserve string_params _async_submit( jl_conn, query, pointer_params; binary_format=binary_format ) diff --git a/test/runtests.jl b/test/runtests.jl index 6a018f64..8c721cb0 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,7 +40,8 @@ function count_allocs(f, args...) return Base.gc_alloc_count(stats.gcstats) end -@testset "LibPQ" begin +usenonblocking = get(ENV, "LIBPQJL_CONNECTION_NONBLOCKING", false) +@testset "LibPQ $(usenonblocking ? "(nonblocking connection)" : "")" begin @testset "ConninfoDisplay" begin @test parse(LibPQ.ConninfoDisplay, "") == LibPQ.Normal @@ -82,6 +83,7 @@ end @testset "Example SELECT" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=false) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) @test conn isa LibPQ.Connection @test isopen(conn) @test status(conn) == LibPQ.libpq_c.CONNECTION_OK @@ -190,6 +192,7 @@ end @testset "Example INSERT and DELETE" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -333,6 +336,7 @@ end @testset "load!" begin @testset "issue #204" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) close(execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -357,6 +361,7 @@ end @testset "COPY FROM" begin @testset "Example COPY FROM" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -401,6 +406,7 @@ end @testset "Wrong column order" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -458,6 +464,7 @@ end local saved_conn was_open = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) do jl_conn + LibPQ.setnonblocking(jl_conn, nonblock=usenonblocking) saved_conn = jl_conn return isopen(jl_conn) end @@ -466,6 +473,7 @@ end @test !isopen(saved_conn) @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) do jl_conn + LibPQ.setnonblocking(conn, nonblock=usenonblocking) saved_conn = jl_conn @test false end @@ -475,6 +483,7 @@ end @testset "Version Numbers" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) # update this test before PostgreSQL 20.0 ;) @test LibPQ.pqv"7" <= LibPQ.server_version(conn) <= LibPQ.pqv"20" @@ -482,6 +491,7 @@ end @testset "Encoding" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) @test LibPQ.encoding(conn) == "UTF8" @@ -513,6 +523,7 @@ end throw_error=true, type_map=Dict(:interval => String), ) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) conn_info = LibPQ.conninfo(conn) options = first(filter(conn_info) do conn_opt @@ -632,6 +643,8 @@ end @testset "Finalizer" begin closed_flags = map(1:50) do _ conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) + closed = conn.closed finalize(conn) return closed @@ -646,6 +659,8 @@ end closed_flags = map(1:50) do _ conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) + push!(results, execute(conn, "SELECT 1;")) return conn.closed end @@ -661,6 +676,8 @@ end # with AsyncResults, which hold a reference to Connection closed_flags = asyncmap(1:50) do _ conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) + wait(async_execute(conn, "SELECT pg_sleep(1);")) return conn.closed end @@ -707,6 +724,8 @@ end @testset "throw_error=false" begin conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) + @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @test isopen(conn) @@ -725,6 +744,8 @@ end @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) + @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @test isopen(conn) @@ -740,6 +761,7 @@ end @testset "Results" begin @testset "Nulls" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT NULL"; throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -824,6 +846,7 @@ end @testset "Not Nulls" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT NULL"; not_null=[false], throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -949,6 +972,7 @@ end @testset "Tables.jl" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, """ SELECT no_nulls, yes_nulls FROM ( @@ -991,6 +1015,7 @@ end @testset "Duplicate names" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT 1 AS col, 2 AS col;", not_null=true, throw_error=true) columns = Tables.columns(result) @@ -1007,6 +1032,7 @@ end @testset "Uppercase Columns" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT 1 AS \"Column\";") @test num_columns(result) == 1 @@ -1025,6 +1051,7 @@ end @testset "PQResultError" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) try execute(conn, "SELECT log(-1);") @@ -1072,6 +1099,7 @@ end @testset "Type Conversions" begin @testset "Deprecations" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT 'infinity'::timestamp;") @@ -1089,6 +1117,7 @@ end @testset "Automatic" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, """ SELECT oid, typname, typlen, typbyval, typcategory @@ -1120,6 +1149,7 @@ end @testset "Overrides" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELECT 4::bigint;") @test first(first(result)) === Int64(4) @@ -1169,6 +1199,7 @@ end @testset for binary_format in (LibPQ.TEXT, LibPQ.BINARY) @testset "Default Types" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) test_data = [ ("3", Cint(3)), @@ -1335,6 +1366,7 @@ end @testset "Specified Types" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) test_data = [ ("3", UInt, UInt(3)), @@ -1428,6 +1460,7 @@ end @testset "Parameters" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) @testset "Arrays" begin tests = ( @@ -1542,6 +1575,7 @@ end @testset "SQLString" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) execute(conn, sql``` CREATE TEMPORARY TABLE libpq_test_users ( @@ -1577,6 +1611,7 @@ end @testset "Query Errors" begin @testset "Syntax Errors" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELORCT NUUL;"; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1595,6 +1630,7 @@ end @testset "Wrong No. Parameters" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute(conn, "SELORCT \$1;", String[]; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1619,6 +1655,7 @@ end @testset "Interface Errors" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = execute( conn, @@ -1652,6 +1689,7 @@ end # Establish connection and construct temporary table. conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + LibPQ.setnonblocking(conn, nonblock=usenonblocking) # Get the column. result = execute( @@ -1678,6 +1716,7 @@ end @testset "Statements" begin @testset "No Params, Output" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type") @@ -1699,6 +1738,7 @@ end @testset "Params, Output" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type WHERE oid = \$1") @@ -1723,11 +1763,12 @@ end end end - @testset "AsyncResults" begin + @testset "AsyncResults (usenonblocking connection=$usenonblocking)" for usenonblocking in [true, false] trywait(ar::LibPQ.AsyncResult) = (try wait(ar) catch end; nothing) @testset "Basic" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(2);"; throw_error=false) yield() @@ -1750,6 +1791,7 @@ end @testset "Parameters" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) ar = async_execute( conn, @@ -1782,6 +1824,7 @@ end # Ensures queries wait for previous query completion before starting @testset "Wait in line to complete" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) first_ar = async_execute(conn, "SELECT pg_sleep(4);") yield() @@ -1811,6 +1854,7 @@ end @testset "Cancel" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1843,6 +1887,7 @@ end @testset "Canceled by closing connection" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1875,6 +1920,7 @@ end @testset "FDWatcher: bad file descriptor (EBADF)" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + LibPQ.setnonblocking(conn, nonblock=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(3); SELECT * FROM pg_type;") yield() @@ -1898,6 +1944,7 @@ end @testset "DBInterface integration" begin conn = DBInterface.connect(LibPQ.Connection, "dbname=postgres user=$DATABASE_USER") @test conn isa LibPQ.Connection + LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = DBInterface.execute( conn, From 9e2c7f1c3ab77b8ca0503d90d7c023984ded5526 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:03:18 +0300 Subject: [PATCH 04/12] fix(nonblocking): apply suggestions from code review --- src/asyncresults.jl | 19 ++++++++++----- src/connections.jl | 56 ++++++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index 5710d7bd..9977b2a2 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -84,8 +84,10 @@ function _consume(jl_conn::Connection) # this is important? # https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266 # if we used non-blocking connections we would need to check for `1` as well - # See flush(jl_conn::Connection) in connections.jl - flush(jl_conn) + # See _flush(jl_conn::Connection) in connections.jl + if !_flush(jl_conn) + error(LOGGER, Errors.PQConnectionError(jl_conn)) + end async_result = jl_conn.async_result result_ptrs = Ptr{libpq_c.PGresult}[] @@ -291,10 +293,10 @@ end function _async_submit(jl_conn::Connection, query::AbstractString) send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query) - if isnonblocking(jl_conn) == 0 + if isnonblocking(jl_conn) return send_status == 1 else - return flush(jl_conn) + return _flush(jl_conn) end end @@ -317,6 +319,11 @@ function _async_submit( zeros(Cint, num_params), # all parameters in text format Cint(binary_format), # return result in text or binary format ) - # send_status must be 1, if nonblock, we also want to flush - return send_status == 1 && (isnonblocking(jl_conn) == 0 || flush(jl_conn)) + # send_status must be 1 + # if nonblock, we also want to _flush + if isnonblocking(jl_conn) + return send_status == 1 && _flush(jl_conn) + else + return send_status == 1 + end end diff --git a/src/connections.jl b/src/connections.jl index 689109d7..093aedd9 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -267,6 +267,7 @@ function Connection( throw_error::Bool=true, connect_timeout::Real=0, options::Dict{String, String}=CONNECTION_OPTION_DEFAULTS, + nonblocking::Bool=false, kwargs... ) if options === CONNECTION_OPTION_DEFAULTS @@ -300,7 +301,7 @@ function Connection( ) # If password needed and not entered, prompt the user - if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 + connection = if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 push!(keywords, "password") user = unsafe_string(libpq_c.PQuser(jl_conn.conn)) # close this connection; will open another one below with the user-provided password @@ -309,7 +310,7 @@ function Connection( pass = Base.getpass(prompt) push!(values, read(pass, String)) Base.shred!(pass) - return handle_new_connection( + handle_new_connection( Connection( _connect_nonblocking(keywords, values, false; timeout=connect_timeout); kwargs... @@ -317,11 +318,20 @@ function Connection( throw_error=throw_error, ) else - return handle_new_connection( + handle_new_connection( jl_conn; throw_error=throw_error, ) end + + if nonblocking + success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblock)) == 0 + if !success + close(connection) + error(LOGGER, "Could not provide a non-blocking connection") + end + end + return connection end # AbstractLock primitives: @@ -791,9 +801,11 @@ end socket(jl_conn::Connection) = socket(jl_conn.conn) """ + isnonblocking(jl_conn::Connection) + Sets the nonblocking connection status of the PG connections. While async_execute is non-blocking on the receiving side, -the sending side is still nonblockign without this +the sending side is still nonblocking without this Returns true on success, false on failure https://www.postgresql.org/docs/current/libpq-async.html @@ -808,28 +820,37 @@ Returns true if the connection is set to non-blocking, false otherwise https://www.postgresql.org/docs/current/libpq-async.html """ -function isnonblocking(jl_conn) +function isnonblocking(jl_conn::Connection) return libpq_c.PQisnonblocking(jl_conn.conn) == 1 end """ -Do the flush dance described in the libpq docs. Required when the + _flush(jl_conn::Connection) + +Do the _flush dance described in the libpq docs. Required when the connections are set to nonblocking and we want do send queries/data without blocking. -https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush +https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFLUSH """ -function flush(jl_conn) - watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes +function _flush(jl_conn::Connection) + local watcher = nothing + if isnonblocking(jl_conn) + watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes + end try - while true # Iterators.repeated(true) # would make me more comfotable I think + while true flushstatus = libpq_c.PQflush(jl_conn.conn) # 0 indicates success - flushstatus == 0 && return true + if flushstatus == 0 + return true # -1 indicates error - flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn)) - # Could not send all data without blocking, need to wait FD - flushstatus == 1 && begin + elseif flushstatus < 0 + return false + # 1 indicates that we could not send all data without blocking, + elseif flushstatus == 1 + # need to wait FD + # Only applicable when the connection is in nonblocking mode wait(watcher) # Wait for the watcher # If it becomes write-ready, call PQflush again. if watcher.mask.writable @@ -838,15 +859,12 @@ function flush(jl_conn) if watcher.mask.readable # if the stream is readable, we have to consume data from the server first. success = libpq_c.PQconsumeInput(jl_conn.conn) == 1 - !success && error(LOGGER, Errors.PQConnectionError(jl_conn)) + !success && return false end end end - catch - # We don't want to manage anything here - rethrow() finally # Just close the watcher - close(watcher) + !isnothing(watcher) && close(watcher) end end From c4f56b5788bc668beb086a5f222c21c5ce9c2a63 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:04:33 +0300 Subject: [PATCH 05/12] oopsie --- src/asyncresults.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/asyncresults.jl b/src/asyncresults.jl index 9977b2a2..a69c433f 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -294,9 +294,9 @@ end function _async_submit(jl_conn::Connection, query::AbstractString) send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query) if isnonblocking(jl_conn) - return send_status == 1 - else return _flush(jl_conn) + else + return send_status == 1 end end From 6c7ef70a02c615149e888567d597dfa8ab8552e7 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:32:34 +0300 Subject: [PATCH 06/12] fix(tests), attempt at CI --- .github/workflows/CI.yml | 6 +- src/connections.jl | 13 ++-- test/runtests.jl | 126 ++++++++++++++------------------------- 3 files changed, 58 insertions(+), 87 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 49ffb756..61a0ba7d 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -29,6 +29,9 @@ jobs: arch: - x64 - x86 + nonblocking: + - true + - false exclude: # Don't test 32-bit on macOS - os: macOS-latest @@ -67,7 +70,7 @@ jobs: cache-name: cache-artifacts with: path: ~/.julia/artifacts - key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-nonblocking-${{ matrix.nonblocking }}-${{ hashFiles('**/Project.toml') }} restore-keys: | ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}- ${{ runner.os }}-${{ matrix.arch }}-test- @@ -100,6 +103,7 @@ jobs: run: | echo "PGUSER=$USER" >> $GITHUB_ENV echo "LIBPQJL_DATABASE_USER=$USER" >> $GITHUB_ENV + echo "LIBPQJL_CONNECTION_NONBLOCKING=${{ matrix.nonblocking }}" >> $GITHUB_ENV if: ${{ runner.os == 'macOS' }} - name: Start Homebrew PostgreSQL service run: pg_ctl -D /usr/local/var/postgresql@$(psql --version | cut -f3 -d' ' | cut -f1 -d.) start diff --git a/src/connections.jl b/src/connections.jl index 093aedd9..b604a08b 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -325,10 +325,15 @@ function Connection( end if nonblocking - success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblock)) == 0 - if !success + success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblocking)) == 0 + if success + return connection + elseif throw_error close(connection) error(LOGGER, "Could not provide a non-blocking connection") + else + warn(LOGGER, "Could not provide a non-blocking connection") + return connection end end return connection @@ -810,8 +815,8 @@ Returns true on success, false on failure https://www.postgresql.org/docs/current/libpq-async.html """ -function setnonblocking(jl_conn::Connection; nonblock=true) - return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblock)) == 0 +function setnonblocking(jl_conn::Connection; nonblocking=true) + return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblocking)) == 0 end """ diff --git a/test/runtests.jl b/test/runtests.jl index 8c721cb0..08b1ed03 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,7 +40,7 @@ function count_allocs(f, args...) return Base.gc_alloc_count(stats.gcstats) end -usenonblocking = get(ENV, "LIBPQJL_CONNECTION_NONBLOCKING", false) +usenonblocking = get(ENV, "LIBPQJL_CONNECTION_NONBLOCKING", "false") === "true" @testset "LibPQ $(usenonblocking ? "(nonblocking connection)" : "")" begin @testset "ConninfoDisplay" begin @@ -82,8 +82,8 @@ end DATABASE_USER = get(ENV, "LIBPQJL_DATABASE_USER", "postgres") @testset "Example SELECT" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=false) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) + @test conn isa LibPQ.Connection @test isopen(conn) @test status(conn) == LibPQ.libpq_c.CONNECTION_OK @@ -191,8 +191,7 @@ end end @testset "Example INSERT and DELETE" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -335,8 +334,7 @@ end @testset "load!" begin @testset "issue #204" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) close(execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -360,8 +358,7 @@ end @testset "COPY FROM" begin @testset "Example COPY FROM" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -405,8 +402,7 @@ end end @testset "Wrong column order" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -463,8 +459,7 @@ end @testset "do" begin local saved_conn - was_open = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) do jl_conn - LibPQ.setnonblocking(jl_conn, nonblock=usenonblocking) + was_open = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) do jl_conn saved_conn = jl_conn return isopen(jl_conn) end @@ -472,8 +467,7 @@ end @test was_open @test !isopen(saved_conn) - @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) do jl_conn - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) do jl_conn saved_conn = jl_conn @test false end @@ -482,16 +476,14 @@ end end @testset "Version Numbers" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # update this test before PostgreSQL 20.0 ;) @test LibPQ.pqv"7" <= LibPQ.server_version(conn) <= LibPQ.pqv"20" end @testset "Encoding" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) @test LibPQ.encoding(conn) == "UTF8" @@ -522,8 +514,8 @@ end options=Dict("IntervalStyle" => "postgres_verbose"), throw_error=true, type_map=Dict(:interval => String), + nonblocking=usenonblocking ) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) conn_info = LibPQ.conninfo(conn) options = first(filter(conn_info) do conn_opt @@ -642,8 +634,7 @@ end @testset "Finalizer" begin closed_flags = map(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) closed = conn.closed finalize(conn) @@ -658,8 +649,7 @@ end results = LibPQ.Result[] closed_flags = map(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) push!(results, execute(conn, "SELECT 1;")) return conn.closed @@ -675,8 +665,7 @@ end # with AsyncResults, which hold a reference to Connection closed_flags = asyncmap(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) wait(async_execute(conn, "SELECT pg_sleep(1);")) return conn.closed @@ -723,8 +712,7 @@ end end @testset "throw_error=false" begin - conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @@ -743,8 +731,7 @@ end @testset "throw_error=true" begin @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) - conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @@ -760,8 +747,7 @@ end @testset "Results" begin @testset "Nulls" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT NULL"; throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -845,8 +831,7 @@ end end @testset "Not Nulls" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT NULL"; not_null=[false], throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -971,8 +956,7 @@ end end @testset "Tables.jl" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, """ SELECT no_nulls, yes_nulls FROM ( @@ -1014,8 +998,7 @@ end end @testset "Duplicate names" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 1 AS col, 2 AS col;", not_null=true, throw_error=true) columns = Tables.columns(result) @@ -1031,8 +1014,7 @@ end end @testset "Uppercase Columns" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 1 AS \"Column\";") @test num_columns(result) == 1 @@ -1050,8 +1032,7 @@ end end @testset "PQResultError" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) try execute(conn, "SELECT log(-1);") @@ -1098,8 +1079,7 @@ end @testset "Type Conversions" begin @testset "Deprecations" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 'infinity'::timestamp;") @@ -1116,8 +1096,7 @@ end end @testset "Automatic" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, """ SELECT oid, typname, typlen, typbyval, typcategory @@ -1148,8 +1127,7 @@ end end @testset "Overrides" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 4::bigint;") @test first(first(result)) === Int64(4) @@ -1198,8 +1176,8 @@ end @testset for binary_format in (LibPQ.TEXT, LibPQ.BINARY) @testset "Default Types" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) + test_data = [ ("3", Cint(3)), @@ -1365,8 +1343,7 @@ end end @testset "Specified Types" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) test_data = [ ("3", UInt, UInt(3)), @@ -1459,8 +1436,7 @@ end end @testset "Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) @testset "Arrays" begin tests = ( @@ -1574,8 +1550,7 @@ end end @testset "SQLString" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) execute(conn, sql``` CREATE TEMPORARY TABLE libpq_test_users ( @@ -1610,8 +1585,7 @@ end @testset "Query Errors" begin @testset "Syntax Errors" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELORCT NUUL;"; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1629,8 +1603,7 @@ end end @testset "Wrong No. Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELORCT \$1;", String[]; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1654,8 +1627,7 @@ end end @testset "Interface Errors" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute( conn, @@ -1688,8 +1660,7 @@ end ] # Establish connection and construct temporary table. - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) # Get the column. result = execute( @@ -1715,8 +1686,7 @@ end @testset "Statements" begin @testset "No Params, Output" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type") @@ -1737,8 +1707,7 @@ end end @testset "Params, Output" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type WHERE oid = \$1") @@ -1763,12 +1732,11 @@ end end end - @testset "AsyncResults (usenonblocking connection=$usenonblocking)" for usenonblocking in [true, false] + @testset "AsyncResults (usenonblocking connection=$usenonblocking)" begin trywait(ar::LibPQ.AsyncResult) = (try wait(ar) catch end; nothing) @testset "Basic" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(2);"; throw_error=false) yield() @@ -1790,8 +1758,7 @@ end end @testset "Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute( conn, @@ -1823,8 +1790,7 @@ end # Ensures queries wait for previous query completion before starting @testset "Wait in line to complete" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) first_ar = async_execute(conn, "SELECT pg_sleep(4);") yield() @@ -1853,8 +1819,7 @@ end end @testset "Cancel" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1886,8 +1851,7 @@ end end @testset "Canceled by closing connection" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1919,8 +1883,7 @@ end end @testset "FDWatcher: bad file descriptor (EBADF)" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) - LibPQ.setnonblocking(conn, nonblock=usenonblocking) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(3); SELECT * FROM pg_type;") yield() @@ -1944,7 +1907,6 @@ end @testset "DBInterface integration" begin conn = DBInterface.connect(LibPQ.Connection, "dbname=postgres user=$DATABASE_USER") @test conn isa LibPQ.Connection - LibPQ.setnonblocking(conn, nonblock=usenonblocking) result = DBInterface.execute( conn, From 98ae1eb4e8f1fa9e442709ac964154b1366e8a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A0=CE=B1=CE=BD=CE=B1=CE=B3=CE=B9=CF=8E=CF=84=CE=B7?= =?UTF-8?q?=CF=82=20=CE=93=CE=B5=CF=89=CF=81=CE=B3=CE=B1=CE=BA=CF=8C=CF=80?= =?UTF-8?q?=CE=BF=CF=85=CE=BB=CE=BF=CF=82?= Date: Tue, 19 Sep 2023 01:35:05 +0300 Subject: [PATCH 07/12] fix(docstrings) --- src/connections.jl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connections.jl b/src/connections.jl index b604a08b..23ff808e 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -806,7 +806,7 @@ end socket(jl_conn::Connection) = socket(jl_conn.conn) """ - isnonblocking(jl_conn::Connection) + setnonblocking(jl_conn::Connection; nonblocking=true) Sets the nonblocking connection status of the PG connections. While async_execute is non-blocking on the receiving side, @@ -820,6 +820,7 @@ function setnonblocking(jl_conn::Connection; nonblocking=true) end """ + isnonblocking(jl_conn::Connection) Checks whether the connection is non-blocking. Returns true if the connection is set to non-blocking, false otherwise From cd5ee32afb01cfb965a03c1a4f32f3026ffc7600 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:37:49 +0300 Subject: [PATCH 08/12] tweak ci --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 61a0ba7d..805c29f1 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -103,7 +103,6 @@ jobs: run: | echo "PGUSER=$USER" >> $GITHUB_ENV echo "LIBPQJL_DATABASE_USER=$USER" >> $GITHUB_ENV - echo "LIBPQJL_CONNECTION_NONBLOCKING=${{ matrix.nonblocking }}" >> $GITHUB_ENV if: ${{ runner.os == 'macOS' }} - name: Start Homebrew PostgreSQL service run: pg_ctl -D /usr/local/var/postgresql@$(psql --version | cut -f3 -d' ' | cut -f1 -d.) start @@ -120,6 +119,7 @@ jobs: pg_ctl -D $env:PGDATA status if: ${{ runner.os == 'Windows' }} # Run Tests + - run: echo "LIBPQJL_CONNECTION_NONBLOCKING=${{ matrix.nonblocking }}" >> $GITHUB_ENV - run: psql -c '\conninfo' - uses: julia-actions/julia-buildpkg@latest - uses: julia-actions/julia-runtest@latest From 654627df5bf43380d160744434febe4bc2418fe4 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:42:29 +0300 Subject: [PATCH 09/12] tweak ci --- .github/workflows/CI.yml | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 805c29f1..47a38752 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -13,7 +13,7 @@ env: PGPASSWORD: root jobs: test: - name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} + name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} - nonblocking connection(${{ matrix.nonblock}}) runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -30,7 +30,6 @@ jobs: - x64 - x86 nonblocking: - - true - false exclude: # Don't test 32-bit on macOS @@ -59,6 +58,31 @@ jobs: version: 1 arch: x64 postgresql-version: '10' + - os: ubuntu-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: true + - os: ubuntu-latest + version: "1.6" + arch: x64 + postgresql-version: latest + nonblocking: true + - os: windows-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: true + - os: macOS-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: true + - os: ubuntu-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: true steps: - uses: actions/checkout@v2 - uses: julia-actions/setup-julia@v1 From 363602d16a58176dbd8301d16c4f81886f07f0e3 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:43:38 +0300 Subject: [PATCH 10/12] tweak ci --- .github/workflows/CI.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 47a38752..4d765d15 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -30,7 +30,7 @@ jobs: - x64 - x86 nonblocking: - - false + - "false" exclude: # Don't test 32-bit on macOS - os: macOS-latest @@ -62,27 +62,27 @@ jobs: version: 1 arch: x64 postgresql-version: latest - nonblocking: true + nonblocking: "true" - os: ubuntu-latest version: "1.6" arch: x64 postgresql-version: latest - nonblocking: true + nonblocking: "true" - os: windows-latest version: 1 arch: x64 postgresql-version: latest - nonblocking: true + nonblocking: "true" - os: macOS-latest version: 1 arch: x64 postgresql-version: latest - nonblocking: true + nonblocking: "true" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: latest - nonblocking: true + nonblocking: "true" steps: - uses: actions/checkout@v2 - uses: julia-actions/setup-julia@v1 From 96b39ac4c285d82d7d3fdb070fe5776e29c5ab6b Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:44:27 +0300 Subject: [PATCH 11/12] tweak ci --- .github/workflows/CI.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 4d765d15..5e6f2bf7 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -13,7 +13,7 @@ env: PGPASSWORD: root jobs: test: - name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} - nonblocking connection(${{ matrix.nonblock}}) + name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} - nonblocking connection(${{ matrix.nonblocking }}) runs-on: ${{ matrix.os }} strategy: fail-fast: false From 572d0b6f5309565339ba146567b138d42efb6196 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Tue, 19 Sep 2023 01:46:02 +0300 Subject: [PATCH 12/12] tweak ci --- .github/workflows/CI.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5e6f2bf7..a22c5b43 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -41,23 +41,28 @@ jobs: version: "1.6" arch: x64 postgresql-version: latest + nonblocking: "false" # Add older supported PostgreSQL Versions - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '13' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '12' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '11' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '10' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 @@ -78,11 +83,6 @@ jobs: arch: x64 postgresql-version: latest nonblocking: "true" - - os: ubuntu-latest - version: 1 - arch: x64 - postgresql-version: latest - nonblocking: "true" steps: - uses: actions/checkout@v2 - uses: julia-actions/setup-julia@v1