Skip to content

Commit

Permalink
implement Connection trait
Browse files Browse the repository at this point in the history
  • Loading branch information
rkusa committed Mar 13, 2024
1 parent 5908fb0 commit 244c90f
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
26 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
| help: remove the `.await`
|
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"moderator">, EnumVariant<"user">)>, "role">)>>` is implemented for `Vec<User>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
20 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
| help: remove the `.await`
|
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"user">,)>, "role">)>>` is implemented for `Vec<User>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
23 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
| help: remove the `.await`
|
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"moderator">, EnumVariant<"user">)>, "role">)>>` is implemented for `Vec<User>`
Expand Down
7 changes: 4 additions & 3 deletions postgres-macros/tests/fail-stable/enum_extra_variant.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
--> tests/fail-stable/enum_extra_variant.rs:26:59
|
26 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| |
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| help: remove the `.await`
|
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<16036746858103170191>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`
7 changes: 4 additions & 3 deletions postgres-macros/tests/fail-stable/enum_missing_variant.stderr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
--> tests/fail-stable/enum_missing_variant.rs:20:59
|
20 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| |
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| help: remove the `.await`
|
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<10465144470622129318>,)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
--> tests/fail-stable/enum_variant_mismatch.rs:23:59
|
23 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| |
| -^^^^^
| ||
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
| help: remove the `.await`
|
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<16036746858103170191>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`
167 changes: 167 additions & 0 deletions postgres/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// TODO: remove once Rust's async lifetime in trait story got improved
#![allow(clippy::manual_async_fn)]

use std::future::Future;

use deadpool_postgres::GenericClient;
use tokio_postgres::types::ToSql;
use tokio_postgres::Row;

use crate::Error;

pub trait Connection: Send + Sync {
fn query_one<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Row, Error>> + Send + 'a;

fn query_opt<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a;

fn query<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a;

fn execute<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<(), Error>> + Send + 'a;
}

impl Connection for deadpool_postgres::Client {
fn query_one<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Client::query_one(self, &stmt, parameters).await?)
}
}

fn query_opt<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Client::query_opt(self, &stmt, parameters).await?)
}
}

fn query<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Client::query(self, &stmt, parameters).await?)
}
}

fn execute<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
tokio_postgres::Client::execute(self, &stmt, parameters).await?;
Ok(())
}
}
}

impl<'t> Connection for deadpool_postgres::Transaction<'t> {
fn query_one<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Transaction::query_one(self, &stmt, parameters).await?)
}
}

fn query_opt<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Transaction::query_opt(self, &stmt, parameters).await?)
}
}

fn query<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
Ok(tokio_postgres::Transaction::query(self, &stmt, parameters).await?)
}
}

fn execute<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
async move {
let stmt = self.prepare_cached(query).await?;
tokio_postgres::Transaction::execute(self, &stmt, parameters).await?;
Ok(())
}
}
}

impl<'b, C> Connection for &'b C
where
C: Connection,
{
fn query_one<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
(*self).query_one(query, parameters)
}

fn query_opt<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
(*self).query_opt(query, parameters)
}

fn query<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
(*self).query(query, parameters)
}

fn execute<'a>(
&'a self,
query: &'a str,
parameters: &'a [&'a (dyn ToSql + Sync)],
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
(*self).execute(query, parameters)
}
}
67 changes: 61 additions & 6 deletions postgres/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,33 @@ where
type IntoFuture = SqlFuture<'a, T>;

fn into_future(self) -> Self::IntoFuture {
SqlFuture::new(self)
}
}

pub struct SqlFuture<'a, T> {
future: Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>,
marker: PhantomData<&'a ()>,
}

impl<'a, T> SqlFuture<'a, T> {
pub fn new<Cols>(sql: Sql<'a, Cols, T>) -> Self
where
T: Query<Cols> + Send + Sync + 'a,
Cols: Send + Sync + 'a,
{
let span =
tracing::debug_span!("sql query", query = self.query, parameters = ?self.parameters);
tracing::debug_span!("sql query", query = sql.query, parameters = ?sql.parameters);
let start = Instant::now();

SqlFuture {
future: Box::pin(
// Note: changes here must be applied to `with_connection` below too!
async move {
let mut i = 1;
loop {
match T::query(&self).await {
let conn = super::connect().await?;
match T::query(&sql, &conn).await {
Ok(r) => {
let elapsed = start.elapsed();
tracing::trace!(?elapsed, "sql query finished");
Expand All @@ -55,11 +72,49 @@ where
marker: PhantomData,
}
}
}

pub struct SqlFuture<'a, T> {
future: Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>,
marker: PhantomData<&'a ()>,
pub fn with_connection<Cols>(sql: Sql<'a, Cols, T>, conn: impl super::Connection + 'a) -> Self
where
T: Query<Cols> + Send + Sync + 'a,
Cols: Send + Sync + 'a,
{
let span =
tracing::debug_span!("sql query", query = sql.query, parameters = ?sql.parameters);
let start = Instant::now();

SqlFuture {
future: Box::pin(
// Note: changes here must be applied to `bew` above too!
async move {
let mut i = 1;
loop {
match T::query(&sql, &conn).await {
Ok(r) => {
let elapsed = start.elapsed();
tracing::trace!(?elapsed, "sql query finished");
return Ok(r);
}
Err(Error {
kind: ErrorKind::Postgres(err),
..
}) if err.is_closed() && i <= 5 => {
// retry pool size + 1 times if connection is closed (might have
// received a closed one from the connection pool)
i += 1;
tracing::trace!("retry due to connection closed error");
continue;
}
Err(err) => {
return Err(err);
}
}
}
}
.instrument(span),
),
marker: PhantomData,
}
}
}

impl<'a, T> Future for SqlFuture<'a, T> {
Expand Down
Loading

0 comments on commit 244c90f

Please sign in to comment.