A convenient wrapper that propagates a database connection and lets you manage the state of your transactions. It uses AsyncLocalStorage
under the hood, is TypeScript-friendly, and has 0 production dependencies
- The package saves you from polluting method arguments with a connection object, as in
userService.create(payload, connection)
- The package keeps your transaction management logic from being bound to any particular database, driver, or ORM. If you need to change one of them, you only need to provide a different implementation of
ITransactionRunner
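To make the AsyncLocalStorage mechanism concrete, here is a minimal sketch of how a value can reach an inner function without being passed as an argument. It is not the package's code: storage, withConnection, createUser, and the { name } connection object are hypothetical names used only for illustration.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// Holds the "connection" for the current chain of async calls.
const storage = new AsyncLocalStorage();

// Inner layer: reads the connection from the async context, not from an argument.
async function createUser(payload) {
  const connection = storage.getStore();
  return `${connection.name} inserted ${payload.name}`;
}

// Outer layer: binds a connection to everything called inside the callback.
function withConnection(connection, callback) {
  return storage.run(connection, callback);
}

withConnection({ name: 'trx-1' }, () => createUser({ name: 'alice' }))
  .then((message) => console.log(message)); // logs "trx-1 inserted alice"
```

Outside of withConnection, storage.getStore() returns undefined, which is what makes a fallback to a plain connection possible.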
npm i @mokuteki/propagated-transactions
- Create an implementation of the ITransactionRunner interface (provided by the package) for your specific database, driver, or ORM
- Create an instance of PropagatedTransaction and pass the implementation from step one into its constructor
- Create a callback that executes your business logic and pass it to PropagatedTransaction.run(). If the callback fails, the library rolls back the transaction and rethrows the error. If it succeeds, the library implicitly commits the transaction and returns the callback's return value
- Obtain the connection inside an inner method or abstraction layer and use it to run your query
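The commit, rollback, and rethrow behaviour described in the steps can be sketched as follows. This is a simplified stand-in written for illustration, not the package's source: SketchTransaction, memoryRunner, and demo are hypothetical names, and the runner only records which methods were called.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// Simplified stand-in for PropagatedTransaction; NOT the package's implementation.
class SketchTransaction {
  constructor(runner) {
    this.runner = runner;
    this.storage = new AsyncLocalStorage();
  }

  // The connection bound to the current async context, or undefined outside run().
  get connection() {
    return this.storage.getStore();
  }

  async run(callback) {
    const connection = await this.runner.start();
    try {
      const result = await this.storage.run(connection, callback);
      await this.runner.commit(connection); // success: commit implicitly
      return result;
    } catch (err) {
      await this.runner.rollback(connection); // failure: roll back...
      throw err; // ...and rethrow to the caller
    }
  }
}

// Hypothetical in-memory runner that only records which methods were called.
const log = [];
const memoryRunner = {
  start: async () => { log.push('start'); return { id: 1 }; },
  commit: async () => { log.push('commit'); },
  rollback: async () => { log.push('rollback'); },
};

const sketch = new SketchTransaction(memoryRunner);

async function demo() {
  await sketch.run(async () => sketch.connection.id); // resolves, then commits
  await sketch.run(async () => { throw new Error('boom'); }).catch(() => {}); // rolls back
  return log;
}

demo().then((entries) => console.log(entries));
```

The logged entries are start, commit, start, rollback: the first callback succeeds and is committed, the second throws and is rolled back.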
const { PropagatedTransaction } = require('@mokuteki/propagated-transactions');
const knex = require('knex')({
  client: 'pg',
  // `version` is a top-level knex option, not a connection setting
  version: '8.10',
  connection: {
    host: '127.0.0.1',
    port: 5432,
    user: 'mokuteki',
    password: 'pass123',
    database: 'propagated-test',
  },
});
// Step 1
const KnexTransactionRunner = {
  start: async () => {
    const trx = await knex.transaction();
    return trx;
  },
  commit: async (trx) => {
    return trx.commit();
  },
  rollback: async (trx) => {
    return trx.rollback();
  },
};
// Step 2
module.exports.ptx = new PropagatedTransaction(KnexTransactionRunner);
async create(payload1, payload2) {
  // Step 3
  const callback = async () => {
    const user = await userService.create(payload1);
    const wallet = await walletService.create(payload2);
    return user;
  };
  const user = await ptx.run(callback);
  return user;
}
class UserService {
  async create(payload) {
    /**
     * Step 4
     * If you run this method in a PropagatedTransaction context, it is executed inside the transaction
     * Otherwise it is executed as a regular query
     */
    const connection = ptx.connection || knex;
    return connection('user').insert(payload);
  }
}
class WalletService {
  async create(payload) {
    // Step 4
    const connection = ptx.connection || knex;
    return connection('wallet').insert(payload);
  }
}
import { DataSource, EntityManager, QueryRunner } from 'typeorm';
import { PropagatedTransaction, ITransactionRunner } from '@mokuteki/propagated-transactions';
// Step 1
class TypeormTransactionRunner implements ITransactionRunner<QueryRunner> {
  constructor(private readonly dataSource: DataSource) {}
  /**
   * Reserve and return a database connection, run the `.start()` method if it exists
   */
  public async start(): Promise<QueryRunner> {
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();
    return queryRunner;
  }
  public async commit(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.commitTransaction();
    return queryRunner.release();
  }
  public async rollback(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.rollbackTransaction();
    return queryRunner.release();
  }
}
// Step 2
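// Pass an instance of the runner, not the class. `dataSource` is assumed to be your initialized TypeORM DataSource
export const ptx = new PropagatedTransaction(new TypeormTransactionRunner(dataSource));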
export class UserService {
  constructor(
    private readonly ptx: PropagatedTransaction<unknown>,
    private readonly userRepository: IUserRepository,
    private readonly walletRepository: IWalletRepository,
  ) {}
  public async create(
    payload1: ICreateUser,
    payload2: ICreateWallet
  ): Promise<UserEntity> {
    // Step 3
    const callback = async () => {
      const user = await this.userRepository.create(payload1);
      const wallet = await this.walletRepository.create(payload2);
      return user;
    };
    const user = await this.ptx.run<Promise<UserEntity>>(callback);
    return user;
  }
}
export class UserRepository implements IUserRepository {
  constructor(
    private readonly manager: EntityManager,
    private readonly ptx: PropagatedTransaction<QueryRunner>,
  ) {}
  /**
   * Step 4
   * If you run this method in a PropagatedTransaction context, it is executed inside the transaction
   * Otherwise it is executed as a regular query
   */
  public async create(data: ICreateUser): Promise<UserEntity> {
    const manager = this.ptx.connection?.manager || this.manager;
    const user = manager.getRepository(TypeormUserEntity).create(data);
    return manager.save(user);
  }
}
export class WalletRepository implements IWalletRepository {
  constructor(
    private readonly manager: EntityManager,
    private readonly ptx: PropagatedTransaction<QueryRunner>,
  ) {}
  /**
   * Step 4
   * If you run this method in a PropagatedTransaction context, it is executed inside the transaction
   * Otherwise it is executed as a regular query
   */
  public async create(data: ICreateWallet): Promise<WalletEntity> {
    const manager = this.ptx.connection?.manager || this.manager;
    const wallet = manager.getRepository(TypeormWalletEntity).create(data);
    return manager.save(wallet);
  }
}
The package lets you work with the essential isolation levels:
READ COMMITTED
READ UNCOMMITTED
REPEATABLE READ
SERIALIZABLE
Import IsolationLevels and pass the desired level as the second argument of PropagatedTransaction.run().
The default isolation level is READ COMMITTED.
const KnexTransactionRunner = {
  start: async (isolationLevel) => {
    // adapt the internal @mokuteki/propagated-transactions isolation level to the one your transaction runner uses
    const adaptedIsolationLevel = adapt(isolationLevel);
    const trx = await knex.transaction({
      isolationLevel: adaptedIsolationLevel,
    });
    return trx;
  },
  commit: async (trx) => {
    return trx.commit();
  },
  rollback: async (trx) => {
    return trx.rollback();
  },
};
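The adapt helper used in the runner above is not defined by the example, so you have to write it yourself. One possible sketch follows. It assumes the package passes start() an upper-case SQL-style string such as 'READ COMMITTED'; that is an assumption, so check the actual IsolationLevels values before relying on it. The lower-case strings are values knex accepts for its isolationLevel transaction option.

```javascript
// A subset of the values knex accepts in `knex.transaction({ isolationLevel })`.
const KNEX_ISOLATION_LEVELS = new Set([
  'read uncommitted',
  'read committed',
  'repeatable read',
  'serializable',
]);

// Hypothetical helper: assumes the incoming level is an upper-case
// SQL-style string such as 'READ COMMITTED'.
function adapt(isolationLevel) {
  const level = String(isolationLevel).trim().toLowerCase();
  if (!KNEX_ISOLATION_LEVELS.has(level)) {
    // Fail loudly instead of silently starting a transaction with the wrong level.
    throw new Error(`Unsupported isolation level: ${isolationLevel}`);
  }
  return level;
}

console.log(adapt('READ COMMITTED')); // logs "read committed"
```

Throwing on unknown values keeps a mismatch between the two naming schemes from going unnoticed.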
const { IsolationLevels } = require('@mokuteki/propagated-transactions');
// some code
async create(payload1, payload2) {
  const callback = async () => {
    const user = await userService.create(payload1);
    const wallet = await walletService.create(payload2);
    return user;
  };
  const user = await ptx.run(callback, IsolationLevels.Serializable);
  return user;
}
Since version 1.2.0 the library supports nested transactions, as in the example below. If we call updateBalance from create, updateBalance won't start a separate transaction and will be treated as part of create's transaction. However, if we call updateBalance directly, it will start its own transaction
async create(payload1, payload2) {
  const callback = async () => {
    const user = await userService.create(payload1);
    const wallet = await walletService.create(payload2);
    return user;
  };
  const user = await ptx.run(callback);
  return user;
}
async updateBalance(payload2) {
  const callback = async () => {
    await walletService.updateBalance(payload2);
  };
  return ptx.run(callback);
}
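The joining rule for nested calls can be sketched like this. It is an illustration under assumptions, not the library's code: run, started, and the { id } connection object are hypothetical, commit and rollback are left out, and create is made to call updateBalance so the nested case is visible.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const storage = new AsyncLocalStorage();
let started = 0; // counts how many transactions were actually opened

// Hypothetical `run` showing the joining rule only; commit/rollback are omitted.
async function run(callback) {
  if (storage.getStore() !== undefined) {
    // Already inside a transaction: join it instead of starting a new one.
    return callback();
  }
  started += 1;
  const connection = { id: started };
  return storage.run(connection, callback);
}

// Resolves to the id of the transaction it ended up running in.
const updateBalance = () => run(async () => storage.getStore().id);

// Calls updateBalance from inside its own transaction.
const create = () => run(async () => updateBalance());
```

Awaiting create() opens one transaction that updateBalance joins, while awaiting updateBalance() on its own opens a new one, so started grows by one per top-level call.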
Imagine we need to run UserService.create() and WalletService.create() in a single transaction
class UserService {
  async create(payload) {
    return knex('user').insert(payload);
  }
}
class WalletService {
  async create(payload) {
    return knex('wallet').insert(payload);
  }
}
Because of the asynchronous nature of Node.js, guaranteeing atomicity requires us to reserve a single database connection and execute all of the transaction's operations on that connection. So we have to do something like this
class UserService {
  async create(payload, trx) {
    return trx('user').insert(payload);
  }
}
class WalletService {
  async create(payload, trx) {
    return trx('wallet').insert(payload);
  }
}
async create(payload1, payload2) {
  const trx = await knex.transaction();
  try {
    const user = await userService.create(payload1, trx);
    const wallet = await walletService.create(payload2, trx);
    await trx.commit();
  } catch (err) {
    await trx.rollback();
    // Rethrow so the caller learns that the operation failed
    throw err;
  }
}
The idea of this package is to propagate the connection and let you manage the state of your transaction without binding your business logic to any particular database, ORM, or driver
Run the tests before contributing. Here is how
cd db && sudo docker-compose up -d
npm run test
Licensed under MIT