Schema Registry Serdes configurable schemaid location #1689

Open · wants to merge 2 commits into base: master
45 changes: 44 additions & 1 deletion src/confluent_kafka/schema_registry/__init__.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import struct

from confluent_kafka.serialization import SerializationError

from .schema_registry_client import (RegisteredSchema,
                                     Schema,
                                     SchemaRegistryClient,
@@ -30,7 +32,10 @@
"SchemaReference",
"topic_subject_name_strategy",
"topic_record_subject_name_strategy",
"record_subject_name_strategy"]
"record_subject_name_strategy",
"confluent_payload_framing",
"apicurio_payload_framing",
]


def topic_subject_name_strategy(ctx, record_name):
@@ -87,3 +92,41 @@ def reference_subject_name_strategy(ctx, schema_ref):

"""
return schema_ref.name

def confluent_payload_framing(ctx):
    def reader(payload):
        # total size of the framed message (framing + record body)
        size = payload.getbuffer().nbytes
        if size <= 5:
            raise SerializationError("Expecting data framing of length 6 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(size))
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}. This message "
                                     "was not produced with a Confluent "
                                     "Schema Registry serializer".format(magic))
        return schema_id

    def writer(fo, schema_id):
        # magic byte + 4-byte big-endian unsigned schema ID
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))

    return reader, writer

def apicurio_payload_framing(ctx):
    def reader(payload):
        # total size of the framed message (framing + record body)
        size = payload.getbuffer().nbytes
        if size <= 9:
            raise SerializationError("Expecting data framing of length 10 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with an Apicurio "
                                     "Schema Registry serializer".format(size))
        magic, schema_id = struct.unpack('>bq', payload.read(9))
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}. This message "
                                     "was not produced with an Apicurio "
                                     "Schema Registry serializer".format(magic))
        return schema_id

    def writer(fo, schema_id):
        # magic byte + 8-byte big-endian signed schema ID (Apicurio global ID)
        fo.write(struct.pack('>bq', _MAGIC_BYTE, schema_id))

    return reader, writer
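
For reviewers, a minimal sketch of the reader/writer contract both framings implement, exercised round-trip against a plain BytesIO (assumes the two callables are exported from confluent_kafka.schema_registry as in this diff):

from io import BytesIO

from confluent_kafka.schema_registry import (apicurio_payload_framing,
                                             confluent_payload_framing)

# ctx is unused by the built-in framings, so None suffices for this sketch.
reader, writer = confluent_payload_framing(None)

buf = BytesIO()
writer(buf, 42)            # magic byte 0 + 4-byte big-endian unsigned schema ID
buf.write(b'avro-body')    # the Avro-encoded record would follow the framing
buf.seek(0)
assert reader(buf) == 42   # consumes the 5-byte framing, leaving the record body

# The Apicurio framing honors the same contract with an 8-byte signed ID.
reader, writer = apicurio_payload_framing(None)
buf = BytesIO()
writer(buf, 42)
buf.write(b'avro-body')
buf.seek(0)
assert reader(buf) == 42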
106 changes: 82 additions & 24 deletions src/confluent_kafka/schema_registry/avro.py
@@ -17,15 +17,14 @@

from io import BytesIO
from json import loads
from struct import pack, unpack

from fastavro import (parse_schema,
                      schemaless_reader,
                      schemaless_writer)

from . import (_MAGIC_BYTE,
               Schema,
               topic_subject_name_strategy)
from . import (Schema,
               topic_subject_name_strategy,
               confluent_payload_framing)
from confluent_kafka.serialization import (Deserializer,
                                           SerializationError,
                                           Serializer)
@@ -87,7 +86,7 @@ def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):

class AvroSerializer(Serializer):
"""
Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.
Serializer that outputs Avro binary encoded data.

Configuration properties:

Expand Down Expand Up @@ -123,6 +122,17 @@ class AvroSerializer(Serializer):
| | | |
| | | Defaults to topic_subject_name_strategy. |
+---------------------------+----------+--------------------------------------------------+
| | | Callable(SerializationContext) -> Tuple( |
| | | Callable(BytesIO) -> int # Reader |
| | | Callable(BytesIO, int) -> None # Writer |
| | | ) |
| | | |
| ``schemaid.location`` | callable | Defines how the schema ID is embedded in the |
| | | Kafka message. Standard framings are defined in |
| | | the confluent_kafka.schema_registry namespace. |
| | | |
| | | Defaults to confluent_payload_framing. |
+---------------------------+----------+--------------------------------------------------+

Schemas are registered against subject names in Confluent Schema Registry that
define a scope in which the schemas can be evolved. By default, the subject name
@@ -148,6 +158,16 @@

See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

Supported ``schemaid.location`` values:

+--------------------------------------+-------------------------------------+
| Schema ID Location | Output Format |
+======================================+=====================================+
| confluent_payload_framing (default) | magic byte 0 + 4-byte unsigned ID |
+--------------------------------------+-------------------------------------+
| apicurio_payload_framing | magic byte 0 + 8-byte signed ID |
+--------------------------------------+-------------------------------------+
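
For example, a serializer can opt into Apicurio framing with a single configuration entry (a sketch: the registry URL is an illustrative placeholder and schema_str is assumed to be an Avro schema string defined elsewhere):

from confluent_kafka.schema_registry import (SchemaRegistryClient,
                                             apicurio_payload_framing)
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry_client,
                                 schema_str,  # assumed: an Avro schema string
                                 conf={'schemaid.location': apicurio_payload_framing})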

Note:
Prior to serialization, all values must first be converted to
a dict instance. This may be handled manually prior to calling
@@ -177,12 +197,15 @@ class AvroSerializer(Serializer):
    __slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
                 '_known_subjects', '_parsed_schema',
                 '_registry', '_schema', '_schema_id', '_schema_name',
                 '_subject_name_func', '_to_dict', '_named_schemas']
                 '_subject_name_func', '_to_dict', '_named_schemas',
                 '_schemaid_location_func']

    _default_conf = {'auto.register.schemas': True,
                     'normalize.schemas': False,
                     'use.latest.version': False,
                     'subject.name.strategy': topic_subject_name_strategy}
                     'subject.name.strategy': topic_subject_name_strategy,
                     'schemaid.location': confluent_payload_framing,
                     }

    def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        if isinstance(schema_str, str):
@@ -224,6 +247,10 @@ def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._schemaid_location_func = conf_copy.pop('schemaid.location')
        if not callable(self._schemaid_location_func):
            raise ValueError("schemaid.location must be callable")

        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))
@@ -301,9 +328,10 @@ def __call__(self, obj, ctx):
        else:
            value = obj

        _, schemaid_writer = self._schemaid_location_func(ctx)

        with _ContextStringIO() as fo:
            # Write the magic byte and schema ID in network byte order (big endian)
            fo.write(pack('>bI', _MAGIC_BYTE, self._schema_id))
            schemaid_writer(fo, self._schema_id)
            # write the record to the rest of the buffer
            schemaless_writer(fo, self._parsed_schema, value)

@@ -312,8 +340,34 @@

class AvroDeserializer(Deserializer):
"""
Deserializer for Avro binary encoded data with Confluent Schema Registry
framing.
Deserializer for Avro binary encoded data.

Configuration properties:

+---------------------------+----------+--------------------------------------------------+
| Property Name | Type | Description |
+===========================+==========+==================================================+
| | | Callable(SerializationContext) -> Tuple( |
| | | Callable(BytesIO) -> int # Reader |
| | | Callable(BytesIO, int) -> None # Writer |
| | | ) |
| | | |
| ``schemaid.location`` | callable | Defines how the schema ID is embedded in the |
| | | Kafka message. Standard framings are defined in |
| | | the confluent_kafka.schema_registry namespace. |
| | | |
| | | Defaults to confluent_payload_framing. |
+---------------------------+----------+--------------------------------------------------+

Supported ``schemaid.location`` values:

+--------------------------------------+-------------------------------------+
| Schema ID Location | Output Format |
+======================================+=====================================+
| confluent_payload_framing (default) | magic byte 0 + 4-byte unsigned ID |
+--------------------------------------+-------------------------------------+
| apicurio_payload_framing | magic byte 0 + 8-byte signed ID |
+--------------------------------------+-------------------------------------+
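
Consumers must be configured with the same framing the producer used; a matching sketch of the deserializer side (same illustrative URL as the serializer example above):

from confluent_kafka.schema_registry import (SchemaRegistryClient,
                                             apicurio_payload_framing)
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_deserializer = AvroDeserializer(schema_registry_client,
                                     conf={'schemaid.location': apicurio_payload_framing})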

Note:
By default, Avro complex types are returned as dicts. This behavior can
@@ -347,9 +401,22 @@ class AvroDeserializer(Deserializer):
"""

    __slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name', '_schema',
                 '_named_schemas']
                 '_named_schemas', '_schemaid_location_func']

    _default_conf = {
        'schemaid.location': confluent_payload_framing,
    }

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False, conf=None):
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._schemaid_location_func = conf_copy.pop('schemaid.location')
        if not callable(self._schemaid_location_func):
            raise ValueError("schemaid.location must be callable")
        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))

    def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
        schema = None
        if schema_str is not None:
            if isinstance(schema_str, str):
@@ -403,19 +470,10 @@ def __call__(self, data, ctx):
        if data is None:
            return None

        if len(data) <= 5:
            raise SerializationError("Expecting data framing of length 6 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(len(data)))
        schemaid_reader, _ = self._schemaid_location_func(ctx)

        with _ContextStringIO(data) as payload:
            magic, schema_id = unpack('>bI', payload.read(5))
            if magic != _MAGIC_BYTE:
                raise SerializationError("Unexpected magic byte {}. This message "
                                         "was not produced with a Confluent "
                                         "Schema Registry serializer".format(magic))

            schema_id = schemaid_reader(payload)
            writer_schema = self._writer_schemas.get(schema_id, None)

            if writer_schema is None:
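
Because ``schemaid.location`` accepts any callable returning a (reader, writer) pair, custom framings can be plugged in without touching the serdes. A hypothetical example, not part of this PR: a bare 4-byte ID with no magic byte.

import struct

def bare_id_framing(ctx):
    # Hypothetical framing: 4-byte big-endian unsigned schema ID, no magic byte.
    def reader(payload):
        return struct.unpack('>I', payload.read(4))[0]

    def writer(fo, schema_id):
        fo.write(struct.pack('>I', schema_id))

    return reader, writer

# Usable on both sides of the pipeline, e.g.:
#   AvroSerializer(client, schema_str, conf={'schemaid.location': bare_id_framing})
#   AvroDeserializer(client, conf={'schemaid.location': bare_id_framing})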