Skip to content

Commit

Permalink
First implementation of statement cancellation through new connection. (
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 13, 2024
1 parent de56b3f commit 80b9078
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 15 deletions.
30 changes: 30 additions & 0 deletions lib/src/messages/client_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,33 @@ class HotStandbyFeedbackMessage extends ClientMessage
buffer.writeUint32(epochCatalogXminXid);
}
}

class CancelRequestMessage extends ClientMessage {
/// The process ID of the target backend.
final int processId;

/// The secret key for the target backend.
final int secretKey;

CancelRequestMessage({
required this.processId,
required this.secretKey,
});

@override
void applyToBuffer(PgByteDataWriter buffer) {
// Length of message contents in bytes, including self.
buffer.writeInt32(16);
// The cancel request code. The value is chosen to contain 1234 in the most significant 16 bits,
// and 5678 in the least significant 16 bits.
// (To avoid confusion, this code must not be the same as any protocol version number.)
buffer.writeInt32(80877102);
buffer.writeUint32(processId);
buffer.writeUint32(secretKey);
}

@override
String toString() {
return 'CancelRequestMessage: $processId $secretKey';
}
}
56 changes: 41 additions & 15 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
incomingBytesTransformer: incomingBytesTransformer,
);

if (_debugLog) {
channel = channel.transform(StreamChannelTransformer(
StreamTransformer.fromHandlers(
handleData: (msg, sink) {
print('[in] $msg');
sink.add(msg);
},
),
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
print('[out] $msg');
sink.add(msg);
}),
));
}
channel = _debugChannel(channel);

if (settings.transformer != null) {
channel = channel.transform(settings.transformer!);
Expand All @@ -256,6 +243,25 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
return connection;
}

static StreamChannel<Message> _debugChannel(StreamChannel<Message> channel) {
if (!_debugLog) {
return channel;
}
final hash = channel.hashCode.abs().toRadixString(16);
return channel.transform(StreamChannelTransformer(
StreamTransformer.fromHandlers(
handleData: (msg, sink) {
print('[$hash][in] $msg');
sink.add(msg);
},
),
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
print('[$hash][out] $msg');
sink.add(msg);
}),
));
}

static Future<(StreamChannel<Message>, bool)> _connect(
Endpoint endpoint,
ResolvedConnectionSettings settings, {
Expand Down Expand Up @@ -380,6 +386,7 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
/// Whether [_channel] is backed by a TLS connection.
final bool _channelIsSecure;
late final StreamSubscription<Message> _serverMessages;
BackendKeyMessage? _backendKeyMessage;
bool _isClosing = false;
bool _socketIsBroken = false;

Expand Down Expand Up @@ -461,7 +468,9 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {

if (message is ParameterStatusMessage) {
info.setParameter(message.name, message.value);
} else if (message is BackendKeyMessage || message is NoticeMessage) {
} else if (message is BackendKeyMessage) {
_backendKeyMessage = message;
} else if (message is NoticeMessage) {
// ignore for now
} else if (message is NotificationResponseMessage) {
_channels.deliverNotification(message);
Expand Down Expand Up @@ -597,6 +606,23 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
socketIsBroken: cause?.willAbortConnection ?? false,
);
}

@internal
Future<void> cancelPendingStatement() async {
var (channel, _) =
await _connect(_endpoint, _settings, codecContext: codecContext);
if (_backendKeyMessage == null) {
throw PgException(
'Unable to cancel pending statement: no backend key available.');
}
channel = _debugChannel(channel);
channel.sink.add(CancelRequestMessage(
processId: _backendKeyMessage!.processId,
secretKey: _backendKeyMessage!.secretKey,
));
// Waiting for the server to close connection.
await channel.stream.listen((_) {}).asFuture();
}
}

class _PreparedStatement extends Statement {
Expand Down
10 changes: 10 additions & 0 deletions test/timeout_test.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';

import 'package:postgres/postgres.dart';
import 'package:postgres/src/v3/connection.dart';
import 'package:test/test.dart';

import 'docker.dart';
Expand All @@ -18,6 +19,15 @@ void main() {
await conn.close();
});

test('Cancel current statement through a new connection', () async {
final f = conn.execute('SELECT pg_sleep(2);');
await (conn as PgConnectionImplementation).cancelPendingStatement();
await expectLater(f, throwsA(isA<ServerException>()));
// connection is still usable
final rs = await conn.execute('SELECT 1;');
expect(rs[0][0], 1);
});

test('Timeout fires during transaction rolls ack transaction', () async {
try {
await conn.runTx((ctx) async {
Expand Down

0 comments on commit 80b9078

Please sign in to comment.