Skip to content

Commit

Permalink
Merge branch 'master' into 417-issue-cant-load-an-avro-schema-seriali…
Browse files Browse the repository at this point in the history
…zer-from-register
  • Loading branch information
RobertoSngular authored Nov 15, 2023
2 parents c8918af + f0478f3 commit 475c982
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 99 deletions.
2 changes: 1 addition & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.9</version>
<version>5.6.10</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public SchemaRegistryConfigElementBeanInfo() {

super(SchemaRegistryConfigElement.class);

createPropertyGroup("schema_registry_config", new String[]{SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL});
createPropertyGroup("schema_registry_config", new String[] {SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES});

final PropertyDescriptor schemaRegistryName = property(SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME);
schemaRegistryName.setPropertyEditorClass(SchemaRegistryNamePropertyEditor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
Expand Down Expand Up @@ -39,20 +40,12 @@ public static void configExtractorFactory(final AvroExtractor avroExtractor, fin
public static ExtractorRegistry getExtractor(final String schemaType) {

if (schemaType != null && EnumUtils.isValidEnum(SchemaTypeEnum.class, schemaType.toUpperCase())) {
final ExtractorRegistry response;
switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON:
response = jsonExtractor;
break;
case AVRO:
response = avroExtractor;
break;
case PROTOBUF:
response = protobuffExtractor;
break;
default:
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
}
final ExtractorRegistry response = switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON -> jsonExtractor;
case AVRO -> avroExtractor;
case PROTOBUF -> protobuffExtractor;
default -> throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
};
return response;
} else {
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
Expand All @@ -70,12 +63,14 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
final SchemaRegistryEnum schemaRegistryEnum = SchemaRegistryEnum.valueOf(registryName.toUpperCase());

final Object schema;
//TODO change parser
schema = switch (schemaRegistryEnum) {
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema();
case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema();
};
attributeList.addAll(getExtractor(schemaType).processSchema(new ParsedSchema(schema), schemaRegistryEnum));
if (Objects.nonNull(registryName)) {
//TODO change parser
schema = switch (schemaRegistryEnum) {
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema();
case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema();
};
attributeList.addAll(getExtractor(schemaType).processSchema(new ParsedSchema(schema), schemaRegistryEnum));
}
return Pair.of(schemaType, attributeList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@

public class AvroExtractor implements ExtractorRegistry<ParsedSchema> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());
private static Map<SchemaRegistryEnum, Extractor> schemaRegistryMap = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());

public final List<FieldValueMapping> processSchema(final ParsedSchema schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schema.rawSchema());
return schemaRegistryMap.get(registryEnum).processSchema(schema.rawSchema());
}

public final ParsedSchema processSchema(final String fileContent) {
return new ParsedSchema(fileContent, "AVRO");
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).getSchemaNameList(schema);
return schemaRegistryMap.get(registryEnum).getSchemaNameList(schema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.sngular.kloadgen.sampler;

import java.io.Serial;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -48,6 +49,7 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem

private static final Set<String> SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName());

@Serial
private static final long serialVersionUID = 1L;

private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool();
Expand All @@ -71,32 +73,32 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem
@Override
public void setupTest(final JavaSamplerContext context) {
props = JMeterContextService.getContext().getProperties();

generator = SamplerUtil.configureValueGenerator(props);

if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
try {
generator = SamplerUtil.configureValueGenerator(props);

if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
}
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}

if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
}
if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
}

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
try {

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG));
Expand Down
90 changes: 40 additions & 50 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.apicurio.registry.serde.SerdeConfig;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
Expand All @@ -48,6 +49,7 @@
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.jetbrains.annotations.NotNull;

public final class SamplerUtil {

Expand Down Expand Up @@ -206,29 +208,29 @@ public static Arguments getCommonConsumerDefaultParameters() {
return defaultParameters;
}

public static void setupConsumerDeserializerProperties(final JavaSamplerContext context, final Properties props) {
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
public static void setupConsumerDeserializerProperties(final Properties props) {
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
}

public static void setupConsumerSchemaRegistryProperties(final JavaSamplerContext context, final Properties props) {
public static void setupConsumerSchemaRegistryProperties(final Properties props) {
final Map<String, String> originals = new HashMap<>();
setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables(), originals);
setupSchemaRegistryAuthenticationProperties(JavaSamplerContext.getJMeterVariables(), originals);
props.putAll(originals);

if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
}
if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
}
}

