From 68aef338461849a12b647cf67bcdb32c9e130bcc Mon Sep 17 00:00:00 2001
From: Robert Wagner
Date: Mon, 11 Dec 2023 12:15:30 -0500
Subject: [PATCH] configurable schemaid framing

---
 .../schema_registry/__init__.py               |  46 +++++++-
 src/confluent_kafka/schema_registry/avro.py   | 106 ++++++++++++++----
 .../schema_registry/json_schema.py            | 103 +++++++++++++----
 .../schema_registry/protobuf.py               |  81 +++++++++----
 4 files changed, 267 insertions(+), 69 deletions(-)

diff --git a/src/confluent_kafka/schema_registry/__init__.py b/src/confluent_kafka/schema_registry/__init__.py
index e9a5a17d4..43c53b81c 100644
--- a/src/confluent_kafka/schema_registry/__init__.py
+++ b/src/confluent_kafka/schema_registry/__init__.py
@@ -15,6 +15,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import struct
+
+from confluent_kafka.serialization import SerializationError
 from .schema_registry_client import (RegisteredSchema,
                                     Schema,
                                     SchemaRegistryClient,
@@ -30,7 +33,10 @@
            "SchemaReference",
            "topic_subject_name_strategy",
            "topic_record_subject_name_strategy",
-           "record_subject_name_strategy"]
+           "record_subject_name_strategy",
+           "confluent_payload_framing",
+           "apicurio_payload_framing",
+           ]
 
 
 def topic_subject_name_strategy(ctx, record_name):
@@ -87,3 +93,41 @@ def reference_subject_name_strategy(ctx, schema_ref):
 
     """
     return schema_ref.name
+
+def confluent_payload_framing(ctx):
+    def reader(payload):
+        if payload.getbuffer().nbytes <= 5:
+            raise SerializationError("Expecting data framing of length 6 bytes or "
+                                     "more but total data size is {} bytes. This "
+                                     "message was not produced with a Confluent "
+                                     "Schema Registry serializer".format(payload.getbuffer().nbytes))
+        magic, schema_id = struct.unpack('>bI', payload.read(5))
+        if magic != _MAGIC_BYTE:
+            raise SerializationError("Unexpected magic byte {}. This message "
+                                     "was not produced with a Confluent "
+                                     "Schema Registry serializer".format(magic))
+        return schema_id
+
+    def writer(fo, schema_id):
+        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))
+
+    return reader, writer
+
+def apicurio_payload_framing(ctx):
+    def reader(payload):
+        if payload.getbuffer().nbytes <= 9:
+            raise SerializationError("Expecting data framing of length 10 bytes or "
+                                     "more but total data size is {} bytes. This "
+                                     "message was not produced with an Apicurio "
+                                     "Schema Registry serializer".format(payload.getbuffer().nbytes))
+        magic, schema_id = struct.unpack('>bq', payload.read(9))
+        if magic != _MAGIC_BYTE:
+            raise SerializationError("Unexpected magic byte {}. This message "
+                                     "was not produced with an Apicurio "
+                                     "Schema Registry serializer".format(magic))
+        return schema_id
+
+    def writer(fo, schema_id):
+        fo.write(struct.pack('>bq', _MAGIC_BYTE, schema_id))
+
+    return reader, writer
diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py
index 9b5209909..9ecd9a2df 100644
--- a/src/confluent_kafka/schema_registry/avro.py
+++ b/src/confluent_kafka/schema_registry/avro.py
@@ -17,15 +17,14 @@
 
 from io import BytesIO
 from json import loads
-from struct import pack, unpack
 
 from fastavro import (parse_schema,
                       schemaless_reader,
                       schemaless_writer)
 
-from . import (_MAGIC_BYTE,
-               Schema,
-               topic_subject_name_strategy)
+from . import (Schema,
+               topic_subject_name_strategy,
+               confluent_payload_framing)
 from confluent_kafka.serialization import (Deserializer,
                                            SerializationError,
                                            Serializer)
@@ -87,7 +86,7 @@ def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):
 
 class AvroSerializer(Serializer):
     """
