Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimal example with Transaction? #20

Open
elpiel opened this issue May 13, 2019 · 9 comments
Open

Minimal example with Transaction? #20

elpiel opened this issue May 13, 2019 · 9 comments

Comments

@elpiel
Copy link
Contributor

elpiel commented May 13, 2019

Hello,
I'm trying to make a transaction, but I am hitting a wall and can't figure it out.

Is it possible for an example with transaction and 2 inserts for example?

@brokenthorn
Copy link
Contributor

Hey elpiel! What you're asking should be very simple, if you know intermediate level Rust. bb8 uses Tokio async (learn just a bit of that), the builder pattern (you should know what this is), and Rust traits (this too), and although you can implement the connection manager trait yourself, the builder is more than enough for general purposes. Also, check out the bb8 docs.

@elpiel
Copy link
Contributor Author

elpiel commented May 14, 2019

Well actually I coulnd't.

  1. run requires output future Ok((_, conn)) or Err((_, conn)), but transaction on other hand is a bit different. It accepts a Future. You can still probably map it, although I do remember trying.
  2. if I move |...| connection also gets moved(in order not to clone every value I need for the query when executing it).
  3. I can't wrap my head around how to create 2 inserts(probably with join) and then again use the connection to create a transaction which should return the connection itself, because you cannot do await! inside run()

PS: I am probably not explaining it very well since I am also learning, but it's not that easy as it seems as well. And I am having a hard time doing it without any guidance or documentation/examples for quite some time now.

@khuey
Copy link
Collaborator

khuey commented May 14, 2019

I haven't actually run this against a real database but I think it should work:

36c52b6#diff-fe61805d997a94ac99aa2ae71f7750d7R30

The tokio-postgres transaction API is not usable from bb8 at the moment, unfortunately, so you need to do it manually like this.

@elpiel
Copy link
Contributor Author

elpiel commented May 14, 2019

@khuey would you guide me through the implementation? I would like to try to implement it, if you have an idea how it will work?

@khuey
Copy link
Collaborator

khuey commented May 14, 2019

Guide you through the implementation of what, exactly?

@elpiel
Copy link
Contributor Author

elpiel commented May 14, 2019

How to implement the Transaction API of tokio-postgres in bb8.

@khuey
Copy link
Collaborator

khuey commented May 14, 2019

So tokio-postgres's transaction API is available[0], it just doesn't work. The TransactionBuilder object that tokio-postgres gives you takes a Future, and we need to move the connection object[1] into the future (so that we can do SQL queries on the connection). But if the TransactionBuilder fails internally, we have no way to attach the connection object to its error (because we moved the connection into the future) so the connection is lost and then you can't make the type signatures match up with what bb8 expects.

What we need is a transaction method that looks something like this:

fn transaction<R, E, Fut, F>(connection: tokio_postgres::Client, f: F) -> impl Future<Item = (tokio_postgres::Client, R), Error = (tokio_postgres::Client, E)>
  where F: FnOnce(tokio_postgres::Client) -> Fut,
        Fut: Future<Item = (tokio_postgres::Client, R), Error = (tokio_postgres::Client, E),
        E: From<tokio_postgres::Error>
{
    connection.simple_query("BEGIN")
        ...
        .and_then(|connection| {
            f(connection)
        })
        .and_then(|(mut connection, result)| {
            connection.simple_query("COMMIT")
                ...
                .then(|r| match r {
                    Ok(_) => Ok((connection, result)),
                    Err(e) => Err((connection, e))
                 })
        })
        .or_else(|(mut connection, e)| {
            connection.simple_query("ROLLBACK")
                ...
                .then(|_| Err((connection, e)))
        })
}

So step 1 is to write that function and get it working.

[0] https://docs.rs/tokio-postgres/0.4.0-rc.2/tokio_postgres/struct.Client.html#method.build_transaction
[1] i.e. a tokio_postgres::Client

@khuey
Copy link
Collaborator

khuey commented Jul 19, 2019

Something like this works.

diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs
index 87c38ce..47bd6a7 100644
--- a/postgres/src/lib.rs
+++ b/postgres/src/lib.rs
@@ -90,16 +90,20 @@ where
 }

 /// Run asynchronous into a Transaction
-pub fn transaction<R, Fut, F>(mut connection: tokio_postgres::Client, f: F) -> impl Future<Item=(R, tokio_postgres::Client), Error=(tokio_postgres::Error, tokio_postgres::Client)>
+pub fn transaction<R, E, Fut, F>(mut connection: tokio_postgres::Client, f: F) -> impl Future<Item=(R, tokio_postgres::Client), Error=(E, tokio_postgres::Client)>
     where
         F: FnOnce(tokio_postgres::Client) -> Fut,
-        Fut: Future<Item=(R, tokio_postgres::Client), Error=(tokio_postgres::Error, tokio_postgres::Client)>,
+        E: From<tokio_postgres::Error>,
+        Fut: Future<Item=(R, tokio_postgres::Client), Error=(E, tokio_postgres::Client)>,
 {
     connection.simple_query("BEGIN")
         .for_each(|_| Ok(()))
         .then(|r| match r {
             Ok(_) => Ok(connection),
-            Err(e) => Err((e, connection)),
+            Err(e) => {
+                let error: E = e.into();
+                Err((error, connection))
+            },
         })
         .and_then(|connection| f(connection))
         .and_then(|(result, mut connection)| {
@@ -107,11 +111,11 @@ pub fn transaction<R, Fut, F>(mut connection: tokio_postgres::Client, f: F) -> i
                 .for_each(|_| Ok(()))
                 .then(|r| match r {
                     Ok(_) => Ok((result, connection)),
-                    Err(e) => Err((e, connection))
+                    Err(e) => Err((e.into(), connection))
                 })
         })
         .or_else(|(e, mut connection)| {
             connection.simple_query("ROLLBACK").for_each(|_| Ok(()))
-                .then(|_| Err((e, connection)))
+                .then(|_| Err((e.into(), connection)))
         })
-}
\ No newline at end of file
+}

@khuey
Copy link
Collaborator

khuey commented Jul 19, 2019

Woops, meant to post that in the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants