Skip to content

Commit

Permalink
Parsing server messages with provided encoding. (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Sep 29, 2023
1 parent c19af6c commit 13c77cd
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 205 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 2.6.3

- Refactor: replaced `UTF8BackedString` with generic encoding (not complete).
- Breaking change in `package:postgres/messages.dart`: default constructors made internal, parsing is done with more efficient reader.

## 2.6.2

Expand Down
18 changes: 18 additions & 0 deletions lib/src/buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,21 @@ class PgByteDataWriter extends ByteDataWriter {
writeInt8(0);
}
}

const _emptyString = '';

class PgByteDataReader extends ByteDataReader {
final Encoding encoding;

PgByteDataReader({
this.encoding = utf8,
});

String readNullTerminatedString() {
final bytes = readUntilTerminatingByte(0);
if (bytes.isEmpty) {
return _emptyString;
}
return encoding.decode(bytes);
}
}
128 changes: 44 additions & 84 deletions lib/src/logical_replication_messages.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:buffer/buffer.dart';
import 'package:postgres/src/buffer.dart';

import 'binary_codec.dart';
import 'server_messages.dart';
Expand Down Expand Up @@ -48,56 +48,53 @@ class XLogDataLogicalMessage implements XLogDataMessage {
/// Tries to check if the [bytesList] is a [LogicalReplicationMessage]. If so,
/// [LogicalReplicationMessage] is returned, otherwise `null` is returned.
LogicalReplicationMessage? tryParseLogicalReplicationMessage(
Uint8List bytesList) {
PgByteDataReader reader, int length) {
// the first byte is the msg type
final firstByte = bytesList.first;
final firstByte = reader.readUint8();
final msgType = LogicalReplicationMessageTypes.fromByte(firstByte);
// remaining bytes are the data
final bytes = bytesList.sublist(1);
switch (msgType) {
case LogicalReplicationMessageTypes.Begin:
return BeginMessage(bytes);
return BeginMessage._parse(reader);

case LogicalReplicationMessageTypes.Commit:
return CommitMessage(bytes);
return CommitMessage._parse(reader);

case LogicalReplicationMessageTypes.Origin:
return OriginMessage(bytes);
return OriginMessage._parse(reader);

case LogicalReplicationMessageTypes.Relation:
return RelationMessage(bytes);
return RelationMessage._parse(reader);

case LogicalReplicationMessageTypes.Type:
return TypeMessage(bytes);
return TypeMessage._parse(reader);

case LogicalReplicationMessageTypes.Insert:
return InsertMessage(bytes);
return InsertMessage._parse(reader);

case LogicalReplicationMessageTypes.Update:
return UpdateMessage(bytes);
return UpdateMessage._parse(reader);

case LogicalReplicationMessageTypes.Delete:
return DeleteMessage(bytes);
return DeleteMessage._parse(reader);

case LogicalReplicationMessageTypes.Truncate:
return TruncateMessage(bytes);
return TruncateMessage._parse(reader);

case LogicalReplicationMessageTypes.Unsupported:
// note this needs the full set of bytes unlike other cases
return _tryParseJsonMessage(bytesList);
}
}

LogicalReplicationMessage? _tryParseJsonMessage(Uint8List bytes) {
// wal2json messages starts with `{` as the first byte
if (bytes.first == '{'.codeUnits.first) {
try {
return JsonMessage(utf8.decode(bytes));
} catch (e) {
// wal2json messages starts with `{` as the first byte
if (firstByte == '{'.codeUnits.single) {
// note this needs the full set of bytes unlike other cases
final bb = BytesBuffer();
bb.addByte(firstByte);
bb.add(reader.read(length - 1));
try {
return JsonMessage(reader.encoding.decode(bb.toBytes()));
} catch (_) {
// ignore
}
}
return null;
}
}
return null;
}

enum LogicalReplicationMessageTypes {
Expand Down Expand Up @@ -165,8 +162,7 @@ class BeginMessage implements LogicalReplicationMessage {
/// The transaction id
late final int xid;

BeginMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
BeginMessage._parse(PgByteDataReader reader) {
// reading order matters
finalLSN = reader.readLSN();
commitTime = reader.readTime();
Expand Down Expand Up @@ -195,8 +191,7 @@ class CommitMessage implements LogicalReplicationMessage {
/// The commit timestamp of the transaction.
late final DateTime commitTime;

CommitMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
CommitMessage._parse(PgByteDataReader reader) {
// reading order matters
flags = reader.readUint8();
commitLSN = reader.readLSN();
Expand All @@ -219,11 +214,10 @@ class OriginMessage implements LogicalReplicationMessage {

late final String name;

OriginMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
OriginMessage._parse(PgByteDataReader reader) {
// reading order matters
commitLSN = reader.readLSN();
name = reader.decodeString();
name = reader.readNullTerminatedString();
}

@override
Expand Down Expand Up @@ -266,19 +260,18 @@ class RelationMessage implements LogicalReplicationMessage {
late final int columnNum;
late final columns = <RelationMessageColumn>[];

RelationMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
RelationMessage._parse(PgByteDataReader reader) {
// reading order matters
relationID = reader.readUint32();
nameSpace = reader.decodeString();
relationName = reader.decodeString();
nameSpace = reader.readNullTerminatedString();
relationName = reader.readNullTerminatedString();
replicaIdentity = reader.readUint8();
columnNum = reader.readUint16();

for (var i = 0; i < columnNum; i++) {
// reading order matters
final flags = reader.readUint8();
final name = reader.decodeString();
final name = reader.readNullTerminatedString();
final dataType = reader.readUint32();
final typeModifier = reader.readUint32();
columns.add(
Expand Down Expand Up @@ -310,12 +303,11 @@ class TypeMessage implements LogicalReplicationMessage {

late final String name;

TypeMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
TypeMessage._parse(PgByteDataReader reader) {
// reading order matters
dataType = reader.readUint32();
nameSpace = reader.decodeString();
name = reader.decodeString();
nameSpace = reader.readNullTerminatedString();
name = reader.readNullTerminatedString();
}

@override
Expand Down Expand Up @@ -380,31 +372,31 @@ class TupleData {
/// TupleData does not consume the entire bytes
///
/// It'll read until the types are generated.
TupleData(ByteDataReader reader) {
TupleData(PgByteDataReader reader) {
columnNum = reader.readUint16();
for (var i = 0; i < columnNum; i++) {
// reading order matters
final dataType = reader.readUint8();
final tupleDataType = TupleDataType.fromByte(dataType);
late final int length;
late final Uint8List data;
late final String data;
switch (tupleDataType) {
case TupleDataType.textType:
case TupleDataType.binaryType:
length = reader.readUint32();
data = reader.read(length);
data = reader.encoding.decode(reader.read(length));
break;
case TupleDataType.nullType:
case TupleDataType.toastType:
length = 0;
data = Uint8List(0);
data = '';
break;
}
columns.add(
TupleDataColumn(
dataType: dataType,
length: length,
data: utf8.decode(data),
data: data,
),
);
}
Expand All @@ -422,8 +414,7 @@ class InsertMessage implements LogicalReplicationMessage {
late final int relationID;
late final TupleData tuple;

InsertMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
InsertMessage._parse(PgByteDataReader reader) {
relationID = reader.readUint32();
final tupleType = reader.readUint8();
if (tupleType != 'N'.codeUnitAt(0)) {
Expand Down Expand Up @@ -483,8 +474,7 @@ class UpdateMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData? newTuple;

UpdateMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
UpdateMessage._parse(PgByteDataReader reader) {
// reading order matters
relationID = reader.readUint32();
var tupleType = UpdateMessageTuple.fromByte(reader.readUint8());
Expand Down Expand Up @@ -556,8 +546,7 @@ class DeleteMessage implements LogicalReplicationMessage {
/// Byte1('N'): Identifies the following TupleData message as a new tuple.
late final TupleData oldTuple;

DeleteMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
DeleteMessage._parse(PgByteDataReader reader) {
relationID = reader.readUint32();
oldTupleType = DeleteMessageTuple.fromByte(reader.readUint8());

Expand Down Expand Up @@ -601,8 +590,7 @@ class TruncateMessage implements LogicalReplicationMessage {

final relationIds = <int>[];

TruncateMessage(Uint8List bytes) {
final reader = ByteDataReader()..add(bytes);
TruncateMessage._parse(PgByteDataReader reader) {
relationNum = reader.readUint32();
option = TruncateOptions.fromValue(reader.readUint8());
for (var i = 0; i < relationNum; i++) {
Expand All @@ -625,32 +613,4 @@ extension on ByteDataReader {
DateTime readTime() {
return dateTimeFromMicrosecondsSinceY2k(readUint64());
}

/// Decodes a string from reader current offset
///
/// String type definition: https://www.postgresql.org/docs/current/protocol-message-types.html
/// String(s)
/// A null-terminated string (C-style string). There is no specific length limitation on strings.
/// If s is specified it is the exact value that will appear, otherwise the value is variable.
/// Eg. String, String("user").
///
/// If there is no null byte, return empty string.
String decodeString() {
var foundNullByte = false;
final string = <int>[];
while (remainingLength > 0) {
final byte = readUint8();
if (byte == 0) {
foundNullByte = true;
break;
}
string.add(byte);
}

if (!foundNullByte) {
return '';
}

return utf8.decode(string);
}
}
Loading

0 comments on commit 13c77cd

Please sign in to comment.