Skip to content

Commit

Permalink
332 comsngularkloadgenserializerprotobufserializer nullpointerexcepti…
Browse files Browse the repository at this point in the history
…on (#421)
  • Loading branch information
jemacineiras authored Nov 15, 2023
1 parent 7685085 commit f0478f3
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 100 deletions.
2 changes: 1 addition & 1 deletion pom-maven-central.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.9</version>
<version>5.6.10</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<artifactId>kloadgen</artifactId>

<version>5.6.9</version>
<version>5.6.10</version>

<name>KLoadGen</name>
<description>Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public SchemaRegistryConfigElementBeanInfo() {

super(SchemaRegistryConfigElement.class);

createPropertyGroup("schema_registry_config", new String[]{SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL});
createPropertyGroup("schema_registry_config", new String[] {SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME,
SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_URL, SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_PROPERTIES});

final PropertyDescriptor schemaRegistryName = property(SchemaRegistryConfigElementValue.SCHEMA_REGISTRY_NAME);
schemaRegistryName.setPropertyEditorClass(SchemaRegistryNamePropertyEditor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;

import com.sngular.kloadgen.common.SchemaRegistryEnum;
Expand Down Expand Up @@ -32,20 +33,12 @@ private ExtractorFactory() {
public static ExtractorRegistry getExtractor(final String schemaType) {

if (schemaType != null && EnumUtils.isValidEnum(SchemaTypeEnum.class, schemaType.toUpperCase())) {
final ExtractorRegistry response;
switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON:
response = JSON_EXTRACTOR;
break;
case AVRO:
response = AVRO_EXTRACTOR;
break;
case PROTOBUF:
response = PROTOBUFF_EXTRACTOR;
break;
default:
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
}
final ExtractorRegistry response = switch (SchemaTypeEnum.valueOf(schemaType.toUpperCase())) {
case JSON -> JSON_EXTRACTOR;
case AVRO -> AVRO_EXTRACTOR;
case PROTOBUF -> PROTOBUFF_EXTRACTOR;
default -> throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
};
return response;
} else {
throw new KLoadGenException(String.format("Schema type not supported %s", schemaType));
Expand All @@ -63,12 +56,14 @@ public static Pair<String, List<FieldValueMapping>> flatPropertiesList(final Str
final SchemaRegistryEnum schemaRegistryEnum = SchemaRegistryEnum.valueOf(registryName.toUpperCase());

final Object schema;
//TODO change parser
schema = switch (schemaRegistryEnum) {
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema();
case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema();
};
attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum));
if (Objects.nonNull(registryName)) {
//TODO change parser
schema = switch (schemaRegistryEnum) {
case APICURIO -> ((ApicurioAbstractParsedSchemaMetadata) abstractParsedSchemaAdapter).getSchema();
case CONFLUENT -> abstractParsedSchemaAdapter.getRawSchema();
};
attributeList.addAll(getExtractor(schemaType).processSchema(schema, schemaRegistryEnum));
}
return Pair.of(schemaType, attributeList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@

public class AvroExtractor implements ExtractorRegistry<Object> {

private static final Map<SchemaRegistryEnum, Extractor> SCHEMA_REGISTRY_MAP = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());
private static Map<SchemaRegistryEnum, Extractor> schemaRegistryMap = Map.of(SchemaRegistryEnum.CONFLUENT, new AvroConfluentExtractor(), SchemaRegistryEnum.APICURIO,
new AvroApicurioExtractor());

public final List<FieldValueMapping> processSchema(final Object schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).processSchema(schema);
return schemaRegistryMap.get(registryEnum).processSchema(schema);
}

public final ParsedSchema processSchema(final String fileContent) {
return new AvroSchema(fileContent);
}

public final List<String> getSchemaNameList(final String schema, final SchemaRegistryEnum registryEnum) {
return SCHEMA_REGISTRY_MAP.get(registryEnum).getSchemaNameList(schema);
return schemaRegistryMap.get(registryEnum).getSchemaNameList(schema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.sngular.kloadgen.sampler;

import java.io.Serial;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -48,6 +49,7 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem

private static final Set<String> SERIALIZER_SET = Set.of(AvroSerializer.class.getName(), ProtobufSerializer.class.getName());

@Serial
private static final long serialVersionUID = 1L;

private final transient StatelessGeneratorTool statelessGeneratorTool = new StatelessGeneratorTool();
Expand All @@ -71,32 +73,32 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem
@Override
public void setupTest(final JavaSamplerContext context) {
props = JMeterContextService.getContext().getProperties();

generator = SamplerUtil.configureValueGenerator(props);

if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
try {
generator = SamplerUtil.configureValueGenerator(props);

if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))
|| "true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
keyMessageFlag = true;
if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) {
keyGenerator = SamplerUtil.configureKeyGenerator(props);
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
}
} else {
msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE);
msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))
? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}
} else {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT);
}

if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
}
if (context.getParameter(ProducerKeysHelper.APICURIO_LEGACY_ID_HANDLER).equals(ProducerKeysHelper.FLAG_YES)) {
props.put(SerdeConfig.ID_HANDLER, Legacy4ByteIdHandler.class.getName());
}
if (context.getParameter(ProducerKeysHelper.APICURIO_ENABLE_HEADERS_ID).equals(ProducerKeysHelper.FLAG_NO)) {
props.put(SerdeConfig.ENABLE_HEADERS, "false");
}

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);

topic = context.getParameter(ProducerKeysHelper.KAFKA_TOPIC_CONFIG);
try {

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG));
Expand Down
90 changes: 40 additions & 50 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.apicurio.registry.serde.SerdeConfig;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
Expand All @@ -48,6 +49,7 @@
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.jetbrains.annotations.NotNull;

