diff --git a/radar-commons-unsafe/build.gradle b/radar-commons-unsafe/build.gradle
index ceacc03d..1d148256 100644
--- a/radar-commons-unsafe/build.gradle
+++ b/radar-commons-unsafe/build.gradle
@@ -21,7 +21,7 @@ repositories {
 dependencies {
     compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentVersion
     compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
-    compileOnly group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion
+    compileOnly group: 'org.apache.kafka', name: 'kafka_2.12', version: kafkaVersion
 }
diff --git a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java
index a7f1222c..6f847a60 100644
--- a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java
+++ b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java
@@ -16,45 +16,44 @@
 package io.confluent.kafka.serializers;
 
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import kafka.utils.VerifiableProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.common.errors.SerializationException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import kafka.utils.VerifiableProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
+public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaSchemaSerDe {
 
   private final ConcurrentMap<String, ConcurrentMap<Integer, Integer>> oldToNewIdMap = new ConcurrentHashMap<>();
-  private final ConcurrentMap<String, ConcurrentMap<Schema, Integer>> oldToNewVersionMap = new ConcurrentHashMap<>();
-
-  private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaAvroDeserializer.class);
 
   private final DecoderFactory decoderFactory = DecoderFactory.get();
   protected boolean useSpecificAvroReader = false;
-  private final Map<String, Schema> readerSchemaCache = new ConcurrentHashMap<String, Schema>();
+  private final Map<String, Schema> readerSchemaCache = new ConcurrentHashMap<>();
 
   /**
    * Sets properties for this deserializer without overriding the schema registry client itself.
    * Useful for testing, where a mock client is injected.
    */
   protected void configure(KafkaAvroDeserializerConfig config) {
-    configureClientProperties(config);
+    configureClientProperties(config, new AvroSchemaProvider());
     useSpecificAvroReader = config
         .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
   }
 
@@ -67,14 +66,6 @@ protected KafkaAvroDeserializerConfig deserializerConfig(VerifiableProperties pr
     return new KafkaAvroDeserializerConfig(props.props());
   }
 
-  private ByteBuffer getByteBuffer(byte[] payload) {
-    ByteBuffer buffer = ByteBuffer.wrap(payload);
-    if (buffer.get() != MAGIC_BYTE) {
-      throw new SerializationException("Unknown magic byte!");
-    }
-    return buffer;
-  }
-
   /**
    * Deserializes the payload without including schema information for primitive types, maps, and
    * arrays. Just the resulting deserialized object is returned.
@@ -85,7 +76,7 @@ private ByteBuffer getByteBuffer(byte[] payload) {
    * @return the deserialized object
    */
   protected Object deserialize(byte[] payload) throws SerializationException {
-    return deserialize(false, null, null, payload, null);
+    return deserialize(null, null, payload, null);
   }
 
   /**
@@ -96,128 +87,29 @@ protected Object deserialize(byte[] payload) throws SerializationException {
    * @return the deserialized object
    */
   protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException {
-    return deserialize(false, null, null, payload, readerSchema);
+    return deserialize(null, null, payload, readerSchema);
   }
 
-  // The Object return type is a bit messy, but this is the simplest way to have
-  // flexible decoding and not duplicate deserialization code multiple times for different variants.
-  protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey,
-                               byte[] payload, Schema readerSchema) throws SerializationException {
-    // Even if the caller requests schema & version, if the payload is null we cannot include it.
-    // The caller must handle this case.
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema)
+      throws SerializationException {
     if (payload == null) {
       return null;
     }
-    int id = -1;
-    try {
-      ByteBuffer buffer = getByteBuffer(payload);
-      id = buffer.getInt();
-      Schema schema;
-      String subject = null;
-      if (includeSchemaAndVersion) {
-        subject = subjectName(topic, isKey, null);
-      }
-      SchemaMetadata schemaMetadata = null;
-      ConcurrentMap<Integer, Integer> subjectIdMap = oldToNewIdMap.computeIfAbsent(subject, sub -> new ConcurrentHashMap<>());
-      try {
-        id = subjectIdMap.getOrDefault(id, id);
-        schema = schemaForDeserialize(id, null, subject, isKey);
-      } catch (RestClientException ex) {
-        if (ex.getErrorCode() == 40403) {
-          logger.debug("Trying to get id from Latest SchemaMetadata from schemaRegistry for subject {}", subject);
-          int oldId = id;
-          schemaMetadata = schemaRegistry.getLatestSchemaMetadata(subject);
-          id = schemaMetadata.getId();
-          schema = new Schema.Parser().parse(schemaMetadata.getSchema());
-          // keep a track of a subject's ids map so that subsequent records don't query the wrong schema id
-          subjectIdMap.put(oldId, id);
-          logger.debug("success -> schemaMetadata.getId({}) for subject {}", id, subject);
-        } else {
-          throw ex;
-        }
-      }
-
-      int length = buffer.limit() - 1 - idSize;
-      final Object result;
-      if (schema.getType().equals(Schema.Type.BYTES)) {
-        byte[] bytes = new byte[length];
-        buffer.get(bytes, 0, length);
-        result = bytes;
-      } else {
-        int start = buffer.position() + buffer.arrayOffset();
-        DatumReader reader = getDatumReader(schema, readerSchema);
-        Object
-            object =
-            reader.read(null, decoderFactory.binaryDecoder(buffer.array(), start, length, null));
-
-        if (schema.getType().equals(Schema.Type.STRING)) {
-          object = object.toString(); // Utf8 -> String
-        }
-        result = object;
-      }
-
-      if (includeSchemaAndVersion) {
-        // Annotate the schema with the version. Note that we only do this if the schema +
-        // version are requested, i.e. in Kafka Connect converters. This is critical because that
-        // code *will not* rely on exact schema equality. Regular deserializers *must not* include
-        // this information because it would return schemas which are not equivalent.
-        //
-        // Note, however, that we also do not fill in the connect.version field. This allows the
-        // Converter to let a version provided by a Kafka Connect source take priority over the
-        // schema registry's ordering (which is implicit by auto-registration time rather than
-        // explicit from the Connector).
-        Integer version;
-        if (schemaMetadata != null) {
-          version = schemaMetadata.getVersion();
-        } else {
-          Map<Schema, Integer> schemaVersionMap = oldToNewVersionMap.computeIfAbsent(subject, sub -> new ConcurrentHashMap<>());
-          version = schemaVersionMap.get(schema);
-
-          if (version == null) {
-            try {
-              version = schemaRegistry.getVersion(subject, schema);
-            } catch (RestClientException e) {
-              if (e.getErrorCode() == 40403) {
-                logger.debug("Trying to get version from Latest SchemaMetadata from schemaRegistry for subject {}", subject);
-                schemaMetadata = schemaRegistry.getLatestSchemaMetadata(subject);
-                version = schemaMetadata.getVersion();
-
-                schemaVersionMap.putIfAbsent(schema, version);
-                logger.debug("success -> schemaMetadata.getVersion({}) for subject {}", version, subject);
-              } else {
-                throw e;
-              }
-            }
-          }
-        }
-
-        if (schema.getType().equals(Schema.Type.RECORD)) {
-          return new GenericContainerWithVersion((GenericContainer) result, version);
-        } else {
-          return new GenericContainerWithVersion(new NonRecordContainer(schema, result), version);
-        }
-      } else {
-        return result;
-      }
-    } catch (IOException | RuntimeException e) {
-      // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
-      throw new SerializationException("Error deserializing Avro message for id " + id, e);
-    } catch (RestClientException e) {
-      throw new SerializationException("Error retrieving Avro schema for id " + id, e);
-    }
+    DeserializationContext context = new DeserializationContext(topic, isKey, payload);
+    return context.read(context.schemaFromRegistry().rawSchema(), readerSchema);
   }
 
   private Integer schemaVersion(String topic, Boolean isKey, int id, String subject,
-                                Schema schema,
+                                AvroSchema schema,
                                 Object result) throws IOException, RestClientException {
     Integer version;
     if (isDeprecatedSubjectNameStrategy(isKey)) {
       subject = getSubjectName(topic, isKey, result, schema);
-      Schema subjectSchema = schemaRegistry.getBySubjectAndId(subject, id);
+      AvroSchema subjectSchema = (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, id);
       version = schemaRegistry.getVersion(subject, subjectSchema);
     } else {
       //we already got the subject name
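Reviewer note on the hunk above: the monolithic deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema) is split up. Schema lookup, including the subject-scoped remapping of stale schema ids on a 40403 registry error, now lives in DeserializationContext.schemaFromRegistry(), and the actual decoding in DeserializationContext.read(). A minimal round-trip sketch of the new code path, assuming Confluent's MockSchemaRegistryClient and the public KafkaAvroSerializer/KafkaAvroDeserializer wrappers; the topic name, record schema, and mock URL are illustrative only, not part of this patch:

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class DeserializeRoundTripSketch {
  public static void main(String[] args) {
    // One shared mock registry stands in for a real schema registry; the URL
    // only satisfies config validation because the client is injected directly.
    SchemaRegistryClient registry = new MockSchemaRegistryClient();
    Map<String, String> config =
        Collections.singletonMap("schema.registry.url", "mock://unused");

    KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry, config);
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(registry, config);

    Schema schema = SchemaBuilder.record("Measurement").fields()
        .requiredDouble("value").endRecord();
    GenericRecord record = new GenericRecordBuilder(schema).set("value", 1.0).build();

    // serialize() registers the schema and frames the payload as
    // magic byte + schema id + Avro body; deserialize() then routes through
    // the patched deserialize(topic, isKey, payload, readerSchema) above.
    byte[] payload = serializer.serialize("measurements", record);
    Object decoded = deserializer.deserialize("measurements", payload);
    System.out.println(decoded);
  }
}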
@@ -226,21 +118,12 @@ private Integer schemaVersion(String topic,
     return version;
   }
 
-  private String subjectName(String topic, Boolean isKey, Schema schemaFromRegistry) {
+  private String subjectName(String topic, Boolean isKey, AvroSchema schemaFromRegistry) {
     return isDeprecatedSubjectNameStrategy(isKey)
         ? null
         : getSubjectName(topic, isKey, null, schemaFromRegistry);
   }
 
-  private Schema schemaForDeserialize(int id,
-                                      Schema schemaFromRegistry,
-                                      String subject,
-                                      Boolean isKey) throws IOException, RestClientException {
-    return isDeprecatedSubjectNameStrategy(isKey)
-        ? AvroSchemaUtils.copyOf(schemaFromRegistry)
-        : schemaRegistry.getBySubjectAndId(subject, id);
-  }
-
   /**
    * Deserializes the payload and includes schema information, with version information from the
    * schema registry embedded in the schema.
@@ -249,56 +132,236 @@ private Schema schemaForDeserialize(int id,
    * @return a GenericContainer with the schema and data, either as a {@link NonRecordContainer},
    *     {@link org.apache.avro.generic.GenericRecord}, or {@link SpecificRecord}
    */
-  protected GenericContainerWithVersion deserializeWithSchemaAndVersion(String topic, boolean isKey,
-                                                                        byte[] payload)
+  protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
+      String topic, boolean isKey, byte[] payload)
       throws SerializationException {
-    return (GenericContainerWithVersion) deserialize(true, topic, isKey, payload, null);
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle this case.
+    if (payload == null) {
+      return null;
+    }
+
+    // Annotate the schema with the version. Note that we only do this if the schema +
+    // version are requested, i.e. in Kafka Connect converters. This is critical because that
+    // code *will not* rely on exact schema equality. Regular deserializers *must not* include
+    // this information because it would return schemas which are not equivalent.
+    //
+    // Note, however, that we also do not fill in the connect.version field. This allows the
+    // Converter to let a version provided by a Kafka Connect source take priority over the
+    // schema registry's ordering (which is implicit by auto-registration time rather than
+    // explicit from the Connector).
+    DeserializationContext context = new DeserializationContext(topic, isKey, payload);
+    AvroSchema schema = context.schemaForDeserialize();
+    Object result = context.read(schema.rawSchema(), null);
+
+    try {
+      Integer version = schemaVersion(topic, isKey, context.getSchemaId(),
+          context.getSubject(), schema, result);
+      if (schema.rawSchema().getType().equals(Schema.Type.RECORD)) {
+        return new GenericContainerWithVersion((GenericContainer) result, version);
+      } else {
+        return new GenericContainerWithVersion(new NonRecordContainer(schema.rawSchema(), result),
+            version);
+      }
+    } catch (RestClientException | IOException e) {
+      throw new SerializationException("Error retrieving Avro "
+          + getSchemaType(isKey)
+          + " schema version for id "
+          + context.getSchemaId(), e);
+    }
   }
 
-  private DatumReader getDatumReader(Schema writerSchema, Schema readerSchema) {
+  protected DatumReader<?> getDatumReader(Schema writerSchema, Schema readerSchema) {
+    // normalize reader schema
+    readerSchema = getReaderSchema(writerSchema, readerSchema);
+    boolean writerSchemaIsPrimitive =
+        AvroSchemaUtils.getPrimitiveSchemas().containsValue(writerSchema);
+    if (writerSchemaIsPrimitive) {
+      return new GenericDatumReader<>(writerSchema, readerSchema);
+    } else if (useSchemaReflection) {
+      return new ReflectDatumReader<>(writerSchema, readerSchema);
+    } else if (useSpecificAvroReader) {
+      return new SpecificDatumReader<>(writerSchema, readerSchema);
+    } else {
+      return new GenericDatumReader<>(writerSchema, readerSchema);
+    }
+  }
+
+  /**
+   * Normalizes the reader schema, puts the resolved schema into the cache.
+   * <ul>
+   *   <li>if the reader schema is provided, use the provided one</li>
+   *   <li>if the reader schema is cached for the writer schema full name, use the cached value</li>
+   *   <li>if the writer schema is primitive, use the writer one</li>
+   *   <li>if schema reflection is used, generate one from the class referred by writer schema</li>
+   *   <li>if generated classes are used, query the class referred by writer schema</li>
+   *   <li>otherwise use the writer schema</li>
+   * </ul>
+   */
+  private Schema getReaderSchema(Schema writerSchema, Schema readerSchema) {
+    if (readerSchema != null) {
+      return readerSchema;
+    }
+    readerSchema = readerSchemaCache.get(writerSchema.getFullName());
+    if (readerSchema != null) {
+      return readerSchema;
+    }
     boolean writerSchemaIsPrimitive =
         AvroSchemaUtils.getPrimitiveSchemas().values().contains(writerSchema);
-    // do not use SpecificDatumReader if writerSchema is a primitive
-    if (useSpecificAvroReader && !writerSchemaIsPrimitive) {
-      if (readerSchema == null) {
-        readerSchema = getReaderSchema(writerSchema);
-      }
-      return new SpecificDatumReader(writerSchema, readerSchema);
+    if (writerSchemaIsPrimitive) {
+      readerSchema = writerSchema;
+    } else if (useSchemaReflection) {
+      readerSchema = getReflectionReaderSchema(writerSchema);
+      readerSchemaCache.put(writerSchema.getFullName(), readerSchema);
+    } else if (useSpecificAvroReader) {
+      readerSchema = getSpecificReaderSchema(writerSchema);
+      readerSchemaCache.put(writerSchema.getFullName(), readerSchema);
     } else {
-      if (readerSchema == null) {
-        return new GenericDatumReader(writerSchema);
-      }
-      return new GenericDatumReader(writerSchema, readerSchema);
+      readerSchema = writerSchema;
     }
+    return readerSchema;
   }
 
   @SuppressWarnings("unchecked")
-  private Schema getReaderSchema(Schema writerSchema) {
-    Schema readerSchema = readerSchemaCache.get(writerSchema.getFullName());
-    if (readerSchema == null) {
-      Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);
-      if (readerClass != null) {
-        try {
-          readerSchema = readerClass.newInstance().getSchema();
-        } catch (InstantiationException e) {
-          throw new SerializationException(writerSchema.getFullName()
-              + " specified by the "
-              + "writers schema could not be instantiated to "
-              + "find the readers schema.");
-        } catch (IllegalAccessException e) {
-          throw new SerializationException(writerSchema.getFullName()
-              + " specified by the "
-              + "writers schema is not allowed to be instantiated "
-              + "to find the readers schema.");
+  private Schema getSpecificReaderSchema(Schema writerSchema) {
+    Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);
+    if (readerClass == null) {
+      throw new SerializationException("Could not find class "
+          + writerSchema.getFullName()
+          + " specified in writer's schema whilst finding reader's "
+          + "schema for a SpecificRecord.");
+    }
+    try {
+      return readerClass.getConstructor().newInstance().getSchema();
+    } catch (InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+      throw new SerializationException(writerSchema.getFullName()
+          + " specified by the "
+          + "writers schema could not be instantiated to "
+          + "find the readers schema.");
+    } catch (IllegalAccessException e) {
+      throw new SerializationException(writerSchema.getFullName()
+          + " specified by the "
+          + "writers schema is not allowed to be instantiated "
+          + "to find the readers schema.");
+    }
+  }
+
+  private Schema getReflectionReaderSchema(Schema writerSchema) {
+    // shall we use ReflectData.AllowNull.get() instead?
+    Class<?> readerClass = ReflectData.get().getClass(writerSchema);
+    if (readerClass == null) {
+      throw new SerializationException("Could not find class "
+          + writerSchema.getFullName()
+          + " specified in writer's schema whilst finding reader's "
+          + "schema for a reflected class.");
+    }
+    return ReflectData.get().getSchema(readerClass);
+  }
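Reviewer note on the reader-schema helpers above: resolution order is an explicit reader schema, then the readerSchemaCache entry for the writer schema's full name, then a schema derived from a class (via reflection or generated SpecificRecord classes, depending on configuration), and finally the writer schema itself. A small sketch of what getReflectionReaderSchema() boils down to, outside the deserializer; the Measurement POJO is a stand-in for whatever class a writer schema's full name would refer to:

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

// Plain POJO standing in for the class named by a writer schema.
class Measurement {
  double value;
}

public class ReflectionReaderSchemaSketch {
  public static void main(String[] args) {
    // Deriving a schema from a class: the call the helper ends with.
    Schema readerSchema = ReflectData.get().getSchema(Measurement.class);
    System.out.println(readerSchema.toString(true));

    // The reverse lookup the helper starts with: schema full name -> class.
    // This returns null when no such class is on the classpath, which is the
    // SerializationException path above.
    Class<?> readerClass = ReflectData.get().getClass(readerSchema);
    System.out.println(readerClass); // class Measurement
  }
}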
+
+  class DeserializationContext {
+    private final String topic;
+    private final Boolean isKey;
+    private final ByteBuffer buffer;
+    private final int schemaId;
+
+    DeserializationContext(final String topic, final Boolean isKey, final byte[] payload) {
+      this.topic = topic;
+      this.isKey = isKey;
+      this.buffer = getByteBuffer(payload);
+      this.schemaId = buffer.getInt();
+    }
+
+    AvroSchema schemaFromRegistry() {
+      String subject = getSubject();
+      ConcurrentMap<Integer, Integer> subjectIdMap =
+          oldToNewIdMap.computeIfAbsent(subject, sub -> new ConcurrentHashMap<>());
+      int id = subjectIdMap.getOrDefault(schemaId, schemaId);
+      try {
+        return (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, id);
+      } catch (RestClientException e) {
+        if (e.getErrorCode() == 40403) {
+          try {
+            List<Integer> versions = schemaRegistry.getAllVersions(subject);
+            int latestId = versions.get(versions.size() - 1);
+            subjectIdMap.put(schemaId, latestId);
+
+            return (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, latestId);
+          } catch (RestClientException | IOException ex) {
+            throw new SerializationException("Error retrieving Avro "
+                + getSchemaType(isKey)
+                + " schema for id "
+                + schemaId, e);
+          }
         }
-        readerSchemaCache.put(writerSchema.getFullName(), readerSchema);
+        throw new SerializationException("Error retrieving Avro "
+            + getSchemaType(isKey)
+            + " schema for id "
+            + schemaId, e);
+      } catch (IOException e) {
+        throw new SerializationException("Error retrieving Avro "
+            + getSchemaType(isKey)
+            + " schema for id "
+            + schemaId, e);
+      }
+    }
+
+    AvroSchema schemaForDeserialize() {
+      return schemaFromRegistry();
+    }
+
+    String getSubject() {
+      return subjectName(topic, isKey, schemaFromRegistry());
+    }
+
+    String getTopic() {
+      return topic;
+    }
+
+    boolean isKey() {
+      return isKey;
+    }
+
+    int getSchemaId() {
+      return schemaId;
+    }
+
+    Object read(Schema writerSchema) {
+      return read(writerSchema, null);
+    }
+
+    Object read(Schema writerSchema, Schema readerSchema) {
+      DatumReader<?> reader = getDatumReader(writerSchema, readerSchema);
+      int length = buffer.limit() - 1 - idSize;
+      if (writerSchema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        return bytes;
       } else {
-        throw new SerializationException("Could not find class "
-            + writerSchema.getFullName()
-            + " specified in writer's schema whilst finding reader's "
-            + "schema for a SpecificRecord.");
+        int start = buffer.position() + buffer.arrayOffset();
+        try {
+          Object result = reader.read(null, decoderFactory.binaryDecoder(buffer.array(),
+              start, length, null));
+          if (writerSchema.getType().equals(Schema.Type.STRING)) {
+            return result.toString();
+          } else {
+            return result;
+          }
+        } catch (IOException | RuntimeException e) {
+          // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
+          throw new SerializationException("Error deserializing Avro message for id "
+              + schemaId, e);
+        }
       }
     }
-    return readerSchema;
+  }
+
+  private static String getSchemaType(Boolean isKey) {
+    if (isKey == null) {
+      return "unknown";
+    } else if (isKey) {
+      return "key";
+    } else {
+      return "value";
+    }
   }
 }
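For context on the buffer arithmetic in DeserializationContext (buffer.getInt() in the constructor, buffer.limit() - 1 - idSize in read()): payloads follow Confluent's wire format of one magic byte (0), a 4-byte big-endian schema id, then the Avro-encoded body. A self-contained sketch of that framing; the local constants mirror MAGIC_BYTE and idSize inherited from AbstractKafkaSchemaSerDe:

import java.nio.ByteBuffer;

public class WireFormatSketch {
  private static final byte MAGIC_BYTE = 0x0; // mirrors AbstractKafkaSchemaSerDe.MAGIC_BYTE
  private static final int ID_SIZE = 4;       // mirrors AbstractKafkaSchemaSerDe.idSize

  public static void main(String[] args) {
    // Stand-in for a record produced by KafkaAvroSerializer: magic byte,
    // schema id 42, and (here) an empty Avro body.
    byte[] payload = {0x0, 0, 0, 0, 42};

    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte!");
    }
    int schemaId = buffer.getInt();                // same read as the context constructor
    int bodyLength = buffer.limit() - 1 - ID_SIZE; // same length computation as read()
    System.out.println("schema id=" + schemaId + ", body length=" + bodyLength);
  }
}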