-    Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.
+    Serializer that outputs Avro binary encoded data.
 
     Configuration properties:
 
@@ -123,6 +122,17 @@ class AvroSerializer(Serializer):
     |                           |          |                                                  |
     |                           |          | Defaults to topic_subject_name_strategy.
| +---------------------------+----------+--------------------------------------------------+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. | + +---------------------------+----------+--------------------------------------------------+ Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name @@ -148,6 +158,16 @@ class AvroSerializer(Serializer): See `Subject name strategy `_ for additional details. + Supported schemaid locations: + + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ + Note: Prior to serialization, all values must first be converted to a dict instance. 
This may handled manually prior to calling @@ -177,12 +197,15 @@ class AvroSerializer(Serializer): __slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version', '_known_subjects', '_parsed_schema', '_registry', '_schema', '_schema_id', '_schema_name', - '_subject_name_func', '_to_dict', '_named_schemas'] + '_subject_name_func', '_to_dict', '_named_schemas', + '_schemaid_location_func'] _default_conf = {'auto.register.schemas': True, 'normalize.schemas': False, 'use.latest.version': False, - 'subject.name.strategy': topic_subject_name_strategy} + 'subject.name.strategy': topic_subject_name_strategy, + 'schemaid.location': confluent_payload_framing, + } def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): if isinstance(schema_str, str): @@ -224,6 +247,10 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + self._schemaid_location_func = conf_copy.pop('schemaid.location') + if not callable(self._schemaid_location_func): + raise ValueError("schemaid.location must be callable") + if len(conf_copy) > 0: raise ValueError("Unrecognized properties: {}" .format(", ".join(conf_copy.keys()))) @@ -301,9 +328,10 @@ def __call__(self, obj, ctx): else: value = obj + _, schemaid_writer = self._schemaid_location_func(ctx) + with _ContextStringIO() as fo: - # Write the magic byte and schema ID in network byte order (big endian) - fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id)) + schemaid_writer(fo, self._schema_id) # write the record to the rest of the buffer schemaless_writer(fo, self._parsed_schema, value) @@ -312,8 +340,34 @@ def __call__(self, obj, ctx): class AvroDeserializer(Deserializer): """ - Deserializer for Avro binary encoded data with Confluent Schema Registry - framing. + Deserializer for Avro binary encoded data. 
+ + Configuration properties: + + +---------------------------+----------+--------------------------------------------------+ + | Property Name | Type | Description | + +===========================+==========+==================================================+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. | + +---------------------------+----------+--------------------------------------------------+ + + Supported schemaid locations: + + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ Note: By default, Avro complex types are returned as dicts. 
This behavior can @@ -347,9 +401,22 @@ class AvroDeserializer(Deserializer): """ __slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name', '_schema', - '_named_schemas'] + '_named_schemas', '_schemaid_location_func'] + + _default_conf = { + 'schemaid.location': confluent_payload_framing, + } + + def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False, conf=None): + conf_copy = self._default_conf.copy() + if conf is not None: + conf_copy.update(conf) + + self._schemaid_location_func = conf_copy.pop('schemaid.location') + if len(conf_copy) > 0: + raise ValueError("Unrecognized properties: {}" + .format(", ".join(conf_copy.keys()))) - def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False): schema = None if schema_str is not None: if isinstance(schema_str, str): @@ -403,19 +470,10 @@ def __call__(self, data, ctx): if data is None: return None - if len(data) <= 5: - raise SerializationError("Expecting data framing of length 6 bytes or " - "more but total data size is {} bytes. This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) + schemaid_reader, _ = self._schemaid_location_func(ctx) with _ContextStringIO(data) as payload: - magic, schema_id = unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: - raise SerializationError("Unexpected magic byte {}. 
This message " - "was not produced with a Confluent " - "Schema Registry serializer".format(magic)) - + schema_id = schemaid_reader(payload) writer_schema = self._writer_schemas.get(schema_id, None) if writer_schema is None: diff --git a/src/confluent_kafka/schema_registry/json_schema.py b/src/confluent_kafka/schema_registry/json_schema.py index 656937c24..9247b0991 100644 --- a/src/confluent_kafka/schema_registry/json_schema.py +++ b/src/confluent_kafka/schema_registry/json_schema.py @@ -18,13 +18,13 @@ from io import BytesIO import json -import struct from jsonschema import validate, ValidationError, RefResolver -from confluent_kafka.schema_registry import (_MAGIC_BYTE, - Schema, - topic_subject_name_strategy) +from confluent_kafka.schema_registry import (Schema, + topic_subject_name_strategy, + confluent_payload_framing, + ) from confluent_kafka.serialization import (SerializationError, Deserializer, Serializer) @@ -104,6 +104,18 @@ class JSONSerializer(Serializer): | | | | | | | Defaults to topic_subject_name_strategy. | +---------------------------+----------+----------------------------------------------------+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. | + +---------------------------+----------+----------------------------------------------------+ + Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name @@ -129,6 +141,16 @@ class JSONSerializer(Serializer): See `Subject name strategy `_ for additional details. 
+ Supported schemaid locations: + + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ + Notes: The ``title`` annotation, referred to elsewhere as a record name is not strictly required by the JSON Schema specification. It is @@ -157,12 +179,15 @@ class JSONSerializer(Serializer): """ # noqa: E501 __slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version', '_known_subjects', '_parsed_schema', '_registry', '_schema', '_schema_id', - '_schema_name', '_subject_name_func', '_to_dict', '_are_references_provided'] + '_schema_name', '_subject_name_func', '_to_dict', '_are_references_provided', + '_schemaid_location_func'] _default_conf = {'auto.register.schemas': True, 'normalize.schemas': False, 'use.latest.version': False, - 'subject.name.strategy': topic_subject_name_strategy} + 'subject.name.strategy': topic_subject_name_strategy, + 'schemaid.location': confluent_payload_framing, + } def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None): self._are_references_provided = False @@ -205,6 +230,10 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None): self._subject_name_func = conf_copy.pop('subject.name.strategy') if not callable(self._subject_name_func): raise ValueError("subject.name.strategy must be callable") + + self._schemaid_location_func = conf_copy.pop('schemaid.location') + if not callable(self._schemaid_location_func): + raise ValueError("schemaid.location must be callable") if len(conf_copy) > 0: raise ValueError("Unrecognized 
properties: {}" @@ -280,9 +309,10 @@ def __call__(self, obj, ctx): except ValidationError as ve: raise SerializationError(ve.message) + _, schemaid_writer = self._schemaid_location_func(ctx) + with _ContextStringIO() as fo: - # Write the magic byte and schema ID in network byte order (big endian) - fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id)) + schemaid_writer(fo, self._schema_id) # JSON dump always writes a str never bytes # https://docs.python.org/3/library/json.html fo.write(json.dumps(value).encode('utf8')) @@ -295,6 +325,33 @@ class JSONDeserializer(Deserializer): Deserializer for JSON encoded data with Confluent Schema Registry framing. + Configuration properties: + + +---------------------------+----------+--------------------------------------------------+ + | Property Name | Type | Description | + +===========================+==========+==================================================+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. 
| + +---------------------------+----------+--------------------------------------------------+ + + Supported schemaid locations: + + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ + Args: schema_str (str, Schema): `JSON schema definition `_ @@ -308,9 +365,22 @@ class JSONDeserializer(Deserializer): schema_registry_client (SchemaRegistryClient, optional): Schema Registry client instance. Needed if ``schema_str`` is a schema referencing other schemas. """ # noqa: E501 - __slots__ = ['_parsed_schema', '_from_dict', '_registry', '_are_references_provided', '_schema'] + __slots__ = ['_parsed_schema', '_from_dict', '_registry', '_are_references_provided', '_schema', '_schemaid_location_func'] + + _default_conf = { + 'schemaid.location': confluent_payload_framing, + } + + def __init__(self, schema_str, from_dict=None, schema_registry_client=None, conf=None): + conf_copy = self._default_conf.copy() + if conf is not None: + conf_copy.update(conf) + + self._schemaid_location_func = conf_copy.pop('schemaid.location') + if len(conf_copy) > 0: + raise ValueError("Unrecognized properties: {}" + .format(", ".join(conf_copy.keys()))) - def __init__(self, schema_str, from_dict=None, schema_registry_client=None): self._are_references_provided = False if isinstance(schema_str, str): schema = Schema(schema_str, schema_type="JSON") @@ -354,19 +424,10 @@ def __call__(self, data, ctx): if data is None: return None - if len(data) <= 5: - raise SerializationError("Expecting data framing of length 6 bytes or " - "more but total data 
size is {} bytes. This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) + schemaid_reader, _ = self._schemaid_location_func(ctx) with _ContextStringIO(data) as payload: - magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: - raise SerializationError("Unexpected magic byte {}. This message " - "was not produced with a Confluent " - "Schema Registry serializer".format(magic)) - + schema_id = schemaid_reader(payload) # JSON documents are self-describing; no need to query schema obj_dict = json.loads(payload.read()) diff --git a/src/confluent_kafka/schema_registry/protobuf.py b/src/confluent_kafka/schema_registry/protobuf.py index b1de06799..7e7a00f6d 100644 --- a/src/confluent_kafka/schema_registry/protobuf.py +++ b/src/confluent_kafka/schema_registry/protobuf.py @@ -18,16 +18,15 @@ import io import sys import base64 -import struct import warnings from collections import deque from google.protobuf.message import DecodeError from google.protobuf.message_factory import MessageFactory -from . import (_MAGIC_BYTE, - reference_subject_name_strategy, - topic_subject_name_strategy,) +from . import (reference_subject_name_strategy, + topic_subject_name_strategy, + confluent_payload_framing) from .schema_registry_client import (Schema, SchemaReference) from confluent_kafka.serialization import SerializationError @@ -130,8 +129,7 @@ def _schema_to_str(file_descriptor): class ProtobufSerializer(object): """ - Serializer for Protobuf Message derived classes. Serialization format is Protobuf, - with Confluent Schema Registry framing. + Serializer for Protobuf Message derived classes. Serialization format is Protobuf. Configuration properties: @@ -195,6 +193,17 @@ class ProtobufSerializer(object): | | | Warning: This configuration property will be removed | | | | in a future version of the client. 
| +-------------------------------------+----------+------------------------------------------------------+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. | + +-------------------------------------+----------+------------------------------------------------------+ Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name @@ -220,6 +229,16 @@ class ProtobufSerializer(object): See `Subject name strategy `_ for additional details. + Supported schemaid locations: + + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ + Args: msg_type (GeneratedProtocolMessageType): Protobuf Message type. 
@@ -233,7 +252,8 @@ class ProtobufSerializer(object): """ # noqa: E501 __slots__ = ['_auto_register', '_normalize_schemas', '_use_latest_version', '_skip_known_types', '_registry', '_known_subjects', '_msg_class', '_index_array', '_schema', '_schema_id', - '_ref_reference_subject_func', '_subject_name_func', '_use_deprecated_format'] + '_ref_reference_subject_func', '_subject_name_func', '_use_deprecated_format', + '_schemaid_location_func'] _default_conf = { 'auto.register.schemas': True, @@ -243,6 +263,7 @@ class ProtobufSerializer(object): 'subject.name.strategy': topic_subject_name_strategy, 'reference.subject.name.strategy': reference_subject_name_strategy, 'use.deprecated.format': False, + 'schemaid.location': confluent_payload_framing, } def __init__(self, msg_type, schema_registry_client, conf=None): @@ -410,6 +431,7 @@ def __call__(self, message, ctx): subject = self._subject_name_func(ctx, message.DESCRIPTOR.full_name) + _, schemaid_writer = self._schemaid_location_func(ctx) if subject not in self._known_subjects: if self._use_latest_version: @@ -431,9 +453,7 @@ def __call__(self, message, ctx): self._known_subjects.add(subject) with _ContextStringIO() as fo: - # Write the magic byte and schema ID in network byte order - # (big endian) - fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id)) + schemaid_writer(fo, self._schema_id) # write the index array that specifies the message descriptor # of the serialized data. self._encode_varints(fo, self._index_array, @@ -445,7 +465,7 @@ def __call__(self, message, ctx): class ProtobufDeserializer(object): """ - Deserializer for Protobuf serialized data with Confluent Schema Registry framing. + Deserializer for Protobuf serialized data. Args: message_type (Message derived type): Protobuf Message type. @@ -467,16 +487,37 @@ class ProtobufDeserializer(object): | | | Warning: This configuration property will be removed | | | | in a future version of the client. 
| +-------------------------------------+----------+------------------------------------------------------+ + | | | Callable(SerializationContext) -> Tuple( | + | | | Callable(bytes) -> int # Reader | + | | | Callable(BytesIO, int) -> None # Writer | + | | | ) | + | | | | + | ``schemaid.location`` | callable | Define how the schemaid is embedded in the kafka | + | | | Message. Standard locations are defined in the | + | | | confluent_kafka.schema_registry namespace. | + | | | | + | | | Defaults to confluent_payload_framing. | + +-------------------------------------+----------+------------------------------------------------------+ + + Supported schemaid locations: + +--------------------------------------+-------------------------------------+ + | Schema ID Location | Output Format | + +======================================+=====================================+ + | confluent_payload_framing(default) | magic byte {0} + 4 byte unsigned id | + +--------------------------------------+-------------------------------------+ + | apicurio_payload_framing | magic byte {0} + 8 byte signed id | + +--------------------------------------+-------------------------------------+ See Also: `Protobuf API reference `_ """ - __slots__ = ['_msg_class', '_index_array', '_use_deprecated_format'] + __slots__ = ['_msg_class', '_index_array', '_use_deprecated_format', '_schemaid_location_func'] _default_conf = { 'use.deprecated.format': False, + 'schemaid.location': confluent_payload_framing, } def __init__(self, message_type, conf=None): @@ -508,6 +549,8 @@ def __init__(self, message_type, conf=None): "consumers to 'use.deprecated.format':False as " "soon as possible") + self._schemaid_location_func = conf_copy.pop('schemaid.location') + descriptor = message_type.DESCRIPTOR self._index_array = _create_index_array(descriptor) self._msg_class = MessageFactory().GetPrototype(descriptor) @@ -614,19 +657,11 @@ def __call__(self, data, ctx): if data is None: return None - # SR wire protocol + 
msg_index length - if len(data) < 6: - raise SerializationError("Expecting data framing of length 6 bytes or " - "more but total data size is {} bytes. This " - "message was not produced with a Confluent " - "Schema Registry serializer".format(len(data))) + schemaid_reader, _ = self._schemaid_location_func(ctx) + # SR wire protocol + msg_index length with _ContextStringIO(data) as payload: - magic, schema_id = struct.unpack('>bI', payload.read(5)) - if magic != _MAGIC_BYTE: - raise SerializationError("Unknown magic byte. This message was " - "not produced with a Confluent " - "Schema Registry serializer") + schema_id = schemaid_reader(payload) # Protobuf Messages are self-describing; no need to query schema _ = self._read_index_array(payload, zigzag=not self._use_deprecated_format)