Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] configurable encoding + optimizing json access #132

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/postgres.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
library postgres;

export 'src/client_encoding.dart';
export 'src/connection.dart';
export 'src/execution_context.dart';
export 'src/replication.dart' show ReplicationMode;
Expand Down
22 changes: 10 additions & 12 deletions lib/postgres_v3_experimental.dart
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ abstract class PgStatement {
Future<PgResult> run([
Object? /* List<Object?|PgTypedParameter> | Map<String, Object?|PgTypedParameter> */
parameters,
]
) async {
]) async {
final items = <PgResultRow>[];
final subscription = bind(parameters).listen(items.add);
await subscription.asFuture();
Expand Down Expand Up @@ -362,25 +361,24 @@ final class PgSessionSettings {

/// The replication mode for connecting in streaming replication mode.
///
/// The default value is [ReplicationMode.none]. But when the value is set to
/// [ReplicationMode.physical] or [ReplicationMode.logical], the connection
/// The default value is [ReplicationMode.none]. But when the value is set to
/// [ReplicationMode.physical] or [ReplicationMode.logical], the connection
/// will be established in replication mode.
///
/// Please note, while in replication mode, only the Simple Query Protcol can
/// be used to execute queries.
/// be used to execute queries.
///
/// For more info, see [Streaming Replication Protocol]
///
/// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html
final ReplicationMode replicationMode;

PgSessionSettings({
this.connectTimeout,
this.timeZone,
this.onBadSslCertificate,
this.transformer,
this.replicationMode = ReplicationMode.none
});
PgSessionSettings(
{this.connectTimeout,
this.timeZone,
this.onBadSslCertificate,
this.transformer,
this.replicationMode = ReplicationMode.none});
}

final class PgPoolSettings {
Expand Down
42 changes: 26 additions & 16 deletions lib/src/binary_codec.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import 'dart:convert';
import 'dart:convert' hide json, utf8;
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/v3/types.dart';

import 'client_encoding.dart';
import 'types.dart';

final _bool0 = Uint8List(1)..[0] = 0;
Expand Down Expand Up @@ -34,8 +35,12 @@ final _trailingZerosRegExp = RegExp(r'0+$');
class PostgresBinaryEncoder<T extends Object>
extends Converter<T?, Uint8List?> {
final PgDataType<T> _dataType;
final ClientEncoding _clientEncoding;

const PostgresBinaryEncoder(this._dataType);
PostgresBinaryEncoder(
this._dataType, {
ClientEncoding? clientEncoding,
}) : _clientEncoding = clientEncoding ?? ClientEncoding.utf8;

@override
Uint8List? convert(Object? input) {
Expand Down Expand Up @@ -93,7 +98,7 @@ class PostgresBinaryEncoder<T extends Object>
case PgDataType.varChar:
{
if (input is String) {
return castBytes(utf8.encode(input));
return _clientEncoding.encodeString(input);
}
throw FormatException(
'Invalid type for parameter value. Expected: String Got: ${input.runtimeType}');
Expand Down Expand Up @@ -181,15 +186,15 @@ class PostgresBinaryEncoder<T extends Object>

case PgDataType.jsonb:
{
final jsonBytes = utf8.encode(json.encode(input));
final jsonBytes = _clientEncoding.encodeJson(input);
final writer = ByteDataWriter(bufferLength: jsonBytes.length + 1);
writer.writeUint8(1);
writer.write(jsonBytes);
return writer.toBytes();
}

case PgDataType.json:
return castBytes(utf8.encode(json.encode(input)));
return _clientEncoding.encodeJson(input);

case PgDataType.byteArray:
{
Expand Down Expand Up @@ -292,7 +297,7 @@ class PostgresBinaryEncoder<T extends Object>
case PgDataType.varCharArray:
{
if (input is List<String>) {
final bytesArray = input.map((v) => utf8.encode(v));
final bytesArray = input.map(_clientEncoding.encodeString);
return writeListBytes<List<int>>(bytesArray, 1043,
(item) => item.length, (writer, item) => writer.write(item));
}
Expand All @@ -303,7 +308,7 @@ class PostgresBinaryEncoder<T extends Object>
case PgDataType.textArray:
{
if (input is List<String>) {
final bytesArray = input.map((v) => utf8.encode(v));
final bytesArray = input.map(_clientEncoding.encodeString);
return writeListBytes<List<int>>(bytesArray, 25,
(item) => item.length, (writer, item) => writer.write(item));
}
Expand All @@ -324,7 +329,7 @@ class PostgresBinaryEncoder<T extends Object>
case PgDataType.jsonbArray:
{
if (input is List<Object>) {
final objectsArray = input.map((v) => utf8.encode(json.encode(v)));
final objectsArray = input.map(_clientEncoding.encodeJson);
return writeListBytes<List<int>>(
objectsArray, 3802, (item) => item.length + 1, (writer, item) {
writer.writeUint8(1);
Expand Down Expand Up @@ -439,9 +444,13 @@ class PostgresBinaryEncoder<T extends Object>
}

class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
PostgresBinaryDecoder(this.type);

final PgDataType type;
final ClientEncoding _clientEncoding;

PostgresBinaryDecoder(
this.type, {
ClientEncoding? clientEncoding,
}) : _clientEncoding = clientEncoding ?? ClientEncoding.utf8;

@override
T? convert(Uint8List? input) {
Expand All @@ -456,7 +465,7 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
case PostgreSQLDataType.name:
case PostgreSQLDataType.text:
case PostgreSQLDataType.varChar:
return utf8.decode(input) as T;
return _clientEncoding.decodeString(input) as T;
case PostgreSQLDataType.boolean:
return (buffer.getInt8(0) != 0) as T;
case PostgreSQLDataType.smallInteger:
Expand Down Expand Up @@ -493,11 +502,11 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
// Removes version which is first character and currently always '1'
final bytes = input.buffer
.asUint8List(input.offsetInBytes + 1, input.lengthInBytes - 1);
return json.decode(utf8.decode(bytes)) as T;
return _clientEncoding.decodeJson(bytes) as T;
}

case PostgreSQLDataType.json:
return json.decode(utf8.decode(input)) as T;
return _clientEncoding.decodeJson(input) as T;

case PostgreSQLDataType.byteArray:
return input as T;
Expand Down Expand Up @@ -545,7 +554,8 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
case PostgreSQLDataType.varCharArray:
case PostgreSQLDataType.textArray:
return readListBytes<String>(input, (reader, length) {
return utf8.decode(length > 0 ? reader.read(length) : []);
return _clientEncoding
.decodeString(length > 0 ? reader.read(length) : []);
}) as T;

case PostgreSQLDataType.doubleArray:
Expand All @@ -556,7 +566,7 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
return readListBytes<dynamic>(input, (reader, length) {
reader.read(1);
final bytes = reader.read(length - 1);
return json.decode(utf8.decode(bytes));
return _clientEncoding.decodeJson(bytes);
}) as T;

case PostgreSQLDataType.unknownType:
Expand All @@ -566,7 +576,7 @@ class PostgresBinaryDecoder<T> extends Converter<Uint8List?, T?> {
// we just return the bytes and let the caller figure out what to
// do with it.
try {
return utf8.decode(input) as T;
return _clientEncoding.decodeString(input) as T;
} catch (_) {
return input as T;
}
Expand Down
23 changes: 23 additions & 0 deletions lib/src/client_encoding.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import 'dart:convert' as c;
import 'dart:typed_data';

import 'package:buffer/buffer.dart';

/// Describes the client encoding used by a connection.
class ClientEncoding {
/// The Dart [Encoding] object.
final c.Encoding encoding;

ClientEncoding(this.encoding);

static final utf8 = ClientEncoding(c.utf8);

/// The optimized (fused encoding to/from JSON and bytes)
late final _jsonEncoding = c.json.fuse(encoding);

Uint8List encodeString(String input) => castBytes(encoding.encode(input));
Uint8List encodeJson(Object? input) => castBytes(_jsonEncoding.encode(input));

String decodeString(List<int> encoded) => encoding.decode(encoded);
Object? decodeJson(List<int> encoded) => _jsonEncoding.decode(encoded);
}
8 changes: 5 additions & 3 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:buffer/buffer.dart';
import 'package:meta/meta.dart';
import 'auth/auth.dart';

import 'client_encoding.dart';
import 'client_messages.dart';
import 'execution_context.dart';
import 'message_window.dart';
Expand All @@ -19,9 +20,7 @@ import 'replication.dart';
import 'server_messages.dart';

part 'connection_fsm.dart';

part 'transaction_proxy.dart';

part 'exceptions.dart';

/// Instances of this class connect to and communicate with a PostgreSQL database.
Expand Down Expand Up @@ -117,6 +116,9 @@ class PostgreSQLConnection extends Object
/// [Streaming Replication Protocol]: https://www.postgresql.org/docs/current/protocol-replication.html
final ReplicationMode replicationMode;

@internal
final clientEncoding = ClientEncoding.utf8;

/// Stream of notification from the database.
///
/// Listen to this [Stream] to receive events from PostgreSQL NOTIFY commands.
Expand Down Expand Up @@ -317,7 +319,7 @@ class PostgreSQLConnection extends Object
// and the state node managing delivering data to the query no longer exists. Therefore,
// as soon as a close occurs, we detach the data stream from anything that actually does
// anything with that data.
_framer.addBytes(castBytes(bytes));
_framer.addBytes(castBytes(bytes), clientEncoding);
while (_framer.hasMessage) {
final msg = _framer.popMessage();
try {
Expand Down
Loading