Skip to content

Commit

Permalink
Make more tests work through delegate (#153)
Browse files Browse the repository at this point in the history
* Fix `connect_test.dart` with V3

* Run more tests with v3 delegate

* Fix underlying issue for notification test
  • Loading branch information
simolus3 authored Oct 11, 2023
1 parent bc43e22 commit 9f79852
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 105 deletions.
9 changes: 9 additions & 0 deletions lib/postgres_v3_experimental.dart
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,20 @@ final class PgResultColumn {
}

abstract class PgChannels {
/// A stream of all notifications delivered from the server.
///
/// This stream can be used to listen to notifications manually subscribed to.
/// The `[]` operator on [PgChannels] can be used to register subscriptions to
/// notifications only when a stream is being listened to.
Stream<PgNotification> get all;

Stream<String> operator [](String channel);
Future<void> notify(String channel, [String? payload]);
Future<void> cancelAll();
}

typedef PgNotification = ({int processId, String channel, String payload});

final class PgEndpoint {
final String host;
final int port;
Expand Down
247 changes: 150 additions & 97 deletions lib/src/v2_v3_delegate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,121 @@ import 'package:postgres/src/execution_context.dart';
import 'package:postgres/src/replication.dart';
import 'package:postgres/src/server_messages.dart';

class V3BackedPostgreSQLConnection implements PostgreSQLConnection {
mixin _DelegatingContext implements PostgreSQLExecutionContext {
PgSession? get _session;

@override
void cancelTransaction({String? reason}) {
throw UnimplementedError();
}

@override
Future<int> execute(String fmtString,
{Map<String, dynamic>? substitutionValues, int? timeoutInSeconds}) async {
if (_session case final PgSession session) {
final rs = await session.execute(
PgSql.map(fmtString),
parameters: substitutionValues,
ignoreRows: true,
);
return rs.affectedRows;
} else {
throw PostgreSQLException(
'Attempting to execute query, but connection is not open.');
}
}

@override
Future<List<Map<String, Map<String, dynamic>>>> mappedResultsQuery(
String fmtString,
{Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds}) async {
final rs = await query(
fmtString,
substitutionValues: substitutionValues,
allowReuse: allowReuse ?? false,
timeoutInSeconds: timeoutInSeconds,
);

// Load table names which are not returned by Postgres reliably. The v2
// implementation used to do this with its own cache, the v3 API doesn't do
// it at all and defers that logic to the user.
final raw = rs._result;
final tableOids = raw.schema.columns
.map((c) => c.tableOid)
.where((id) => id != null && id > 0)
.toList()
..sort();

if (tableOids.isEmpty) {
return rs.map((row) => row.toTableColumnMap()).toList();
}

final oidToName = <int, String>{};
final oidResults = await query(
"SELECT oid::int8, relname FROM pg_class WHERE relkind='r' AND oid = ANY(@ids:_int8::oid[])",
substitutionValues: {'ids': tableOids},
);
for (final row in oidResults) {
oidToName[row[0]] = row[1];
}

final results = <Map<String, Map<String, dynamic>>>[];
for (final row in raw) {
final tables = <String, Map<String, dynamic>>{};

for (final (i, col) in row.schema.columns.indexed) {
final tableName = oidToName[col.tableOid];
final columnName = col.columnName;

if (tableName != null && columnName != null) {
tables.putIfAbsent(tableName, () => {})[columnName] = row[i];
}
}

results.add(tables);
}

return results;
}

@override
Future<_PostgreSQLResult> query(String fmtString,
{Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds,
bool? useSimpleQueryProtocol}) async {
if (_session case final PgSession session) {
final rs = await session.execute(
PgSql.map(fmtString),
parameters: substitutionValues,
queryMode: (useSimpleQueryProtocol ?? false) ? QueryMode.simple : null,
);
return _PostgreSQLResult(rs, rs.map((e) => _PostgreSQLResultRow(e, e)));
} else {
throw PostgreSQLException(
'Attempting to execute query, but connection is not open.');
}
}

@override
int get queueSize => throw UnimplementedError();
}

class V3BackedPostgreSQLConnection
with _DelegatingContext
implements PostgreSQLConnection {
final PgEndpoint _endpoint;
final PgSessionSettings _sessionSettings;
PgConnection? _connection;
bool _hasConnectedPreviously = false;

V3BackedPostgreSQLConnection(this._endpoint, this._sessionSettings);

@override
PgSession? get _session => _connection;

@override
void addMessage(ClientMessage message) {
throw UnimplementedError();
Expand All @@ -25,11 +133,6 @@ class V3BackedPostgreSQLConnection implements PostgreSQLConnection {
@override
bool get allowClearTextPassword => throw UnimplementedError();

@override
void cancelTransaction({String? reason}) {
throw UnimplementedError();
}

@override
Future close() async {
await _connection?.close();
Expand All @@ -51,23 +154,23 @@ class V3BackedPostgreSQLConnection implements PostgreSQLConnection {
@override
bool get isUnixSocket => throw UnimplementedError();

@override
Future<List<Map<String, Map<String, dynamic>>>> mappedResultsQuery(
String fmtString,
{Map<String, dynamic>? substitutionValues = const {},
bool? allowReuse,
int? timeoutInSeconds}) {
throw UnimplementedError();
}

@override
Stream<ServerMessage> get messages => throw UnimplementedError();

@override
Stream<Notification> get notifications => throw UnimplementedError();
Stream<Notification> get notifications =>
_connection!.channels.all.map((event) {
return Notification(event.processId, event.channel, event.payload);
});

@override
Future open() async {
if (_hasConnectedPreviously) {
throw PostgreSQLException(
'Attempting to reopen a closed connection. Create a instance instead.');
}

_hasConnectedPreviously = true;
_connection = await PgConnection.open(
_endpoint,
sessionSettings: _sessionSettings,
Expand All @@ -83,36 +186,6 @@ class V3BackedPostgreSQLConnection implements PostgreSQLConnection {
@override
int get processID => throw UnimplementedError();

@override
Future<int> execute(
String fmtString, {
Map<String, dynamic>? substitutionValues = const {},
int? timeoutInSeconds,
}) async {
return _PostgreSQLExecutionContext(_connection!).execute(
fmtString,
substitutionValues: substitutionValues,
timeoutInSeconds: timeoutInSeconds,
);
}

@override
Future<PostgreSQLResult> query(
String fmtString, {
Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds,
bool? useSimpleQueryProtocol,
}) async {
return await _PostgreSQLExecutionContext(_connection!).query(
fmtString,
substitutionValues: substitutionValues,
allowReuse: allowReuse,
timeoutInSeconds: timeoutInSeconds,
useSimpleQueryProtocol: useSimpleQueryProtocol,
);
}

@override
int get queryTimeoutInSeconds => throw UnimplementedError();

Expand All @@ -139,9 +212,14 @@ class V3BackedPostgreSQLConnection implements PostgreSQLConnection {
Future Function(PostgreSQLExecutionContext connection) queryBlock, {
int? commitTimeoutInSeconds,
}) async {
return await _connection!.runTx((session) async {
return await queryBlock(_PostgreSQLExecutionContext(session));
});
if (_connection case final PgConnection conn) {
return await conn.runTx((session) async {
return await queryBlock(_PostgreSQLExecutionContext(session));
});
} else {
throw PostgreSQLException(
'Attempting to execute query, but connection is not open.');
}
}

@override
Expand Down Expand Up @@ -200,63 +278,38 @@ class _PostgreSQLResultRow extends UnmodifiableListView

@override
Map<String, dynamic> toColumnMap() {
throw UnimplementedError();
final map = <String, dynamic>{};
for (final (i, col) in _row.schema.columns.indexed) {
if (col.columnName case final String name) {
map[name] = _row[i];
}
}

return map;
}

@override
Map<String, Map<String, dynamic>> toTableColumnMap() {
throw UnimplementedError();
}
}

class _PostgreSQLExecutionContext implements PostgreSQLExecutionContext {
final PgSession _session;
_PostgreSQLExecutionContext(this._session);
final tables = <String, Map<String, dynamic>>{};

@override
void cancelTransaction({String? reason}) {
throw UnimplementedError();
}
for (final (i, col) in _row.schema.columns.indexed) {
final tableName = col.tableName;
final columnName = col.columnName;

@override
Future<int> execute(
String fmtString, {
Map<String, dynamic>? substitutionValues,
int? timeoutInSeconds,
}) async {
final rs = await _session.execute(
PgSql.map(fmtString),
parameters: substitutionValues,
ignoreRows: true,
);
return rs.affectedRows;
}
if (tableName != null && columnName != null) {
tables.putIfAbsent(tableName, () => {})[columnName] = _row[i];
}
}

@override
Future<List<Map<String, Map<String, dynamic>>>> mappedResultsQuery(
String fmtString,
{Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds}) {
throw UnimplementedError();
return tables;
}
}

class _PostgreSQLExecutionContext
with _DelegatingContext
implements PostgreSQLExecutionContext {
@override
Future<PostgreSQLResult> query(
String fmtString, {
Map<String, dynamic>? substitutionValues,
bool? allowReuse,
int? timeoutInSeconds,
bool? useSimpleQueryProtocol,
}) async {
final rs = await _session.execute(
PgSql.map(fmtString),
parameters: substitutionValues,
queryMode: (useSimpleQueryProtocol ?? false) ? QueryMode.simple : null,
);
return _PostgreSQLResult(rs, rs.map((e) => _PostgreSQLResultRow(e, e)));
}
final PgSession _session;

@override
int get queueSize => throw UnimplementedError();
_PostgreSQLExecutionContext(this._session);
}
14 changes: 12 additions & 2 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ class _PgResultStreamSubscription
final bool ignoreRows;

final Completer<int> _affectedRows = Completer();
int _affectedRowsSoFar = 0;
final Completer<PgResultSchema> _schema = Completer();
final Completer<void> _done = Completer();
PgResultSchema? _resultSchema;
Expand Down Expand Up @@ -580,7 +581,7 @@ class _PgResultStreamSubscription
// after the query is done, even if we didn't get a row description
// message.
if (!_affectedRows.isCompleted) {
_affectedRows.complete(0);
_affectedRows.complete(_affectedRowsSoFar);
}
if (!_schema.isCompleted) {
_schema.complete(PgResultSchema(const []));
Expand Down Expand Up @@ -646,7 +647,10 @@ class _PgResultStreamSubscription
_controller.add(row);
}
case CommandCompleteMessage():
_affectedRows.complete(message.rowsAffected);
// We can't complete _affectedRows directly after receiving the message
// since, if multiple statements are running in a single SQL string,
// we'll get this more than once.
_affectedRowsSoFar += message.rowsAffected;
case ReadyForQueryMessage():
await _completeQuery();
case CopyBothResponseMessage():
Expand Down Expand Up @@ -707,13 +711,17 @@ class _Channels implements PgChannels {
final PgConnectionImplementation _connection;

final Map<String, List<MultiStreamController<String>>> _activeListeners = {};
final StreamController<PgNotification> _all = StreamController.broadcast();

// We are using the pg_notify function in a prepared select statement to
// efficiently implement [notify].
Completer<PgStatement>? _notifyStatement;

_Channels(this._connection);

@override
Stream<PgNotification> get all => _all.stream;

@override
Stream<String> operator [](String channel) {
return Stream.multi(
Expand Down Expand Up @@ -760,6 +768,8 @@ class _Channels implements PgChannels {
}

void deliverNotification(NotificationResponseMessage msg) {
_all.add(
(processId: msg.processID, channel: msg.channel, payload: msg.payload));
final listeners = _activeListeners[msg.channel] ?? const [];

for (final listener in listeners) {
Expand Down
Loading

0 comments on commit 9f79852

Please sign in to comment.