public final class SamplerUtil {

Expand Down Expand Up @@ -206,29 +208,29 @@ public static Arguments getCommonConsumerDefaultParameters() {
return defaultParameters;
}

public static void setupConsumerDeserializerProperties(final JavaSamplerContext context, final Properties props) {
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
public static void setupConsumerDeserializerProperties(final Properties props) {
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY));
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
}

public static void setupConsumerSchemaRegistryProperties(final JavaSamplerContext context, final Properties props) {
public static void setupConsumerSchemaRegistryProperties(final Properties props) {
final Map<String, String> originals = new HashMap<>();
setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables(), originals);
setupSchemaRegistryAuthenticationProperties(JavaSamplerContext.getJMeterVariables(), originals);
props.putAll(originals);

if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY));
}
if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) {
props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY));
}
}

Expand All @@ -255,8 +257,8 @@ public static Properties setupCommonConsumerProperties(final JavaSamplerContext
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));

setupConsumerDeserializerProperties(context, props);
setupConsumerSchemaRegistryProperties(context, props);
setupConsumerDeserializerProperties(props);
setupConsumerSchemaRegistryProperties(props);

props.put(ConsumerConfig.SEND_BUFFER_CONFIG, context.getParameter(ConsumerConfig.SEND_BUFFER_CONFIG));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
Expand Down Expand Up @@ -360,7 +362,11 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, Objects.nonNull(valueNameStrategy) ? valueNameStrategy : ProducerKeysHelper.TOPIC_NAME_STRATEGY_CONFLUENT);
}

generator = getLoadGenerator();
if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE))) {
generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA_TYPE));
} else {
throw new KLoadGenException("Unsupported Serializer");
}

if (generator.getClass().equals(PlainTextLoadGenerator.class)) {
final List<FieldValueMapping> list = new ArrayList<>();
Expand Down Expand Up @@ -421,20 +427,10 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
props.put(keyNameStrategy, keyNameStrategyValue);
}

if (Objects.nonNull(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
if (JSON_TYPE_SET.contains(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("avro")) {
generator = new AvroSRLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("Protobuf")) {
generator = new ProtobufLoadGenerator();
} else if (jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE).equalsIgnoreCase("NoSchema")) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
if (ObjectUtils.isNotEmpty(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
generator = getBaseLoadGenerator(jMeterVariables.get(PropsKeysHelper.KEY_SCHEMA_TYPE));
} else {
generator = new AvroSRLoadGenerator();
throw new KLoadGenException("Unsupported Serializer");
}

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down Expand Up @@ -466,6 +462,23 @@ public static BaseLoadGenerator configureKeyGenerator(final Properties props) {
return generator;
}

@NotNull
private static BaseLoadGenerator getBaseLoadGenerator(final String schemaType) {
final BaseLoadGenerator generator;
if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if ("avro".equalsIgnoreCase(schemaType)) {
generator = new AvroSRLoadGenerator();
} else if ("protobuf".equalsIgnoreCase(schemaType)) {
generator = new ProtobufLoadGenerator();
} else if ("noSchema".equalsIgnoreCase(schemaType)) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
return generator;
}

public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeaders, final ProducerRecord<Object, Object> producerRecord) {
final List<String> headersSB = new ArrayList<>();
for (final HeaderMapping kafkaHeader : kafkaHeaders) {
Expand All @@ -476,27 +489,4 @@ public static List<String> populateHeaders(final List<HeaderMapping> kafkaHeader
return headersSB;
}

private static BaseLoadGenerator getLoadGenerator() {
final BaseLoadGenerator generator;
final String schemaType = JMeterContextService.getContext().getProperties().getProperty(PropsKeysHelper.VALUE_SCHEMA_TYPE);
if (Objects.nonNull(schemaType)) {
if (JSON_TYPE_SET.contains(schemaType.toLowerCase())) {
generator = new JsonSRLoadGenerator();
} else if ("avro".equalsIgnoreCase(schemaType)) {
generator = new AvroSRLoadGenerator();
} else if ("Protobuf".equalsIgnoreCase(schemaType)) {
generator = new ProtobufLoadGenerator();
} else if ("NoSchema".equalsIgnoreCase(schemaType)) {
generator = new PlainTextLoadGenerator();
} else {
throw new KLoadGenException("Unsupported Serializer");
}
} else {
generator = new AvroSRLoadGenerator();
}

return generator;
}


}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.sngular.kloadgen.schemaregistry.adapter.impl;

public interface GenericSchemaRegistryAdapter<T, U> {

T getId();

T getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public JMeterVariables getVariablesAvro() throws IOException {
variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "test");
variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "test");
variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema));
variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "avro");
variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema));
variables.putObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES, Arrays.asList(
FieldValueMapping.builder().fieldName("subEntity.anotherLevel.subEntityIntArray[2]").fieldType("int-array").valueLength(0).fieldValueList("[1]").required(true)
Expand Down Expand Up @@ -126,6 +127,7 @@ public JMeterVariables getVariablesProtobuf() throws IOException {
variables.put(PropsKeysHelper.KEY_SCHEMA_TYPE, "protobuf");
variables.put(PropsKeysHelper.VALUE_SUBJECT_NAME, "protobufSubject");
variables.put(PropsKeysHelper.VALUE_SCHEMA, String.valueOf(parsedSchema));
variables.put(PropsKeysHelper.VALUE_SCHEMA_TYPE, "protobuf");
variables.put(PropsKeysHelper.KEY_SCHEMA, String.valueOf(parsedSchema));
variables.put(PropsKeysHelper.KEY_SUBJECT_NAME, "protobufSubject");
variables.putObject(PropsKeysHelper.KEY_SCHEMA_PROPERTIES, Arrays.asList(
Expand Down

0 comments on commit f0478f3

Please sign in to comment.