diff --git a/pom-maven-central.xml b/pom-maven-central.xml index d0513d7f..e81bd2ec 100644 --- a/pom-maven-central.xml +++ b/pom-maven-central.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.9 + 5.6.10 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial diff --git a/pom.xml b/pom.xml index fe0a95c2..48bf6fcd 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.9 + 5.6.10 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial diff --git a/src/main/java/com/sngular/kloadgen/config/schemaregistry/SchemaRegistryConfigElementBeanInfo.java b/src/main/java/com/sngular/kloadgen/config/schemaregistry/SchemaRegistryConfigElementBeanInfo.java index 8cfeae30..2a3b0aff 100644 --- a/src/main/java/com/sngular/kloadgen/config/schemaregistry/SchemaRegistryConfigElementBeanInfo.java +++ b/src/main/java/com/sngular/kloadgen/config/schemaregistry/SchemaRegistryConfigElementBeanInfo.java @@ -22,8 +22,8 @@ public SchemaRegistryConfigElementBeanInfo() { super(SchemaRegistryConfigElement.class); - createPropertyGroup("schema_registry_config", new String[]{SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME, - SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL}); + createPropertyGroup("schema_registry_config", new String[] {SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME, + SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES}); final PropertyDescriptor schemaRegistryName = property(SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME); schemaRegistryName.setPropertyEditorClass(SchemaRegistryNamePropertyEditor.class); diff --git a/src/main/java/com/sngular/kloadgen/extractor/extractors/ExtractorFactory.java b/src/main/java/com/sngular/kloadgen/extractor/extractors/ExtractorFactory.java index fdf8700c..445fd070 100644 --- a/src/main/java/com/sngular/kloadgen/extractor/extractors/ExtractorFactory.java +++ b/src/main/java/com/sngular/kloadgen/extractor/extractors/ExtractorFactory.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Properties; import com.sngular.kloadgen.common.SchemaRegistryEnum; @@ -32,20 +33,12 @@ private ExtractorFactory() { public static ExtractorRegistry getExtractor(final String schemaType) { if (schemaType != null && EnumUtils.isValidEnum(SchemaTypeEnum.class, schemaType.toUpperCase())) { - final ExtractorRegistry response; - switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) { - case JSON: - response = JSON_EXTRACTOR; - break; - case AVRO: - response = AVRO_EXTRACTOR; - break; - case PROTOBUF: - response = PROTOBUFF_EXTRACTOR; - break; - default: - throw new KLoadGenException(String.format("Schema type not supported %s", schemaType)); - } + final ExtractorRegistry response = switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) { + case JSON -> JSON_EXTRACTOR; + case AVRO -> AVRO_EXTRACTOR; + case PROTOBUF -> PROTOBUFF_EXTRACTOR; + default -> throw new KLoadGenException(String.format("Schema type not supported %s", schemaType)); + }; return response; } else { throw new KLoadGenException(String.format("Schema type not supported %s", schemaType)); @@ -63,12 +56,14 @@ public static Pair> flatPropertiesList(final Str final SchemaRegistryEnum schemaRegistryEnum = SchemaRegistryEnum.valueOf(registryName.toUpperCase()); final Object schema; - //TODO change parser - schema = switch (schemaRegistryEnum) { - case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema(); - case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema(); - }; - attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum)); + if (Objects.nonNull(registryName)) { + //TODO change parser + schema = switch (schemaRegistryEnum) { + case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema(); + case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema(); + }; + attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum)); + } return Pair.of(schemaType, attributeList); } } \ No newline at end of file diff --git a/src/main/java/com/sngular/kloadgen/extractor/extractors/avro/AvroExtractor.java b/src/main/java/com/sngular/kloadgen/extractor/extractors/avro/AvroExtractor.java index 32594da3..2db79a09 100644 --- a/src/main/java/com/sngular/kloadgen/extractor/extractors/avro/AvroExtractor.java +++ b/src/main/java/com/sngular/kloadgen/extractor/extractors/avro/AvroExtractor.java @@ -12,11 +12,11 @@ public class AvroExtractor implements ExtractorRegistry { - private static final Map SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO, - new AvroApicurioExtractor()); + private static Map schemaRegistryMap = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO, + new AvroApicurioExtractor()); public final List processSchema(final Object schema, final SchemaRegistryEnum registryEnum) { - return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schema); + return schemaRegistryMap.get(registryEnum).processSchema(schema); } public final ParsedSchema processSchema(final String fileContent) { @@ -24,7 +24,7 @@ public final ParsedSchema processSchema(final String fileContent) { } public final List getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) { - return SCHEMA_REGISTRY_MAP.get(registryEnum).getSchemaNameList(schema); + return schemaRegistryMap.get(registryEnum).getSchemaNameList(schema); } } \ No newline at end of file diff --git a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java index edac1f44..4f5daef1 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java +++ b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java @@ -6,6 +6,7 @@ package com.sngular.kloadgen.sampler; +import java.io.Serial; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -48,6 +49,7 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem private static final Set SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName()); + @Serial private static final long serialVersionUID = 1L; private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool(); @@ -71,32 +73,32 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem @Override public void setupTest(final JavaSamplerContext context) { props = JMeterContextService.getContext().getProperties(); - - generator = SamplerUtil.configureValueGenerator(props); - - if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY)) - || "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { - keyMessageFlag = true; - if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) { - keyGenerator = SamplerUtil.configureKeyGenerator(props); + try { + generator = SamplerUtil.configureValueGenerator(props); + + if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY)) + || "true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { + keyMessageFlag = true; + if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) { + keyGenerator = SamplerUtil.configureKeyGenerator(props); + } else { + msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE); + msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)) + ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); + } } else { - msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE); - msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)) - ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); } - } else { - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); - } - if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) { - props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName()); - } - if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) { - props.put(SerdeConfig.ENABLE_HEADERS, "false"); - } + if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) { + props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName()); + } + if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) { + props.put(SerdeConfig.ENABLE_HEADERS, "false"); + } + + topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG); - topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG); - try { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG)); diff --git a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java index 4878decb..65943f85 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java +++ b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java @@ -36,6 +36,7 @@ import io.apicurio.registry.serde.SerdeConfig; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import org.apache.avro.SchemaParseException; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext; @@ -48,6 +49,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.jetbrains.annotations.NotNull; public final class SamplerUtil { @@ -206,29 +208,29 @@ public static Arguments getCommonConsumerDefaultParameters() { return defaultParameters; } - public static void setupConsumerDeserializerProperties(final JavaSamplerContext context, final Properties props) { - if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) { - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY)); + public static void setupConsumerDeserializerProperties(final Properties props) { + if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY)); } else { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } - if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) { - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY)); + if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) { + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY)); } else { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } } - public static void setupConsumerSchemaRegistryProperties(final JavaSamplerContext context, final Properties props) { + public static void setupConsumerSchemaRegistryProperties(final Properties props) { final Map originals = new HashMap<>(); - setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables(), originals); + setupSchemaRegistryAuthenticationProperties(JavaSamplerContext.getJMeterVariables(), originals); props.putAll(originals); - if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) { - props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY)); + if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) { + props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY)); } - if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) { - props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY)); + if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) { + props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY)); } } @@ -255,8 +257,8 @@ public static Properties setupCommonConsumerProperties(final JavaSamplerContext final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - setupConsumerDeserializerProperties(context, props); - setupConsumerSchemaRegistryProperties(context, props); + setupConsumerDeserializerProperties(props); + setupConsumerSchemaRegistryProperties(props); props.put(ConsumerConfig.SEND_BUFFER_CONFIG, context.getParameter(ConsumerConfig.SEND_BUFFER_CONFIG)); props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); @@ -360,7 +362,11 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props) props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, Objects.nonNull(valueNameStrategy) ? valueNameStrategy : ProducerKeysHelper.TOPIC_NAME_STRATEGY_CONFLUENT); } - generator = getLoadGenerator(); + if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE))) { + generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE)); + } else { + throw new KLoadGenException("Unsupported Serializer"); + } if (generator.getClass().equals(PlainTextLoadGenerator.class)) { final List list = new ArrayList<>(); @@ -421,20 +427,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) { props.put(keyNameStrategy, keyNameStrategyValue); } - if (Objects.nonNull(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) { - if (JSON_TYPE_SET.contains(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).toLowerCase())) { - generator = new JsonSRLoadGenerator(); - } else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("avro")) { - generator = new AvroSRLoadGenerator(); - } else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("Protobuf")) { - generator = new ProtobufLoadGenerator(); - } else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("NoSchema")) { - generator = new PlainTextLoadGenerator(); - } else { - throw new KLoadGenException("Unsupported Serializer"); - } + if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) { + generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE)); } else { - generator = new AvroSRLoadGenerator(); + throw new KLoadGenException("Unsupported Serializer"); } props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -466,6 +462,23 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) { return generator; } + @NotNull + private static BaseLoadGenerator getBaseLoadGenerator(final String schemaType) { + final BaseLoadGenerator generator; + if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) { + generator = new JsonSRLoadGenerator(); + } else if ("avro".equalsIgnoreCase(schemaType)) { + generator = new AvroSRLoadGenerator(); + } else if ("protobuf".equalsIgnoreCase(schemaType)) { + generator = new ProtobufLoadGenerator(); + } else if ("noSchema".equalsIgnoreCase(schemaType)) { + generator = new PlainTextLoadGenerator(); + } else { + throw new KLoadGenException("Unsupported Serializer"); + } + return generator; + } + public static List populateHeaders(final List kafkaHeaders, final ProducerRecord producerRecord) { final List headersSB = new ArrayList<>(); for (final HeaderMapping kafkaHeader : kafkaHeaders) { @@ -476,27 +489,4 @@ public static List populateHeaders(final List kafkaHeader return headersSB; } - private static BaseLoadGenerator getLoadGenerator() { - final BaseLoadGenerator generator; - final String schemaType = JMeterContextService.getContext().getProperties().getProperty(PropsKeysHelper.VALUE_SCHEMA_TYPE); - if (Objects.nonNull(schemaType)) { - if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) { - generator = new JsonSRLoadGenerator(); - } else if ("avro".equalsIgnoreCase(schemaType)) { - generator = new AvroSRLoadGenerator(); - } else if ("Protobuf".equalsIgnoreCase(schemaType)) { - generator = new ProtobufLoadGenerator(); - } else if ("NoSchema".equalsIgnoreCase(schemaType)) { - generator = new PlainTextLoadGenerator(); - } else { - throw new KLoadGenException("Unsupported Serializer"); - } - } else { - generator = new AvroSRLoadGenerator(); - } - - return generator; - } - - } diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/adapter/impl/GenericSchemaRegistryAdapter.java b/src/main/java/com/sngular/kloadgen/schemaregistry/adapter/impl/GenericSchemaRegistryAdapter.java index 9fdffb18..c04b8c6d 100644 --- a/src/main/java/com/sngular/kloadgen/schemaregistry/adapter/impl/GenericSchemaRegistryAdapter.java +++ b/src/main/java/com/sngular/kloadgen/schemaregistry/adapter/impl/GenericSchemaRegistryAdapter.java @@ -1,6 +1,7 @@ package com.sngular.kloadgen.schemaregistry.adapter.impl; public interface GenericSchemaRegistryAdapter { + T getId(); T getVersion(); diff --git a/src/test/java/com/sngular/kloadgen/sampler/SamplerUtilTest.java b/src/test/java/com/sngular/kloadgen/sampler/SamplerUtilTest.java index 2fe87967..9fc3924e 100644 --- a/src/test/java/com/sngular/kloadgen/sampler/SamplerUtilTest.java +++ b/src/test/java/com/sngular/kloadgen/sampler/SamplerUtilTest.java @@ -72,6 +72,7 @@ public JMeterVariables getVariablesAvro() throws IOException { variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "test"); variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "test"); variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema)); + variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "avro"); variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema)); variables.putObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES, Arrays.asList( FieldValueMapping.builder().fieldName("subEntity.anotherLevel.subEntityIntArray[2]").fieldType("int-array").valueLength(0).fieldValueList("[1]").required(true) @@ -126,6 +127,7 @@ public JMeterVariables getVariablesProtobuf() throws IOException { variables.put(PropsKeysHelper.KEY_SCHEMA_TYPE, "protobuf"); variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "protobufSubject"); variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema)); + variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "protobuf"); variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema)); variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "protobufSubject"); variables.putObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES, Arrays.asList(