Skip to content

Commit

Permalink
add method to manual specify connection to use
Browse files Browse the repository at this point in the history
  • Loading branch information
rkusa committed Aug 8, 2023
1 parent 74f5581 commit f06ffe1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
4 changes: 4 additions & 0 deletions postgres-macros/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ pub fn sql(item: TokenStream) -> TokenStream {
query: #result,
parameters: &[#(&(#typed_parameters),)*],
transaction: None,
connection: None,
marker: ::std::marker::PhantomData,
}
}
Expand All @@ -290,6 +291,7 @@ pub fn sql(item: TokenStream) -> TokenStream {
query: #result,
parameters: &[#(&(#typed_parameters),)*],
transaction: None,
connection: None,
marker: ::std::marker::PhantomData,
}
}
Expand All @@ -302,6 +304,7 @@ pub fn sql(item: TokenStream) -> TokenStream {
query: #result,
parameters: &[#(&(#typed_parameters),)*],
transaction: None,
connection: None,
marker: ::std::marker::PhantomData,
}
}
Expand Down Expand Up @@ -355,6 +358,7 @@ pub fn sql(item: TokenStream) -> TokenStream {
query: #result,
parameters: &[#(&(#typed_parameters),)*],
transaction: None,
connection: None,
marker: ::std::marker::PhantomData,
}
}
Expand Down
20 changes: 19 additions & 1 deletion postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::str::FromStr;
use std::sync::Arc;

pub use deadpool_postgres::Transaction;
use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, RecyclingMethod};
use deadpool_postgres::{ClientWrapper, Manager, ManagerConfig, Object, Pool, RecyclingMethod};
pub use error::Error;
pub use future::SqlFuture;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -80,10 +80,16 @@ pub struct Sql<'a, Cols, T> {
pub query: &'static str,
pub parameters: &'a [&'a (dyn ToSql + Sync)],
pub transaction: Option<&'a Transaction<'a>>,
pub connection: Option<&'a ClientWrapper>,
pub marker: PhantomData<(Cols, T)>,
}

impl<'a, Cols, T> Sql<'a, Cols, T> {
pub fn with(mut self, tx: &'a ClientWrapper) -> Self {
self.connection = Some(tx);
self
}

pub fn with_transaction(mut self, tx: &'a Transaction<'a>) -> Self {
self.transaction = Some(tx);
self
Expand All @@ -93,6 +99,9 @@ impl<'a, Cols, T> Sql<'a, Cols, T> {
if let Some(tx) = self.transaction {
let stmt = tx.prepare_cached(self.query).await?;
Ok(tx.query_one(&stmt, self.parameters).await?)
} else if let Some(conn) = self.connection {
let stmt = conn.prepare_cached(self.query).await?;
Ok(conn.query_one(&stmt, self.parameters).await?)
} else {
let conn = connect().await?;
let stmt = conn.prepare_cached(self.query).await?;
Expand All @@ -104,6 +113,9 @@ impl<'a, Cols, T> Sql<'a, Cols, T> {
if let Some(tx) = self.transaction {
let stmt = tx.prepare_cached(self.query).await?;
Ok(tx.query_opt(&stmt, self.parameters).await?)
} else if let Some(conn) = self.connection {
let stmt = conn.prepare_cached(self.query).await?;
Ok(conn.query_opt(&stmt, self.parameters).await?)
} else {
let conn = connect().await?;
let stmt = conn.prepare_cached(self.query).await?;
Expand All @@ -115,6 +127,9 @@ impl<'a, Cols, T> Sql<'a, Cols, T> {
if let Some(tx) = self.transaction {
let stmt = tx.prepare_cached(self.query).await?;
Ok(tx.query(&stmt, self.parameters).await?)
} else if let Some(conn) = self.connection {
let stmt = conn.prepare_cached(self.query).await?;
Ok(conn.query(&stmt, self.parameters).await?)
} else {
let conn = connect().await?;
let stmt = conn.prepare_cached(self.query).await?;
Expand All @@ -126,6 +141,9 @@ impl<'a, Cols, T> Sql<'a, Cols, T> {
if let Some(tx) = self.transaction {
let stmt = tx.prepare_cached(self.query).await?;
tx.execute(&stmt, self.parameters).await?;
} else if let Some(conn) = self.connection {
let stmt = conn.prepare_cached(self.query).await?;
conn.execute(&stmt, self.parameters).await?;
} else {
let conn = connect().await?;
let stmt = conn.prepare_cached(self.query).await?;
Expand Down

0 comments on commit f06ffe1

Please sign in to comment.