Skip to content

Commit

Permalink
Timeout triggers current statement cancellation (using a new connecti…
Browse files Browse the repository at this point in the history
…on). (#380)
  • Loading branch information
isoos authored Sep 13, 2024
1 parent 0760253 commit c212668
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- `DatabaseInfo` tracks information about relations and oids (currently limited to `RelationMessage` caching).
- **Behaviour / soft-breaking changes**:
- Preparing/executing a stamement on the main connection while in a `runTx` callback will throw an exception.
- Setting `timeout` will try to actively cancel the current statement using a new connection.
- Deprecated `TupleDataColumn.data`, use `.value` instead (for binary protocol messages).
- Deprecated some logical replication message parsing method.
- Removed `@internal`-annotated methods from the public API of `ServerException` and `Severity`.
Expand Down
10 changes: 9 additions & 1 deletion lib/src/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,21 @@ ServerException buildExceptionFromErrorFields(List<ErrorField> errorFields) {
);
}

PgException transformServerException(ServerException ex) {
PgException transformServerException(
ServerException ex, {
bool timeoutTriggered = false,
}) {
if (ex.code == '57014' &&
ex.message == 'canceling statement due to statement timeout') {
return _PgTimeoutException(
['${ex.code}:', ex.message, ex.trace].whereType<String>().join(' '),
);
}
if (ex.code == '57014' && timeoutTriggered) {
return _PgTimeoutException(
['${ex.code}:', ex.message, ex.trace].whereType<String>().join(' '),
);
}
return ex;
}

Expand Down
48 changes: 36 additions & 12 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,9 @@ abstract class _PgSessionBase implements Session {
ignoreRows,
);
try {
await querySubscription.asFuture().optionalTimeout(timeout);
return Result(
rows: items,
affectedRows: await querySubscription.affectedRows,
schema: await querySubscription.schema,
return await querySubscription._waitForResult(
items: items,
timeout: timeout,
);
} finally {
await querySubscription.cancel();
Expand Down Expand Up @@ -260,6 +258,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
),
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
print('[$hash][out] $msg');
print('[out] $msg');
sink.add(msg);
}),
));
Expand Down Expand Up @@ -655,16 +654,13 @@ class _PreparedStatement extends Statement {
final items = <ResultRow>[];
final subscription = bind(parameters).listen(items.add);
try {
await subscription.asFuture().optionalTimeout(timeout);
return await (subscription as _PgResultStreamSubscription)._waitForResult(
items: items,
timeout: timeout,
);
} finally {
await subscription.cancel();
}

return Result(
rows: items,
affectedRows: await subscription.affectedRows,
schema: await subscription.schema,
);
}

@override
Expand Down Expand Up @@ -892,6 +888,34 @@ class _PgResultStreamSubscription
}
}

Future<Result> _waitForResult({
required List<ResultRow> items,
required Duration? timeout,
}) async {
bool timeoutTriggered = false;
final cancelTimer = timeout == null
? null
: Timer(timeout, () async {
timeoutTriggered = true;
await connection.cancelPendingStatement();
});
try {
await asFuture();
return Result(
rows: items,
affectedRows: await affectedRows,
schema: await schema,
);
} on ServerException catch (e) {
if (timeoutTriggered) {
throw transformServerException(e, timeoutTriggered: timeoutTriggered);
}
rethrow;
} finally {
cancelTimer?.cancel();
}
}

// Forwarding subscription interface to regular stream subscription from
// controller

Expand Down
44 changes: 29 additions & 15 deletions test/timeout_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,33 @@ void main() {

// Note: to fix this, we may consider cancelling the currently running statements:
// https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS
// withPostgresServer('timeout race conditions', (server) {
// test('two transactions for update', () async {
// final c1 = await server.newConnection();
// final c2 = await server.newConnection();
// await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);');
// await c1.execute('INSERT INTO t (id) values (1);');
// await c1.execute('BEGIN');
// await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE');
// await c2.execute('BEGIN');
// await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE',
// timeout: Duration(seconds: 1));
// await c1.execute('ROLLBACK');
// await c2.execute('ROLLBACK');
// });
// });
withPostgresServer('timeout race conditions', (server) {
setUp(() async {
final c1 = await server.newConnection();
await c1.execute('CREATE TABLE t (id INT PRIMARY KEY);');
await c1.execute('INSERT INTO t (id) values (1);');
});

test('two transactions for update', () async {
for (final qm in QueryMode.values) {
final c1 = await server.newConnection();
final c2 = await server.newConnection(queryMode: qm);
await c1.execute('BEGIN');
await c1.execute('SELECT * FROM t WHERE id=1 FOR UPDATE');
await c2.execute('BEGIN');
try {
await c2.execute('SELECT * FROM t WHERE id=1 FOR UPDATE',
timeout: Duration(seconds: 1));
fail('unreachable');
} on TimeoutException catch (_) {
// ignore
}
await c1.execute('ROLLBACK');
await c2.execute('ROLLBACK');

await c1.execute('SELECT 1');
await c2.execute('SELECT 1');
}
});
});
}

0 comments on commit c212668

Please sign in to comment.