Skip to content

Commit

Permalink
Add upsertStreaming and upsertBatch (fix #115) (#117)
Browse files Browse the repository at this point in the history
* Add `upsertStreaming` (fix #115)

* Add `upsertBatch` repo method.

caveats:
- anorm didn't support returning rows for batch queries, so that support is monkey-patched into the `anorm` package
- zio-jdbc cannot express batch updates at all. For this reason it was necessary to add support for not implementing a repo method for a dblib.

* fix upsert for tables where all columns appear in id
  • Loading branch information
oyvindberg committed Jul 13, 2024
1 parent e6cb0de commit 098a1bc
Show file tree
Hide file tree
Showing 814 changed files with 15,574 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* File automatically generated by `typo` for its own test suite.
*
* IF YOU CHANGE THIS FILE YOUR CHANGES WILL BE OVERWRITTEN
*/
package anorm
package testdb
package hardcoded

import java.sql.Connection
import resource.managed

/** Adds an `executeReturning` operation to anorm's `BatchSql`.
 *
 * The enclosing file is declared under `package anorm`, which is what lets this code call the
 * anorm members used below (see the original note that follows).
 */
object ExecuteReturningSyntax {
/* add executeReturning to anorm. it needs to be inside the package, because everything is hidden */
implicit class Ops(batchSql: BatchSql) {
/** Executes the batch and parses the result set exposed through the statement's generated keys.
 *
 * @param parser parser applied to the generated-keys result set
 * @param c      connection the statement is prepared on
 * @return the value produced by `parser`; the trailing `.get` rethrows a failure from `Sql.asTry`
 *         (assuming `asTry` yields a `scala.util.Try` -- TODO confirm against anorm)
 */
def executeReturning[T](parser: ResultSetParser[T])(implicit c: Connection): T =
// the statement is requested with `getGeneratedKeys = true` so `ps.getGeneratedKeys` can be read once the batch has run
managed(batchSql.getFilledStatement(c, getGeneratedKeys = true))(using StatementResource, statementClassTag).acquireAndGet { ps =>
// the return value of executeBatch() is discarded; the rows are read from the generated keys instead
ps.executeBatch()
Sql
.asTry(
parser,
managed(ps.getGeneratedKeys)(using ResultSetResource, resultSetClassTag),
onFirstRow = false,
ColumnAliaser.empty
)
.get
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(compositeId: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
/** Upserts every row in `unsaved` as one batch and returns the resulting rows as a `List`. */
def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow]
/** Upserts all rows produced by the `unsaved` iterator and returns an `Int` count
 * (presumably the number of affected rows -- confirm against the implementation in use).
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 *
 * @param batchSize batch size handed to the streaming insert; defaults to 10000
 */
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package hardcoded
package compositepk
package person

import anorm.BatchSql
import anorm.NamedParameter
import anorm.ParameterMetaData
import anorm.ParameterValue
Expand All @@ -17,6 +18,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
Expand Down Expand Up @@ -148,4 +150,40 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
/** Upserts all of `unsaved` into `compositepk.person` through a single anorm `BatchSql`.
 *
 * On a conflict on ("one", "two") only "name" is overwritten with the incoming value.
 * The rows reported by the `returning` clause are parsed with `PersonRow.rowParser(1).*`.
 * An empty input returns `Nil` without building any statement.
 */
override def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow] = {
// one parameter list per row; the names match the `{...}` placeholders in the SQL below
def toNamedParameter(row: PersonRow): List[NamedParameter] = List(
NamedParameter("one", ParameterValue(row.one, null, ToStatement.longToStatement)),
NamedParameter("two", ParameterValue(row.two, null, ToStatement.optionToStatement(ToStatement.stringToStatement, ParameterMetaData.StringParameterMetaData))),
NamedParameter("name", ParameterValue(row.name, null, ToStatement.optionToStatement(ToStatement.stringToStatement, ParameterMetaData.StringParameterMetaData)))
)
unsaved.toList match {
case Nil => Nil
// BatchSql takes the first parameter list separately from the remaining ones, hence the head/rest split
case head :: rest =>
new anorm.testdb.hardcoded.ExecuteReturningSyntax.Ops(
BatchSql(
s"""insert into compositepk.person("one", "two", "name")
values ({one}::int8, {two}, {name})
on conflict ("one", "two")
do update set
"name" = EXCLUDED."name"
returning "one", "two", "name"
""",
toNamedParameter(head),
rest.map(toNamedParameter)*
)
).executeReturning(PersonRow.rowParser(1).*)
}
}
/** Bulk upsert into `compositepk.person` that goes through a temporary table.
 *
 * Steps, in order:
 *  1. create temporary table `person_TEMP` (like `compositepk.person`), declared `on commit drop`
 *  2. feed `unsaved` to `streamingInsert` with a `copy ... from stdin` command, passing `batchSize` through
 *  3. insert everything from `person_TEMP` into the real table, overwriting "name" on a conflict on
 *     ("one", "two"), then drop the temporary table
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 * (the temporary table is created `on commit drop`, so all steps have to share one transaction).
 *
 * @return the `Int` returned by `executeUpdate()` for the final insert-and-drop SQL
 */
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
// results of the first two steps are discarded on purpose; `: @nowarn` silences the resulting warning
SQL"create temporary table person_TEMP (like compositepk.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("one", "two", "name") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into compositepk.person("one", "two", "name")
select * from person_TEMP
on conflict ("one", "two")
do update set
"name" = EXCLUDED."name"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,17 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.compositeId, unsaved): @nowarn
unsaved
}
/** In-memory batch upsert: stores each row in `map` under its composite id, in iteration order,
 * and returns the same rows as a `List`.
 */
override def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow] = {
  val stored = List.newBuilder[PersonRow]
  unsaved.foreach { incoming =>
    // later rows with the same key overwrite earlier ones
    map += (incoming.compositeId -> incoming)
    stored += incoming
  }
  stored.result()
}
/** In-memory counterpart of the streaming upsert: stores every row in `map` under its composite id.
 *
 * Rows are counted while they are consumed. `unsaved` is an `Iterator`, so asking it for its
 * `size` after it has been drained always reports 0.
 *
 * @param unsaved   rows to store; consumed exactly once
 * @param batchSize not used by the mock; present to match the repo interface
 * @return the number of rows consumed from `unsaved`
 */
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
  unsaved.foldLeft(0) { (storedSoFar, row) =>
    map += (row.compositeId -> row)
    storedSoFar + 1
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ trait FootballClubRepo {
def update(row: FootballClubRow)(implicit c: Connection): Boolean
def updateFieldValues(id: FootballClubId, fieldValues: List[FootballClubFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: FootballClubRow)(implicit c: Connection): FootballClubRow
/** Upserts every row in `unsaved` as one batch and returns the resulting rows as a `List`. */
def upsertBatch(unsaved: Iterable[FootballClubRow])(implicit c: Connection): List[FootballClubRow]
/** Upserts all rows produced by the `unsaved` iterator and returns an `Int` count
 * (presumably the number of affected rows -- confirm against the implementation in use).
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 *
 * @param batchSize batch size handed to the streaming insert; defaults to 10000
 */
def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package hardcoded
package myschema
package football_club

import anorm.BatchSql
import anorm.NamedParameter
import anorm.ParameterValue
import anorm.RowParser
Expand All @@ -16,6 +17,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -130,4 +132,39 @@ class FootballClubRepoImpl extends FootballClubRepo {
.executeInsert(FootballClubRow.rowParser(1).single)

}
/** Upserts all of `unsaved` into `myschema.football_club` through a single anorm `BatchSql`.
 *
 * On a conflict on "id" the stored "name" is overwritten with the incoming value.
 * The rows reported by the `returning` clause are parsed with `FootballClubRow.rowParser(1).*`.
 * An empty input returns `Nil` without building any statement.
 */
override def upsertBatch(unsaved: Iterable[FootballClubRow])(implicit c: Connection): List[FootballClubRow] = {
// one parameter list per row; the names match the `{...}` placeholders in the SQL below
def toNamedParameter(row: FootballClubRow): List[NamedParameter] = List(
NamedParameter("id", ParameterValue(row.id, null, FootballClubId.toStatement)),
NamedParameter("name", ParameterValue(row.name, null, ToStatement.stringToStatement))
)
unsaved.toList match {
case Nil => Nil
// BatchSql takes the first parameter list separately from the remaining ones, hence the head/rest split
case head :: rest =>
new anorm.testdb.hardcoded.ExecuteReturningSyntax.Ops(
BatchSql(
s"""insert into myschema.football_club("id", "name")
values ({id}::int8, {name})
on conflict ("id")
do update set
"name" = EXCLUDED."name"
returning "id", "name"
""",
toNamedParameter(head),
rest.map(toNamedParameter)*
)
).executeReturning(FootballClubRow.rowParser(1).*)
}
}
/** Bulk upsert into `myschema.football_club` that goes through a temporary table.
 *
 * Steps, in order:
 *  1. create temporary table `football_club_TEMP` (like `myschema.football_club`), declared `on commit drop`
 *  2. feed `unsaved` to `streamingInsert` with a `copy ... from stdin` command, passing `batchSize` through
 *  3. insert everything from `football_club_TEMP` into the real table, overwriting "name" on a
 *     conflict on "id", then drop the temporary table
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 * (the temporary table is created `on commit drop`, so all steps have to share one transaction).
 *
 * @return the `Int` returned by `executeUpdate()` for the final insert-and-drop SQL
 */
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
// results of the first two steps are discarded on purpose; `: @nowarn` silences the resulting warning
SQL"create temporary table football_club_TEMP (like myschema.football_club) on commit drop".execute(): @nowarn
streamingInsert(s"""copy football_club_TEMP("id", "name") from stdin""", batchSize, unsaved)(FootballClubRow.text, c): @nowarn
SQL"""insert into myschema.football_club("id", "name")
select * from football_club_TEMP
on conflict ("id")
do update set
"name" = EXCLUDED."name"
;
drop table football_club_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ class FootballClubRepoMock(map: scala.collection.mutable.Map[FootballClubId, Foo
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
/** In-memory batch upsert: stores each row in `map` under its id, in iteration order,
 * and returns the same rows as a `List`.
 */
override def upsertBatch(unsaved: Iterable[FootballClubRow])(implicit c: Connection): List[FootballClubRow] = {
  val stored = List.newBuilder[FootballClubRow]
  unsaved.foreach { incoming =>
    // later rows with the same key overwrite earlier ones
    map += (incoming.id -> incoming)
    stored += incoming
  }
  stored.result()
}
/** In-memory counterpart of the streaming upsert: stores every row in `map` under its id.
 *
 * Rows are counted while they are consumed. `unsaved` is an `Iterator`, so asking it for its
 * `size` after it has been drained always reports 0.
 *
 * @param unsaved   rows to store; consumed exactly once
 * @param batchSize not used by the mock; present to match the repo interface
 * @return the number of rows consumed from `unsaved`
 */
override def upsertStreaming(unsaved: Iterator[FootballClubRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
  unsaved.foldLeft(0) { (storedSoFar, row) =>
    map += (row.id -> row)
    storedSoFar + 1
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ trait MaritalStatusRepo {
def selectByIdsTracked(ids: Array[MaritalStatusId])(implicit c: Connection): Map[MaritalStatusId, MaritalStatusRow]
def update: UpdateBuilder[MaritalStatusFields, MaritalStatusRow]
def upsert(unsaved: MaritalStatusRow)(implicit c: Connection): MaritalStatusRow
/** Upserts every row in `unsaved` as one batch and returns the resulting rows as a `List`. */
def upsertBatch(unsaved: Iterable[MaritalStatusRow])(implicit c: Connection): List[MaritalStatusRow]
/** Upserts all rows produced by the `unsaved` iterator and returns an `Int` count
 * (presumably the number of affected rows -- confirm against the implementation in use).
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 *
 * @param batchSize batch size handed to the streaming insert; defaults to 10000
 */
def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package hardcoded
package myschema
package marital_status

import anorm.BatchSql
import anorm.NamedParameter
import anorm.ParameterValue
import anorm.RowParser
import anorm.SQL
import anorm.SimpleSql
import anorm.SqlStringInterpolation
import java.sql.Connection
import scala.annotation.nowarn
import typo.dsl.DeleteBuilder
import typo.dsl.SelectBuilder
import typo.dsl.SelectBuilderSql
Expand Down Expand Up @@ -96,11 +98,42 @@ class MaritalStatusRepoImpl extends MaritalStatusRepo {
${ParameterValue(unsaved.id, null, MaritalStatusId.toStatement)}::int8
)
on conflict ("id")
do update set
do nothing
returning "id"
"""
.executeInsert(MaritalStatusRow.rowParser(1).single)

}
/** Upserts all of `unsaved` into `myschema.marital_status` through a single anorm `BatchSql`.
 *
 * "id" is the only column in the statement, so a conflict on "id" is resolved with `do nothing`.
 * The rows reported by the `returning` clause are parsed with `MaritalStatusRow.rowParser(1).*`.
 * NOTE(review): with `do nothing`, rows that hit the conflict are presumably absent from the
 * `returning` output, so the result may be shorter than the input -- confirm.
 * An empty input returns `Nil` without building any statement.
 */
override def upsertBatch(unsaved: Iterable[MaritalStatusRow])(implicit c: Connection): List[MaritalStatusRow] = {
// one parameter list per row; the name matches the `{id}` placeholder in the SQL below
def toNamedParameter(row: MaritalStatusRow): List[NamedParameter] = List(
NamedParameter("id", ParameterValue(row.id, null, MaritalStatusId.toStatement))
)
unsaved.toList match {
case Nil => Nil
// BatchSql takes the first parameter list separately from the remaining ones, hence the head/rest split
case head :: rest =>
new anorm.testdb.hardcoded.ExecuteReturningSyntax.Ops(
BatchSql(
s"""insert into myschema.marital_status("id")
values ({id}::int8)
on conflict ("id")
do nothing
returning "id"
""",
toNamedParameter(head),
rest.map(toNamedParameter)*
)
).executeReturning(MaritalStatusRow.rowParser(1).*)
}
}
/** Bulk upsert into `myschema.marital_status` that goes through a temporary table.
 *
 * Steps, in order:
 *  1. create temporary table `marital_status_TEMP` (like `myschema.marital_status`), declared `on commit drop`
 *  2. feed `unsaved` to `streamingInsert` with a `copy ... from stdin` command, passing `batchSize` through
 *  3. insert everything from `marital_status_TEMP` into the real table, doing nothing on a
 *     conflict on "id", then drop the temporary table
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 * (the temporary table is created `on commit drop`, so all steps have to share one transaction).
 *
 * @return the `Int` returned by `executeUpdate()` for the final insert-and-drop SQL
 */
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
// results of the first two steps are discarded on purpose; `: @nowarn` silences the resulting warning
SQL"create temporary table marital_status_TEMP (like myschema.marital_status) on commit drop".execute(): @nowarn
streamingInsert(s"""copy marital_status_TEMP("id") from stdin""", batchSize, unsaved)(MaritalStatusRow.text, c): @nowarn
SQL"""insert into myschema.marital_status("id")
select * from marital_status_TEMP
on conflict ("id")
do nothing
;
drop table marital_status_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,17 @@ class MaritalStatusRepoMock(map: scala.collection.mutable.Map[MaritalStatusId, M
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
/** In-memory batch upsert: stores each row in `map` under its id, in iteration order,
 * and returns the same rows as a `List`.
 */
override def upsertBatch(unsaved: Iterable[MaritalStatusRow])(implicit c: Connection): List[MaritalStatusRow] = {
  val stored = List.newBuilder[MaritalStatusRow]
  unsaved.foreach { incoming =>
    // later rows with the same key overwrite earlier ones
    map += (incoming.id -> incoming)
    stored += incoming
  }
  stored.result()
}
/** In-memory counterpart of the streaming upsert: stores every row in `map` under its id.
 *
 * Rows are counted while they are consumed. `unsaved` is an `Iterator`, so asking it for its
 * `size` after it has been drained always reports 0.
 *
 * @param unsaved   rows to store; consumed exactly once
 * @param batchSize not used by the mock; present to match the repo interface
 * @return the number of rows consumed from `unsaved`
 */
override def upsertStreaming(unsaved: Iterator[MaritalStatusRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
  unsaved.foldLeft(0) { (storedSoFar, row) =>
    map += (row.id -> row)
    storedSoFar + 1
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ trait PersonRepo {
def update(row: PersonRow)(implicit c: Connection): Boolean
def updateFieldValues(id: PersonId, fieldValues: List[PersonFieldValue[?]])(implicit c: Connection): Boolean
def upsert(unsaved: PersonRow)(implicit c: Connection): PersonRow
/** Upserts every row in `unsaved` as one batch and returns the resulting rows as a `List`. */
def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow]
/** Upserts all rows produced by the `unsaved` iterator and returns an `Int` count
 * (presumably the number of affected rows -- confirm against the implementation in use).
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 *
 * @param batchSize batch size handed to the streaming insert; defaults to 10000
 */
def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package hardcoded
package myschema
package person

import anorm.BatchSql
import anorm.NamedParameter
import anorm.ParameterMetaData
import anorm.ParameterValue
Expand All @@ -17,6 +18,7 @@ import anorm.SimpleSql
import anorm.SqlStringInterpolation
import anorm.ToStatement
import java.sql.Connection
import scala.annotation.nowarn
import testdb.hardcoded.customtypes.Defaulted
import testdb.hardcoded.myschema.football_club.FootballClubId
import testdb.hardcoded.myschema.marital_status.MaritalStatusId
Expand Down Expand Up @@ -231,4 +233,69 @@ class PersonRepoImpl extends PersonRepo {
.executeInsert(PersonRow.rowParser(1).single)

}
/** Upserts all of `unsaved` into `myschema.person` through a single anorm `BatchSql`.
 *
 * On a conflict on "id" every other column listed in the statement is overwritten with the incoming value.
 * The rows reported by the `returning` clause are parsed with `PersonRow.rowParser(1).*`.
 * An empty input returns `Nil` without building any statement.
 */
override def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow] = {
// one parameter list per row; the names match the `{...}` placeholders in the SQL below
def toNamedParameter(row: PersonRow): List[NamedParameter] = List(
NamedParameter("id", ParameterValue(row.id, null, PersonId.toStatement)),
NamedParameter("favourite_football_club_id", ParameterValue(row.favouriteFootballClubId, null, FootballClubId.toStatement)),
NamedParameter("name", ParameterValue(row.name, null, ToStatement.stringToStatement)),
NamedParameter("nick_name", ParameterValue(row.nickName, null, ToStatement.optionToStatement(ToStatement.stringToStatement, ParameterMetaData.StringParameterMetaData))),
NamedParameter("blog_url", ParameterValue(row.blogUrl, null, ToStatement.optionToStatement(ToStatement.stringToStatement, ParameterMetaData.StringParameterMetaData))),
NamedParameter("email", ParameterValue(row.email, null, ToStatement.stringToStatement)),
NamedParameter("phone", ParameterValue(row.phone, null, ToStatement.stringToStatement)),
NamedParameter("likes_pizza", ParameterValue(row.likesPizza, null, ToStatement.booleanToStatement)),
NamedParameter("marital_status_id", ParameterValue(row.maritalStatusId, null, MaritalStatusId.toStatement)),
NamedParameter("work_email", ParameterValue(row.workEmail, null, ToStatement.optionToStatement(ToStatement.stringToStatement, ParameterMetaData.StringParameterMetaData))),
NamedParameter("sector", ParameterValue(row.sector, null, Sector.toStatement)),
NamedParameter("favorite_number", ParameterValue(row.favoriteNumber, null, Number.toStatement))
)
unsaved.toList match {
case Nil => Nil
// BatchSql takes the first parameter list separately from the remaining ones, hence the head/rest split
case head :: rest =>
new anorm.testdb.hardcoded.ExecuteReturningSyntax.Ops(
BatchSql(
s"""insert into myschema.person("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number")
values ({id}::int8, {favourite_football_club_id}, {name}, {nick_name}, {blog_url}, {email}, {phone}, {likes_pizza}, {marital_status_id}, {work_email}, {sector}::myschema.sector, {favorite_number}::myschema.number)
on conflict ("id")
do update set
"favourite_football_club_id" = EXCLUDED."favourite_football_club_id",
"name" = EXCLUDED."name",
"nick_name" = EXCLUDED."nick_name",
"blog_url" = EXCLUDED."blog_url",
"email" = EXCLUDED."email",
"phone" = EXCLUDED."phone",
"likes_pizza" = EXCLUDED."likes_pizza",
"marital_status_id" = EXCLUDED."marital_status_id",
"work_email" = EXCLUDED."work_email",
"sector" = EXCLUDED."sector",
"favorite_number" = EXCLUDED."favorite_number"
returning "id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number"
""",
toNamedParameter(head),
rest.map(toNamedParameter)*
)
).executeReturning(PersonRow.rowParser(1).*)
}
}
/** Bulk upsert into `myschema.person` that goes through a temporary table.
 *
 * Steps, in order:
 *  1. create temporary table `person_TEMP` (like `myschema.person`), declared `on commit drop`
 *  2. feed `unsaved` to `streamingInsert` with a `copy ... from stdin` command, passing `batchSize` through
 *  3. insert everything from `person_TEMP` into the real table, overwriting every non-id column
 *     listed in the statement on a conflict on "id", then drop the temporary table
 *
 * NOTE: this functionality is not safe if you use auto-commit mode! it runs 3 SQL statements
 * (the temporary table is created `on commit drop`, so all steps have to share one transaction).
 *
 * @return the `Int` returned by `executeUpdate()` for the final insert-and-drop SQL
 */
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
// results of the first two steps are discarded on purpose; `: @nowarn` silences the resulting warning
SQL"create temporary table person_TEMP (like myschema.person) on commit drop".execute(): @nowarn
streamingInsert(s"""copy person_TEMP("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number") from stdin""", batchSize, unsaved)(PersonRow.text, c): @nowarn
SQL"""insert into myschema.person("id", "favourite_football_club_id", "name", "nick_name", "blog_url", "email", "phone", "likes_pizza", "marital_status_id", "work_email", "sector", "favorite_number")
select * from person_TEMP
on conflict ("id")
do update set
"favourite_football_club_id" = EXCLUDED."favourite_football_club_id",
"name" = EXCLUDED."name",
"nick_name" = EXCLUDED."nick_name",
"blog_url" = EXCLUDED."blog_url",
"email" = EXCLUDED."email",
"phone" = EXCLUDED."phone",
"likes_pizza" = EXCLUDED."likes_pizza",
"marital_status_id" = EXCLUDED."marital_status_id",
"work_email" = EXCLUDED."work_email",
"sector" = EXCLUDED."sector",
"favorite_number" = EXCLUDED."favorite_number"
;
drop table person_TEMP;""".executeUpdate()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,17 @@ class PersonRepoMock(toRow: Function1[PersonRowUnsaved, PersonRow],
map.put(unsaved.id, unsaved): @nowarn
unsaved
}
/** In-memory batch upsert: stores each row in `map` under its id, in iteration order,
 * and returns the same rows as a `List`.
 */
override def upsertBatch(unsaved: Iterable[PersonRow])(implicit c: Connection): List[PersonRow] = {
  val stored = List.newBuilder[PersonRow]
  unsaved.foreach { incoming =>
    // later rows with the same key overwrite earlier ones
    map += (incoming.id -> incoming)
    stored += incoming
  }
  stored.result()
}
/** In-memory counterpart of the streaming upsert: stores every row in `map` under its id.
 *
 * Rows are counted while they are consumed. `unsaved` is an `Iterator`, so asking it for its
 * `size` after it has been drained always reports 0.
 *
 * @param unsaved   rows to store; consumed exactly once
 * @param batchSize not used by the mock; present to match the repo interface
 * @return the number of rows consumed from `unsaved`
 */
override def upsertStreaming(unsaved: Iterator[PersonRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
  unsaved.foldLeft(0) { (storedSoFar, row) =>
    map += (row.id -> row)
    storedSoFar + 1
  }
}
}
Loading

0 comments on commit 098a1bc

Please sign in to comment.