Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CopyOut! interface for >5x faster data queries #241

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ TimeZones = "0.9.2, 0.10, 0.11, 1"
julia = "1.6"

[extras]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "DataFrames"]
test = ["Test", "CSV", "DataFrames"]
24 changes: 22 additions & 2 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ LibPQ.load!(
execute(conn, "COMMIT;")
```

### `COPY`
### `COPY FROM`

An alternative to repeated `INSERT` queries is the PostgreSQL `COPY` query.
An alternative to repeated `INSERT` queries is the PostgreSQL `COPY FROM` query.
`LibPQ.CopyIn` makes it easier to stream data to the server using a `COPY FROM STDIN` query.

```julia
Expand All @@ -99,3 +99,23 @@ execute(conn, copyin)

close(conn)
```

### `COPY TO`

For large result sets, an alternative to a plain `SELECT` query is the PostgreSQL `COPY TO` query.
`LibPQ.CopyOut!` makes it easier to stream data out of the server using a `COPY TO STDOUT` query.

```julia
using LibPQ, CSV, DataFrames

conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")

# Buffer that receives the CSV text sent back by the server
databuf = IOBuffer()
copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER);")

# Run the COPY query; every row received is written into `databuf`
execute(conn, copyout)

# Parse the buffered CSV (the query asked for a header row) into a DataFrame
df = DataFrame(CSV.File(databuf))

