diff --git a/README.md b/README.md index 303fd71..43a92c7 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,8 @@ npm i @mokuteki/propagated-transactions 1. Create an implementation of `ITransactionRunner` interface (provided by the package) for your specific database, driver, ORM, whatever 2. Create an instance of `PropagatedTransaction` and pass implementation from step one into constructor -3. Instantiate and store database connection by starting the transaction with `PropagatedTransaction.start()` -4. Create a callback that executes business logic, use `PropagatedTransaction.commit() / PropagatedTransaction.rollback()` inside of it -5. Run `PropagatedTransaction.run(connection, callback)`, where `connection` is stored connection from step three, `callback` is a callback from step four -6. Obtain connection inside of inner method/abstraction layer and use it to run your query +3. Create a callback that executes business logic, pass it to `PropagatedTransaction.run()`. If the execution of the provided callback fails - the library rollbacks the transaction and rethrows the error. In case of a successful execution we implicitly commit the transaction and return the value from the callback +4. Obtain connection inside of inner method/abstraction layer and use it to run your query ### Examples @@ -61,24 +59,14 @@ module.exports.ptx = new PropagatedTransaction(KnexTransactionRunner); ```js async create(payload1, payload2) { // Step 3 - const connection = await ptx.start(); - - // Step 4 const callback = async () => { - try { - const user = await userService.create(payload1); - const wallet = await walletService.create(payload2); - - await ptx.commit(); + const user = await userService.create(payload1); + const wallet = await walletService.create(payload2); - return user; - } catch (err) { - await ptx.rollback(); - } + return user; }; - // Step 5 - const user = await ptx.run(connection, callback); + const user = await ptx.run(callback); return user; } @@ -88,7 +76,7 @@ async create(payload1, payload2) { class UserService { async create(payload) { /** - * Step 6 + * Step 4 * If you run this method in PropagatedTransaction context it will be executed in transaction * Otherwise it will be executed as usual query */ @@ -101,7 +89,7 @@ class UserService { ```js class WalletService { async create(payload) { - // Step 6 + // Step 4 const connection = ptx.connection || knex; return connection('wallet').insert(payload); } @@ -161,24 +149,14 @@ export class UserService { payload2: ICreateWallet ): Promise { // Step 3 - const connection = await this.ptx.start(); - - // Step 4 const callback = async () => { - try { - const user = await this.userRepository.create(payload1); - const wallet = await this.walletRepository.create(payload2); + const user = await this.userRepository.create(payload1); + const wallet = await this.walletRepository.create(payload2); - await this.ptx.commit(); - - return user; - } catch (err) { - await this.ptx.rollback(); - } + return user; }; - // Step 5 - const user = await this.ptx.run>(connection, callback); + const user = await this.ptx.run>(callback); return user; } @@ -193,7 +171,7 @@ export class UserRepository implements IUserRepository { ) {} /** - * Step 6 + * Step 4 * If you run this method in PropagatedTransaction context it will be executed in transaction * Otherwise it will be executed as usual query */ @@ -215,7 +193,7 @@ export class WalletRepository implements IWalletRepository { ) {} /** - * Step 6 + * Step 4 * If you run this method in PropagatedTransaction context it will be executed in transaction * Otherwise it will be executed as usual query */ @@ -236,6 +214,8 @@ Package gives you an ability to work with essential isolation levels: * `REPEATABLE READ` * `SERIALIZABLE` +Just import `IsolationLevels` and pass the desired level as a second argument of `PropagatedTransaction.run()` method + By default we use `READ COMMITTED` isolations level ```js @@ -260,28 +240,54 @@ const KnexTransactionRunner = { ``` ```js -async create(payload1, payload2) { - const connection = await ptx.start(IsolationLevels.ReadCommitted); +const { IsolationLevels } = require('@mokuteki/propagated-transactions') + +// some code +async create(payload1, payload2) { const callback = async () => { - try { - const user = await userService.create(payload1); - const wallet = await walletService.create(payload2); + const user = await userService.create(payload1); + const wallet = await walletService.create(payload2); - await ptx.commit(); + return user; + }; - return user; - } catch (err) { - await ptx.rollback(); - } + const user = await ptx.run(callback, IsolationLevels.Serializable); + + return user; +} +``` + +#### Nested execution +Since version 1.2.0 the library supports execution of nested transactions like in the example below. That means that if we call `updateBalance` from `create`, the `updateBalance` won't start a separate transaction, and will be treated as a part of `create's` transaction. However, if we call `updateBalance` directly, it will start its own transaction + + +```js +async create(payload1, payload2) { + const callback = async () => { + const user = await userService.create(payload1); + const wallet = await walletService.create(payload2); + + return user; }; - const user = await ptx.run(connection, callback); + const user = await ptx.run(callback); return user; } ``` +```js +async updateBalance(payload2) { + const callback = async () => { + await walletService.updateBalance(payload2); + }; + + return ptx.run(callback); +} +``` + + ## Motivation Imagine we need to run `UserService.create()` and `WalletService.create()` in transaction diff --git a/db/docker-compose.yml b/db/docker-compose.yml index 23cb20b..707f389 100644 --- a/db/docker-compose.yml +++ b/db/docker-compose.yml @@ -1,6 +1,6 @@ version: '3.8' services: - postgres: + propagated_transactions_postgres: image: postgres:14 restart: always environment: diff --git a/lib/propagated-transaction.js b/lib/propagated-transaction.js index 4427af4..b145a8f 100644 --- a/lib/propagated-transaction.js +++ b/lib/propagated-transaction.js @@ -30,11 +30,11 @@ class PropagatedTransaction { PropagatedTransaction.#instance = this; } - async start(isolationLevel = IsolationLevels.ReadCommitted) { + async #start(isolationLevel = IsolationLevels.ReadCommitted) { return this.connection || this.runner.start(isolationLevel); } - async commit() { + async #commit() { if (!this.connection) { throw TransactionError.NotInContext(); } @@ -42,7 +42,7 @@ class PropagatedTransaction { return this.runner.commit(this.connection); } - async rollback() { + async #rollback() { if (!this.connection) { throw TransactionError.NotInContext(); } @@ -50,8 +50,28 @@ class PropagatedTransaction { return this.runner.rollback(this.connection); } - async run(connection, callback) { - return this.als.run(connection, callback); + async run(callback, isolationLevel = undefined) { + if (this.connection) { + return callback(); + } + + const connection = await this.#start(isolationLevel); + + const wrapped = async () => { + try { + const result = await callback(); + + await this.#commit(); + + return result; + } catch (err) { + await this.#rollback(); + + throw err; + } + }; + + return this.als.run(connection, wrapped); } } diff --git a/package-lock.json b/package-lock.json index 9f5d77e..bd5940d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mokuteki/propagated-transactions", - "version": "1.1.4", + "version": "1.2.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@mokuteki/propagated-transactions", - "version": "1.1.4", + "version": "1.2.0", "license": "MIT", "devDependencies": { "@types/node": "^18.14.6", diff --git a/package.json b/package.json index e02b883..50ad2fc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mokuteki/propagated-transactions", - "version": "1.1.4", + "version": "1.2.0", "description": "Convenient wrapper to propagate and manage database transactions using AsyncLocalStorage", "main": "lib/propagated-transaction.js", "scripts": { diff --git a/test/fixtures.js b/test/fixtures.js index 138a558..9deeda6 100644 --- a/test/fixtures.js +++ b/test/fixtures.js @@ -32,9 +32,9 @@ const KnexTransactionRunner = { const data = { user: { id: 1, + balance: 0, name: 'Mykola', surname: 'Lysenko', - balance: 0, }, }; diff --git a/test/suits/propagated-transaction.test.js b/test/suits/propagated-transaction.test.js index bee2907..1acb7b1 100644 --- a/test/suits/propagated-transaction.test.js +++ b/test/suits/propagated-transaction.test.js @@ -1,12 +1,12 @@ const assert = require('node:assert'); -const { describe, it, before, afterEach, after } = require('node:test'); +const { describe, it, before, after, beforeEach } = require('node:test'); const { TransactionError } = require('#root/lib/transaction-error.js'); const { knex, data, KnexTransactionRunner } = require('#test/fixtures.js'); const { + IsolationLevels, PropagatedTransaction, } = require('#root/lib/propagated-transaction.js'); -const { IsolationLevels } = require('../../lib/propagated-transaction'); describe('PropagatedTransaction', async () => { before(async () => { @@ -19,30 +19,22 @@ describe('PropagatedTransaction', async () => { }); }); - afterEach(async () => { + beforeEach(async () => { await knex('user').del(); }); after(() => { - process.exit(0); + process.exit(); }); it('Successfully create user inside of knex transaction', async () => { const ptx = new PropagatedTransaction(KnexTransactionRunner); - const connection = await ptx.start(); - const callback = async () => { - try { - const user = await ptx.connection('user').insert(data.user); - - await ptx.commit(); - } catch (err) { - await ptx.rollback(); - } + const user = await ptx.connection('user').insert(data.user); }; - await ptx.run(connection, callback); + await ptx.run(callback); const selected = await knex .select('*') @@ -50,7 +42,7 @@ describe('PropagatedTransaction', async () => { .where('id', data.user.id) .first(); - assert.deepEqual(selected, data.user); + assert.deepStrictEqual(selected, data.user); }); it('Successfully execute callback and verify that specifying isolation level works as expected', async () => { @@ -58,50 +50,52 @@ describe('PropagatedTransaction', async () => { const ptx = new PropagatedTransaction(KnexTransactionRunner); - const connection = await ptx.start(IsolationLevels.ReadCommitted); - const callback = async () => { - try { - /** - * For some reason isolation levels don't work as expected if we don't execute at least one random query before performing any query on the outer connection - * Even though DEBUG=knex:query shows the proper order of SQL queries it still returns wrong results from time to time. - */ - await ptx.connection('user').count(); - - // Operation that is being executed using outer connection - await knex('user').where({ id: data.user.id }).update({ balance: 100 }); - - const user = await ptx - .connection('user') - .where({ id: data.user.id }) - .select('*') - .first(); - - await ptx.commit(); - - return user; - } catch (err) { - await ptx.rollback(); - } + /** + * For some reason isolation levels don't work as expected if we don't execute at least one random query before performing any query on the outer connection + * Even though DEBUG=knex:query shows the proper order of SQL queries it still returns wrong results from time to time. + */ + await ptx.connection('user').count(); + + // Operation that is being executed using outer connection + await knex('user').where({ id: data.user.id }).update({ balance: 100 }); + + const user = await ptx + .connection('user') + .where({ id: data.user.id }) + .select('*') + .first(); + + return user; }; - const user = await ptx.run(connection, callback); + const user = await ptx.run(callback, IsolationLevels.RepeatableRead); assert.strictEqual(user.balance, data.user.balance); }); - it('Rollback after creating the user inside of knex transaction', async () => { + it('Successfully reuse connection inside of a nested transaction', async () => { const ptx = new PropagatedTransaction(KnexTransactionRunner); - const connection = await ptx.start(); + const nestedMethod = async (id) => { + const nestedCallback = async () => { + return ptx.connection('user').where({ id }).select('*').first(); + }; - const callback = async () => { - await ptx.connection('user').insert(data.user); + return ptx.run(nestedCallback); + }; + + const method = async () => { + const callback = async () => { + await ptx.connection('user').insert(data.user); + + return nestedMethod(data.user.id); + }; - await ptx.rollback(); + return ptx.run(callback); }; - await ptx.run(connection, callback); + const result = await method(); const selected = await knex .select('*') @@ -109,26 +103,32 @@ describe('PropagatedTransaction', async () => { .where('id', data.user.id) .first(); - assert.deepEqual(selected, undefined); + assert.deepStrictEqual(selected, result); }); - it('Throw TransactionError.NotInContext when calling .commit() outside of the context', async () => { + it('Rollback after creating the user inside of knex transaction', async () => { const ptx = new PropagatedTransaction(KnexTransactionRunner); - const handler = async () => { - await ptx.commit(); - }; + const error = new Error('Internal error'); - assert.rejects(handler, TransactionError.NotInContext()); - }); + const callback = async () => { + await ptx.connection('user').insert(data.user); - it('Throw TransactionError.NotInContext when calling .rollback() outside of the context', async () => { - const ptx = new PropagatedTransaction(KnexTransactionRunner); + throw error; + }; const handler = async () => { - await ptx.rollback(); + await ptx.run(callback); }; - assert.rejects(handler, TransactionError.NotInContext()); + assert.rejects(handler, error); + + const selected = await knex + .select('*') + .from('user') + .where('id', data.user.id) + .first(); + + assert.deepStrictEqual(selected, undefined); }); });