diff --git a/Project.toml b/Project.toml
index 591fa017..beeaa2f9 100644
--- a/Project.toml
+++ b/Project.toml
@@ -24,7 +24,8 @@ TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
 [compat]
 BinaryProvider = "0.5"
 CEnum = "0.2, 0.3, 0.4"
-DataFrames = "0.20, 0.21"
+CSV = "0.6.2, 0.7"
+DataFrames = "0.21"
 Decimals = "0.4.1"
 DocStringExtensions = "0.8.0"
 Infinity = "0.2"
@@ -39,8 +40,9 @@ TimeZones = "0.9.2, 0.10, 0.11, 1"
 julia = "1"
 
 [extras]
+CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
 DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
 [targets]
-test = ["Test", "DataFrames"]
+test = ["Test", "DataFrames", "CSV"]
diff --git a/docs/src/index.md b/docs/src/index.md
index 901c2e3d..aa875d53 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -77,21 +77,32 @@ An alternative to repeated `INSERT` queries is the PostgreSQL `COPY` query.
 `LibPQ.CopyIn` makes it easier to stream data to the server using a `COPY FROM STDIN` query.
 
 ```julia
-using LibPQ, DataFrames
+using LibPQ, DataFrames, CSV
 
 conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")
 
-row_strings = imap(eachrow(df)) do row
-    if ismissing(row[:yes_nulls])
-        "$(row[:no_nulls]),\n"
-    else
-        "$(row[:no_nulls]),$(row[:yes_nulls])\n"
-    end
-end
+result = execute(conn, """
+    CREATE TEMPORARY TABLE libpqjl_test (
+        no_nulls varchar(10) PRIMARY KEY,
+        yes_nulls varchar(10)
+    );
+""")
 
-copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings)
+no_nulls = map(string, 'a':'z')
+yes_nulls = Union{String, Missing}[isodd(Int(c)) ? string(c) : missing for c in 'a':'z']
+data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls)
+
+"""
+Upload a Tables.jl-compatible data structure (e.g. a DataFrame) to the database.
+"""
+function load_by_copy!(table, conn::LibPQ.Connection, tablename::AbstractString)
+    iter = CSV.RowWriter(table)
+    column_names = first(iter)
+    copyin = LibPQ.CopyIn("COPY $tablename ($column_names) FROM STDIN (FORMAT CSV, HEADER);", iter)
+    execute(conn, copyin)
+end
 
-execute(conn, copyin)
+load_by_copy!(data, conn, "libpqjl_test")
 
 close(conn)
 ```
diff --git a/test/runtests.jl b/test/runtests.jl
index fb21799a..e676fd97 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -12,6 +12,7 @@ using Memento.TestUtils
 using OffsetArrays
 using TimeZones
 using Tables
+using CSV
 
 Memento.config!("critical")
 
@@ -31,6 +32,27 @@ macro test_nolog_on_windows(ex...)
     end
 end
 
+"""
+    load_by_copy!(table, conn::LibPQ.Connection, table_name::AbstractString)
+
+Fast data upload using the PostgreSQL `COPY FROM STDIN` method, which is usually much
+faster than individual SQL `INSERT`s, especially for large amounts of data.
+
+`table` must be a Tables.jl-compatible data structure.
+
+All columns given in `table` must have corresponding fields in the target DB table;
+the order of the columns does not matter.
+
+Columns in the target DB table that are not provided by the input `table` are filled
+with `NULL` (provided they are nullable).
+"""
+function load_by_copy!(table, conn::LibPQ.Connection, table_name::AbstractString)
+    iter = CSV.RowWriter(table)
+    column_names = first(iter)
+    copyin = LibPQ.CopyIn("COPY $table_name ($column_names) FROM STDIN (FORMAT CSV, HEADER);", iter)
+    execute(conn, copyin)
+end
+
 @testset "LibPQ" begin
 
     @testset "ConninfoDisplay" begin
@@ -316,6 +338,32 @@ end
         table_data = DataFrame(result)
         @test isequal(table_data, data)
         close(result)
+        close(conn)
+
+        # test loading to the database using the CSV.jl row iterator
+        conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")
+        result = execute(conn, """
+            CREATE TEMPORARY TABLE libpqjl_test (
+                no_nulls varchar(10) PRIMARY KEY,
+                yes_nulls varchar(10)
+            );
+        """)
+
+        result = load_by_copy!(data, conn, "libpqjl_test")
+        @test isopen(result)
+        @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK
+        @test isempty(LibPQ.error_message(result))
+        close(result)
+
+        result = execute(
+            conn,
+            "SELECT no_nulls, yes_nulls FROM libpqjl_test ORDER BY no_nulls ASC;";
+            throw_error=true
+        )
+        table_data = DataFrame(result)
+        @test isequal(table_data, data)
+        close(result)
+        close(conn)
 
     end
 