close(conn)
```
2 changes: 2 additions & 0 deletions docs/src/pages/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ LibPQ.load!
```@docs
LibPQ.CopyIn
execute(::LibPQ.Connection, ::LibPQ.CopyIn)
LibPQ.CopyOut!
execute(::LibPQ.Connection, ::LibPQ.CopyOut!)
```

### Asynchronous
Expand Down
74 changes: 74 additions & 0 deletions src/copy.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,77 @@ function execute(
Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error
)
end

"""
    CopyOut!(data, query) -> CopyOut!

Create a `CopyOut!` query instance which can be executed to receive data from PostgreSQL
via a `COPY <table_name> TO STDOUT` query.

`query` must be a `COPY TO STDOUT` query as described in the
[PostgreSQL documentation](https://www.postgresql.org/docs/10/sql-copy.html).
`COPY TO` queries which write to a file or a `PROGRAM` destination can instead use the
standard [`execute`](@ref) query interface.

`data` is an `IOBuffer` into which the data received from PostgreSQL is written.
The data arrives in the format requested by `query` (for example `FORMAT CSV`).
"""
struct CopyOut!
    data::IOBuffer
    query::String
end

"""
    execute(jl_conn::Connection, copyout::CopyOut!, parameters=nothing;
        throw_error::Bool=true, kwargs...
    ) -> Result

Runs [`execute`](@ref execute(::Connection, ::String)) on `copyout`'s query, then fills
`copyout`'s data from the server.

`parameters` must be `nothing`: `COPY` queries cannot take parameters, and an
`ArgumentError` is thrown if any are supplied.

Every row received from the server is appended to `copyout.data`; once the transfer ends
the buffer is rewound with `seekstart` so it can be read from the start.

When `throw_error` is `true`, problems are reported through `error` (which throws);
otherwise they are reported through `warn`.
Remaining keyword arguments are passed through to the `Result` constructor.
"""
function execute(
    jl_conn::Connection,
    copy::CopyOut!,
    parameters=nothing;
    throw_error=true,
    kwargs...,
)
    level = throw_error ? error : warn
    if parameters !== nothing
        # https://postgrespro.com/list/thread-id/1893680
        throw(ArgumentError("COPY can't take any parameter"))
    end

    copy_end_result = lock(jl_conn) do
        result = _execute(jl_conn.conn, copy.query)
        result_status = libpq_c.PQresultStatus(result)

        if result_status != libpq_c.PGRES_COPY_OUT
            level(LOGGER, Errors.JLResultError(
                "Expected PGRES_COPY_OUT after COPY query, got $result_status"
            ))
            return result
        end

        # The PGRES_COPY_OUT result only signals that the transfer has started; the result
        # handed back to the caller comes from PQgetResult below, so release this one now.
        libpq_c.PQclear(result)

        io = copy.data
        async = Cint(0)  # blocking call
        row_ref = Ref{Cstring}()
        status_code = Cint(0)
        while (status_code = libpq_c.PQgetCopyData(jl_conn.conn, row_ref, async)) > 0
            # A positive return value is the byte length of the row libpq allocated.
            # Copy exactly that many bytes instead of scanning for a NUL terminator, so
            # rows containing NUL bytes are not truncated and no temporary String is made.
            row_ptr = row_ref[]
            try
                unsafe_write(io, convert(Ptr{UInt8}, row_ptr), status_code)
            finally
                # The row buffer is owned by us and must be freed even if the write fails.
                libpq_c.PQfreemem(convert(Ptr{Cvoid}, row_ptr))
            end
        end
        seekstart(io)  # rewind so a subsequent read by the user begins at the start
        -2 == status_code && level(LOGGER, Errors.JLResultError("PQgetCopyData error: $(error_message(jl_conn))"))

        libpq_c.PQgetResult(jl_conn.conn)
    end

    return handle_result(
        Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error
    )
end
81 changes: 81 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using LibPQ
using Test
using CSV
using Dates
using DataFrames
using DataFrames: eachrow
Expand Down Expand Up @@ -444,6 +445,86 @@ end
end
end

@testset "COPY TO" begin
    # Round trip: load rows with CopyIn, stream them back with CopyOut!, compare as tables.
    @testset "Example COPY TO" begin
        conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")

        # Expected table contents: one row per letter, with a nullable second column
        letters = 'a':'z'
        no_nulls = [string(c) for c in letters]
        yes_nulls = Union{String, Missing}[
            isodd(Int(c)) ? string(c) : missing for c in letters
        ]
        data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls)

        # One CSV line per row; a missing value becomes an empty trailing field
        row_strings = imap(eachrow(data)) do row
            ismissing(row[:yes_nulls]) ? "$(row[:no_nulls]),\n" : "$(row[:no_nulls]),$(row[:yes_nulls])\n"
        end

        # Create the temporary table used by this test
        created = execute(conn, """
            CREATE TEMPORARY TABLE libpqjl_test (
            no_nulls varchar(10) PRIMARY KEY,
            yes_nulls varchar(10)
            );
        """)
        @test status(created) == LibPQ.libpq_c.PGRES_COMMAND_OK
        close(created)

        # Fill the table through COPY FROM STDIN
        copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings)

        loaded = execute(conn, copyin)
        @test isopen(loaded)
        @test status(loaded) == LibPQ.libpq_c.PGRES_COMMAND_OK
        @test isempty(LibPQ.error_message(loaded))
        close(loaded)

        # Stream the table back out through CopyOut!
        databuf = IOBuffer()
        copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER, ENCODING 'UTF8');")

        dumped = execute(conn, copyout)
        @test isopen(dumped)
        @test status(dumped) == LibPQ.libpq_c.PGRES_COMMAND_OK
        @test isempty(LibPQ.error_message(dumped))
        close(dumped)

        # Parse the received CSV and compare it with the data that was inserted
        @static if VERSION >= v"1.3.0-1"
            csvfile = CSV.File(databuf, stringtype=String)
        else
            csvfile = CSV.File(databuf)
        end
        df = DataFrame(csvfile)
        @test isequal(data, df)

        close(conn)
    end

    # Misuse: a non-COPY query, then a COPY query with parameters.
    @testset "Wrong COPY TO" begin
        conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")

        # A query that is not COPY ... TO STDOUT must surface a server-side error
        databuf = IOBuffer()
        bad_copy = LibPQ.CopyOut!(databuf, "SELECT libpqjl_test;")

        failed = execute(conn, bad_copy; throw_error=false)
        @test isopen(failed)
        @test status(failed) == LibPQ.libpq_c.PGRES_FATAL_ERROR

        err_msg = LibPQ.error_message(failed)
        @test occursin("ERROR", err_msg)
        close(failed)

        # Parameters are not supported by COPY queries
        with_params = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test WHERE no_nulls = \$1) TO STDOUT (FORMAT CSV, HEADER);")
        @test_throws ArgumentError execute(conn, with_params, ['z'])

        close(conn)
    end
end

@testset "LibPQ.Connection" begin
@testset "do" begin
local saved_conn
Expand Down