Expand All @@ -255,8 +257,8 @@ public static Properties setupCommonConsumerProperties(final JavaSamplerContext
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));

setupConsumerDeserializerProperties(context, props);
setupConsumerSchemaRegistryProperties(context, props);
setupConsumerDeserializerProperties(props);
setupConsumerSchemaRegistryProperties(props);

props.put(ConsumerConfig.SEND_BUFFER_CONFIG, context.getParameter(ConsumerConfig.SEND_BUFFER_CONFIG));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
Expand Down Expand Up @@ -360,7 +362,11 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, Objects.nonNull(valueNameStrategy) ? valueNameStrategy : ProducerKeysHelper.TOPIC_NAME_STRATEGY_CONFLUENT);
}

generator = getLoadGenerator();
if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE))) {
generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE));
} else {
throw new KLoadGenException("Unsupported Serializer");
}

if (generator.getClass().equals(PlainTextLoadGenerator.class)) {
final List<FieldValueMapping> list = new ArrayList<>();
Expand Down Expand Up @@ -421,20 +427,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
props.put(keyNameStrategy, keyNameStrategyValue);
}

if (Objects.nonNull(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
if (JSON_TYPE_SET.contains(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("avro")) {
generator = new AvroSRLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("Protobuf")) {
generator = new ProtobufLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("NoSchema")) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE));
} else {
generator = new AvroSRLoadGenerator();
throw new KLoadGenException("Unsupported Serializer");
}

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -466,6 +462,23 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
return generator;
}

@NotNull
private static BaseLoadGenerator getBaseLoadGenerator(final String schemaType) {
final BaseLoadGenerator generator;
if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if ("avro".equalsIgnoreCase(schemaType)) {
generator = new AvroSRLoadGenerator();
} else if ("protobuf".equalsIgnoreCase(schemaType)) {
generator = new ProtobufLoadGenerator();
} else if ("noSchema".equalsIgnoreCase(schemaType)) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
return generator;
}

public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeaders, final ProducerRecord<Object, Object> producerRecord) {
final List<String> headersSB = new ArrayList<>();
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
Expand All @@ -476,27 +489,4 @@ public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeader
return headersSB;
}

private static BaseLoadGenerator getLoadGenerator() {
final BaseLoadGenerator generator;
final String schemaType = JMeterContextService.getContext().getProperties().getProperty(PropsKeysHelper.VALUE_SCHEMA_TYPE);
if (Objects.nonNull(schemaType)) {
if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if ("avro".equalsIgnoreCase(schemaType)) {
generator = new AvroSRLoadGenerator();
} else if ("Protobuf".equalsIgnoreCase(schemaType)) {
generator = new ProtobufLoadGenerator();
} else if ("NoSchema".equalsIgnoreCase(schemaType)) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
} else {
generator = new AvroSRLoadGenerator();
}

return generator;
}


}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sngular.kloadgen.schemaregistry.adapter.impl;

public interface GenericSchemaRegistryAdapter<T, U> {

T getId();

T getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public JMeterVariables getVariablesAvro() throws IOException {
variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "test");
variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "test");
variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema.rawSchema()));
variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "avro");
variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema.rawSchema()));
variables.putObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES, Arrays.asList(
FieldValueMapping.builder().fieldName("subEntity.anotherLevel.subEntityIntArray[2]").fieldType("int-array").valueLength(0).fieldValueList("[1]").required(true)
Expand Down Expand Up @@ -125,6 +126,7 @@ public JMeterVariables getVariablesProtobuf() throws IOException {
variables.put(PropsKeysHelper.KEY_SCHEMA_TYPE, "protobuf");
variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "protobufSubject");
variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema.schema()));
variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "protobuf");
variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema.schema()));
variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "protobufSubject");
variables.putObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES, Arrays.asList(
Expand Down

0 comments on commit 475c982

Please sign in to comment.