Skip to content

Commit

Permalink
Migrate transaction tests to v3 (#148)
Browse files Browse the repository at this point in the history
* Start migrating transaction tests

* Fix first round of transaction tests

* Fix remaining transaction tests

* Adapt tests to expect default map
  • Loading branch information
simolus3 authored Oct 10, 2023
1 parent 54eaa44 commit 95663fd
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 321 deletions.
26 changes: 15 additions & 11 deletions lib/postgres_v3_experimental.dart
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,27 @@ class PgSql {
abstract class PgSession {
/// Prepares a reusable statement from a [query].
///
/// Query can either be a string or a [PgSql] instance. The [query] value used
/// alters the behavior of [PgStatement.bind]: When a string is used, the
/// query is sent to Postgres without modification and you may only used
/// indexed parameters (e.g. `SELECT * FROM users WHERE id = $1`). When using
/// [PgSql.map], you can use named parameters as well (e.g. `WHERE id = @id`).
/// [query] must either be a [String] or a [PgSql] object with types for
/// parameters already set. If the types for parameters are already known from
/// the query, a direct list of values can be passed for [parameters].
/// Otherwise, the type of parameter types must be made explicit. This can be
/// done by passing [PgTypedParameter] objects in a list, or (if a string or
/// [PgSql.map] value is passed for [query]), via the names of declared
/// statements.
///
/// When the returned future completes, the statement must eventually be freed
/// using [PgStatement.close] to avoid resource leaks.
/// using [PgStatement.dispose] to avoid resource leaks.
Future<PgStatement> prepare(Object /* String | PgSql */ query);

/// Executes the [query] with the given [parameters].
///
/// [query] must either be a [String] or a [PgSql] query with types for
/// parameters. When a [PgSql] query object with known types is used,
/// [parameters] can be a list of direct values. Otherwise, it must be a list
/// of [PgTypedParameter]s. With [PgSql.map], values can also be provided as a
/// map from the substituted parameter keys to objects or [PgTypedParameter]s.
/// [query] must either be a [String] or a [PgSql] object with types for
/// parameters already set. If the types for parameters are already known from
/// the query, a direct list of values can be passed for [parameters].
/// Otherwise, the type of parameter types must be made explicit. This can be
/// done by passing [PgTypedParameter] objects in a list, or (if a string or
/// [PgSql.map] value is passed for [query]), via the names of declared
/// statements.
///
/// When [ignoreRows] is set to true, the implementation may internally
/// optimize the execution to ignore rows returned by the query. Whether this
Expand Down
10 changes: 10 additions & 0 deletions lib/src/client_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ class QueryMessage extends ClientMessage {
buffer.writeUint8(ClientMessage.QueryIdentifier);
buffer.writeLengthEncodedString(_queryString);
}

@override
String toString() {
return 'Query: $_queryString';
}
}

class ParseMessage extends ClientMessage {
Expand Down Expand Up @@ -152,6 +157,11 @@ class ParseMessage extends ClientMessage {
buffer.writeInt32(type?.oid ?? 0);
}
}

@override
String toString() {
return 'Parse $_statement';
}
}

class DescribeMessage extends ClientMessage {
Expand Down
106 changes: 88 additions & 18 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,32 @@ abstract class _PgSessionBase implements PgSession {
/// connection in the meantime.
final Pool _operationLock = Pool(1);

bool _sessionClosed = false;

PgConnectionImplementation get _connection;
Encoding get encoding => _connection._settings.encoding;

/// Runs [callback], guarded by [_operationLock] and cleans up the pending
/// resource afterwards.
Future<T> _withResource<T>(FutureOr<T> Function() callback) {
if (_connection._isClosing) {
void _checkActive() {
if (_sessionClosed) {
throw PostgreSQLException(
'Session or transaction has already finished, did you forget to await a statement?');
} else if (_connection._isClosing) {
throw PostgreSQLException('Connection is closing down');
}
}

return _operationLock.withResource(() async {
assert(_connection._pending == null);
/// Runs [callback], guarded by [_operationLock] and cleans up the pending
/// resource afterwards.
Future<T> _withResource<T>(FutureOr<T> Function() callback) {
_checkActive();
return _operationLock.withResource(() {
_checkActive();
assert(_connection._pending == null,
'Previous operation ${_connection._pending} did not clean up.');

try {
return await callback();
} finally {
return Future(callback).whenComplete(() {
_connection._pending = null;
}
});
});
}

Expand Down Expand Up @@ -303,6 +311,9 @@ class PgConnectionImplementation extends _PgSessionBase
final _ResolvedSettings _settings;

_PendingOperation? _pending;
// Errors happening while a transaction is active will roll back the
// transaction and should be reporte to the user.
_TransactionSession? _activeTransaction;

final Map<String, String> _parameters = {};

Expand Down Expand Up @@ -355,6 +366,8 @@ class PgConnectionImplementation extends _PgSessionBase
if (exception.willAbortConnection || _pending == null) {
_closeAfterError(exception);
} else {
_connection._activeTransaction?._transactionException = exception;

_pending!.handleError(exception);
}
} else if (_pending != null) {
Expand Down Expand Up @@ -384,16 +397,27 @@ class PgConnectionImplementation extends _PgSessionBase
return _operationLock.withResource(() async {
// The transaction has its own _operationLock, which means that it (and
// only it) can be used to run statements while it's active.
final transaction = _TransactionSession(this);
await transaction.execute('BEGIN;');
final transaction =
_connection._activeTransaction = _TransactionSession(this);
await transaction.execute(PgSql('BEGIN;'), queryMode: QueryMode.simple);

try {
final result = await fn(transaction);
await transaction.execute('COMMIT;');
await transaction._sendAndMarkClosed('COMMIT;');

// If we have received an error while the transaction was active, it
// will always be rolled back.
if (transaction._transactionException
case final PostgreSQLException e) {
throw e;
}

return result;
} catch (e) {
await transaction.execute('ROLLBACK;');
if (!transaction._sessionClosed) {
await transaction._sendAndMarkClosed('ROLLBACK;');
}

rethrow;
}
});
Expand Down Expand Up @@ -490,7 +514,7 @@ class _PgResultStreamSubscription
_BoundStatement statement, this._controller, this._source)
: session = statement.statement._session,
ignoreRows = false {
session._withResource(() async {
_scheduleStatement(() async {
connection._pending = this;

connection._channel.sink.add(AggregatedClientMessage([
Expand All @@ -511,16 +535,36 @@ class _PgResultStreamSubscription
});
}

_PgResultStreamSubscription.simpleQueryProtocol(String sql, this.session,
this._controller, this._source, this.ignoreRows) {
session._withResource(() async {
_PgResultStreamSubscription.simpleQueryProtocol(
String sql,
this.session,
this._controller,
this._source,
this.ignoreRows, {
void Function()? cleanup,
}) {
_scheduleStatement(() async {
connection._pending = this;

connection._channel.sink.add(QueryMessage(sql));
await _done.future;
cleanup?.call();
});
}

void _scheduleStatement(Future<void> Function() sendAndWait) async {
try {
await session._withResource(sendAndWait);
} catch (e, s) {
// _withResource can fail if the connection or the session is already
// closed. This error should be reported to the user!
if (!_done.isCompleted) {
_controller.addError(e, s);
await _completeQuery();
}
}
}

@override
Future<int> get affectedRows => _affectedRows.future;

Expand Down Expand Up @@ -749,8 +793,34 @@ class _TransactionSession extends _PgSessionBase {
@override
final PgConnectionImplementation _connection;

PostgreSQLException? _transactionException;

_TransactionSession(this._connection);

/// Sends the [command] and, before releasing the internal connection lock,
/// marks the session as closed.
///
/// This prevents other pending operations on the transaction that haven't
/// been awaited from running.
Future<void> _sendAndMarkClosed(String command) async {
final controller = StreamController<PgResultRow>();
final items = <PgResultRow>[];

final querySubscription = _PgResultStreamSubscription.simpleQueryProtocol(
command,
this,
controller,
controller.stream.listen(items.add),
true,
cleanup: () {
_sessionClosed = true;
_connection._activeTransaction = null;
},
);
await querySubscription.asFuture();
await querySubscription.cancel();
}

@override
Future<void> close() async {
throw UnsupportedError(
Expand Down
4 changes: 1 addition & 3 deletions lib/src/v3/query_description.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ class InternalQueryDescription implements PgSql {

factory InternalQueryDescription.wrap(Object query) {
if (query is String) {
// todo: Determine whether we want to use a direct SQL command by default.
// Maybe this should be replaced with .map once implemented.
return InternalQueryDescription.direct(query);
return InternalQueryDescription.map(query);
} else if (query is InternalQueryDescription) {
return query;
} else {
Expand Down
56 changes: 54 additions & 2 deletions test/docker.dart
Original file line number Diff line number Diff line change
@@ -1,32 +1,83 @@
import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:docker_process/containers/postgres.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:path/path.dart' as p;
import 'package:postgres/messages.dart';
import 'package:postgres/postgres_v3_experimental.dart';
import 'package:postgres/src/connection.dart';
import 'package:postgres/src/replication.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

// We log all packets sent to and received from the postgres server. This can be
// used to debug failing tests. To view logs, something like this can be put
// at the beginning of `main()`:
//
// Logger.root.level = Level.ALL;
// Logger.root.onRecord.listen((r) => print('${r.loggerName}: ${r.message}'));
StreamChannelTransformer<BaseMessage, BaseMessage> loggingTransformer(
String prefix) {
final inLogger = Logger('postgres.connection.$prefix.in');
final outLogger = Logger('postgres.connection.$prefix.out');

return StreamChannelTransformer(
StreamTransformer.fromHandlers(
handleData: (data, sink) {
inLogger.fine(data);
sink.add(data);
},
),
StreamSinkTransformer.fromHandlers(
handleData: (data, sink) {
outLogger.fine(data);
sink.add(data);
},
),
);
}

class PostgresServer {
final _port = Completer<int>();
final _containerName = Completer<String>();

Future<int> get port => _port.future;

Future<PgEndpoint> get endpoint async => PgEndpoint(
Future<PgEndpoint> endpoint({
bool requireSsl = false,
}) async =>
PgEndpoint(
host: 'localhost',
database: 'postgres',
username: 'postgres',
password: 'postgres',
port: await port,
requireSsl: requireSsl,
);

Future<PgConnection> newConnection({
bool useSSL = false,
ReplicationMode replicationMode = ReplicationMode.none,
}) async {
return PgConnection.open(
await endpoint(requireSsl: useSSL),
sessionSettings: PgSessionSettings(
replicationMode: replicationMode,
// We use self-signed certificates in tests
onBadSslCertificate: (_) => true,
transformer: loggingTransformer('conn'),
),
);
}

Future<PostgreSQLConnection> newPostgreSQLConnection({
bool useSSL = false,
ReplicationMode replicationMode = ReplicationMode.none,
}) async {
final e = await endpoint;
final e = await endpoint();
return PostgreSQLConnection(
e.host,
e.port,
Expand All @@ -39,6 +90,7 @@ class PostgresServer {
}
}

@isTestGroup
void withPostgresServer(
String name,
void Function(PostgresServer server) fn, {
Expand Down
4 changes: 2 additions & 2 deletions test/pool_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void main() {

setUp(() async {
pool = PgPool(
[await server.endpoint],
[await server.endpoint()],
sessionSettings: _sessionSettings,
);

Expand Down Expand Up @@ -84,7 +84,7 @@ void main() {
withPostgresServer('limit pool connections', (server) {
test('can limit concurrent connections', () async {
final pool = PgPool(
[await server.endpoint],
[await server.endpoint()],
sessionSettings: _sessionSettings,
poolSettings: const PgPoolSettings(maxConnectionCount: 2),
);
Expand Down
Loading

0 comments on commit 95663fd

Please sign in to comment.