From 4540f78103b7dec3fb44db08782452cf7bf05292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=20E=2E=20Garcia=20Maci=C3=B1eiras?= <68995937+jemacineiras@users.noreply.github.com> Date: Fri, 3 May 2024 11:28:22 +0200 Subject: [PATCH] 332 comsngularkloadgenserializerprotobufserializer nullpointerexception (#430) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * #332 Add Files Modified To Solve Exception Added a trim statement to avoid problem when cast the schemas from classpath * #332 Add get dependencies and build proto. * #332 Add Test And Documentation Removed some unused test files * #332 Add Topic Name Strategy * #332 Change image from kafka manager to newest version Apparently the original project was renamed into cmak to avoid break the TOS of kafka having the kafka in the name of unofficial tool * Get the schema name from the schema references of imported classes * #332 Removed generator set up by TopicNameStrategy * #332 refactor * #332 refactor * #332 refactor * #332 updated pom version * #332 fixes PR * #332 fixes PR * update pom * Merge with Master * #340 update dependencies * #332 Adding google proto types * #332 Fixing size serializer. * #332 Include proto files in package * #332 Fix styles and pom version * #332 Fix styles and pom version * #332 Fix styles and test * #332 Fix styles * #332 Try to process multi level protobuff dependencies * #332 Try to process multi level protobuff dependencies * #332 Fix jdk build version * #332 Fix checkstyle issues * #332 Fix Style * #332 Fix Google dependencies * #332 Fix Google dependencies * #332 Fix generator calculation * #332 Fix generator calculation * #332 Merge with master. * #332 Update POM. * #332 Fix Protobuf Apicurio Extractor. * #332 fix new version static error * #332 fix new version static error * #332 fix Generator creation * #332 fix Random Object generation * #332 fix Serializer generation * #332 fix generator creation * #332 add test * Update pom.xml version and developers * Update pom-maven-central.xml --------- Co-authored-by: Adrian Lagartera Co-authored-by: Alfredo González Fernández Co-authored-by: David Hernández González Co-authored-by: Jan Duinkerken Rodríguez --- pom-maven-central.xml | 142 +++++++++------- pom.xml | 98 ++++++----- .../protobuf/AbstractProtoFileExtractor.java | 3 +- .../protobuf/ProtoBufApicurioExtractor.java | 9 +- .../kloadgen/parsedschema/ParsedSchema.java | 2 +- .../randomtool/random/RandomObject.java | 130 +++++---------- .../sampler/KafkaProducerSampler.java | 108 +++++++++++-- .../sngular/kloadgen/sampler/SamplerUtil.java | 152 +++++++++--------- .../schemaregistry/SchemaRegistryFactory.java | 26 +++ .../impl/ApicurioSchemaRegistry.java | 7 +- .../impl/ConfluentSchemaRegistry.java | 26 +-- .../ProtobufApicurioExtractorTest.java | 45 ++++-- .../ProtobufSchemaProcessorTest.java | 10 +- .../proto-files/googleTypesTest.proto | 4 +- 14 files changed, 434 insertions(+), 328 deletions(-) create mode 100644 src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java diff --git a/pom-maven-central.xml b/pom-maven-central.xml index 71ca6d5c..8429b63a 100644 --- a/pom-maven-central.xml +++ b/pom-maven-central.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.10 + 5.6.13 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial @@ -306,12 +306,55 @@ Roberto Riveira Veiga roberto.riveira@sngular.com Sngular + https://www.sngular.com + + Trainee Backend Developer + + + + GraciMndzSNG + Graciela Méndez Olmos + graciela.mendez@sngular.com + Sngular + https://sngular.github.io/ + + Backend Developer + + Europe/Madrid + + + pablorodriguez-sngular + Pablo Rodríguez Pérez + pablo.rodriguezp@sngular.com + Sngular https://sngular.github.io/ Backend Developer Europe/Madrid + + JanDuinRod + Jan Duinkerken Rodríguez + jan.duinkerken@sngular.com + Sngular + https://sngular.github.io + + Fullstack Developer + + Europe/Madrid + + + davidgayoso + David Gayoso Salvado + david.gayoso@sngular.com + Sngular + https://sngular.github.io + + Fullstack Developer + + Europe/Madrid + @@ -335,7 +378,7 @@ 3.24.2 1.2.0 1.5.1 - 1.11.2 + 1.11.3 1.9.4 4.4 3.12.0 @@ -344,20 +387,21 @@ 2.4.0-b180830.0359 17 provided - 5.6.2 - 2.6.0 - 5.8.2 - 7.1.1 - 2.4.3.Final - 3.5.1 + 5.6.3 + 2.9.0 + 5.10.1 + 7.5.1 + 2.5.8.Final + 3.6.1 1.18.22 0.9.5 4.2.0 UTF-8 4.5.0 - 2.0.0-alpha1 + 2.0.11 1.3.1 2.35.1 + 2.15.2 @@ -385,22 +429,21 @@ org.apache.avro avro ${avro.version} - - - com.fasterxml.jackson.core - jackson-core - - + + + org.apache.commons + commons-compress + 1.24.0 com.fasterxml.jackson.core jackson-core - 2.15.2 + ${jackson-core.version} com.fasterxml.jackson.core jackson-annotations - 2.15.2 + ${jackson-core.version} org.projectlombok @@ -413,16 +456,6 @@ commons-lang3 ${commons-lang3.version} - - io.confluent - kafka-schema-registry-client - ${kafka-schema-registry-client.version} - - - io.confluent - kafka-avro-serializer - ${kafka-schema-registry-client.version} - io.confluent kafka-json-serializer @@ -455,6 +488,16 @@ + + org.jetbrains.kotlin + kotlin-stdlib-common + 1.8.20 + + + org.jetbrains.kotlin + kotlin-stdlib + 1.8.20 + io.apicurio apicurio-registry-serdes-avro-serde @@ -478,7 +521,7 @@ org.apache.avro avro-protobuf - 1.11.2 + 1.11.3 com.github.os72 @@ -488,18 +531,18 @@ com.squareup.wire wire-java-generator - 4.0.1 + 4.9.0 runtime com.google.api.grpc proto-google-common-protos - 2.24.0 + 2.30.0 com.google.protobuf protobuf-java - 3.24.3 + 3.25.1 org.slf4j @@ -520,7 +563,7 @@ com.github.curious-odd-man rgxgen - 1.3 + 1.4 org.reflections @@ -536,7 +579,7 @@ com.github.charithe kafka-junit - 4.2.0 + 4.2.1 test @@ -669,7 +712,7 @@ org.apache.maven.plugins maven-checkstyle-plugin - 3.3.0 + 3.2.0 com.puppycrawl.tools @@ -686,6 +729,7 @@ maven-checkstyle-plugin checkstyle.xml + UTF-8 true true false @@ -716,7 +760,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.9.0 + 3.10.0 true true @@ -725,32 +769,6 @@ ${jdk.version} - - org.apache.maven.plugins - maven-source-plugin - 3.3.0 - - - attach-sources - - jar - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 3.4.0 - - - attach-javadocs - - jar - - - - org.eluder.coveralls coveralls-maven-plugin @@ -781,7 +799,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.1.2 + 3.2.2 org.sonatype.plugins diff --git a/pom.xml b/pom.xml index 1bd1946a..7a0ea86c 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ kloadgen - 5.6.12 + 5.6.13 KLoadGen Load Generation Jmeter plugin for Kafka Cluster. Supporting AVRO, JSON Schema and Protobuf schema types. Generate Artificial @@ -20,7 +20,7 @@ Mozilla Public License 2.0 https://github.com/sngular/kloadgen/blob/master/LICENSE repo - + @@ -333,6 +333,28 @@ Europe/Madrid + + JanDuinRod + Jan Duinkerken Rodríguez + jan.duinkerken@sngular.com + Sngular + https://sngular.github.io + + Fullstack Developer + + Europe/Madrid + + + davidgayoso + David Gayoso Salvado + david.gayoso@sngular.com + Sngular + https://sngular.github.io + + Fullstack Developer + + Europe/Madrid + @@ -365,20 +387,21 @@ 2.4.0-b180830.0359 17 provided - 5.6.2 + 5.6.3 2.9.0 - 5.8.2 - 7.1.1 - 2.4.3.Final - 3.5.1 + 5.10.1 + 7.5.1 + 2.5.8.Final + 3.6.1 1.18.22 0.9.5 4.2.0 UTF-8 4.5.0 - 2.0.0-alpha1 + 2.0.11 1.3.1 2.35.1 + 2.15.2 @@ -406,22 +429,21 @@ org.apache.avro avro ${avro.version} - - - com.fasterxml.jackson.core - jackson-core - - + + + org.apache.commons + commons-compress + 1.24.0 com.fasterxml.jackson.core jackson-core - 2.15.2 + ${jackson-core.version} com.fasterxml.jackson.core jackson-annotations - 2.15.2 + ${jackson-core.version} org.projectlombok @@ -434,16 +456,6 @@ commons-lang3 ${commons-lang3.version} - - io.confluent - kafka-schema-registry-client - ${kafka-schema-registry-client.version} - - - io.confluent - kafka-avro-serializer - ${kafka-schema-registry-client.version} - io.confluent kafka-json-serializer @@ -476,6 +488,16 @@ + + org.jetbrains.kotlin + kotlin-stdlib-common + 1.8.20 + + + org.jetbrains.kotlin + kotlin-stdlib + 1.8.20 + io.apicurio apicurio-registry-serdes-avro-serde @@ -499,7 +521,7 @@ org.apache.avro avro-protobuf - 1.11.2 + 1.11.3 com.github.os72 @@ -509,18 +531,18 @@ com.squareup.wire wire-java-generator - 4.0.1 + 4.9.0 runtime com.google.api.grpc proto-google-common-protos - 2.24.0 + 2.30.0 com.google.protobuf protobuf-java - 3.24.3 + 3.25.1 org.slf4j @@ -541,7 +563,7 @@ com.github.curious-odd-man rgxgen - 1.3 + 1.4 org.reflections @@ -557,7 +579,7 @@ com.github.charithe kafka-junit - 4.2.0 + 4.2.1 test @@ -690,7 +712,7 @@ org.apache.maven.plugins maven-checkstyle-plugin - 3.1.2 + 3.2.0 com.puppycrawl.tools @@ -725,7 +747,7 @@ com.github.ekryd.sortpom sortpom-maven-plugin - 3.0.0 + 3.3.0 @@ -738,7 +760,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.1 + 3.10.0 true true @@ -757,7 +779,7 @@ cobertura-maven-plugin 2.7 - + xml @@ -777,7 +799,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.1.2 + 3.2.2 @@ -828,7 +850,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.2.4 + 3.3.0 diff --git a/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java b/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java index cde204b7..22f23c09 100644 --- a/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java +++ b/src/main/java/com/sngular/kloadgen/extractor/extractors/protobuf/AbstractProtoFileExtractor.java @@ -118,7 +118,8 @@ private static void extractOneOfs(final MessageElement field, final List { +public class ProtoBufApicurioExtractor extends AbstractProtoFileExtractor implements Extractor { - public final List processSchema(final ProtoFileElement schemaReceived) { - return processSchemaDefault(schemaReceived); + public final List processSchema(final ParsedSchema schemaReceived) { + return processSchemaDefault(((ProtobufSchema) schemaReceived.schema()).getProtoFileElement()); } public final List getSchemaNameList(final String schema) { diff --git a/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java b/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java index 43aec860..ac5404a3 100644 --- a/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java +++ b/src/main/java/com/sngular/kloadgen/parsedschema/ParsedSchema.java @@ -45,7 +45,7 @@ public ParsedSchema(final T schema) { this.schema = schema; this.schemaType = switch (this.schema.getClass().getSimpleName()) { case "Schema", "AvroSchema", "UnionSchema", "RecordSchema" -> "AVRO"; - case "ProtoBuf", "ProtoFileElement" -> "PROTOBUF"; + case "ProtoBuf", "ProtobufSchema", "ProtoFileElement" -> "PROTOBUF"; case "JsonSchema", "ObjectSchema" -> "JSON"; default -> throw new KLoadGenException(String.format("Need to specify schemaType for %s", this.schema.getClass().getSimpleName())); }; diff --git a/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java b/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java index 0a959b62..e5797d4e 100644 --- a/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java +++ b/src/main/java/com/sngular/kloadgen/randomtool/random/RandomObject.java @@ -42,111 +42,67 @@ public boolean isTypeValid(final String type) { public Object generateRandom( final String fieldType, final Integer valueLength, final List fieldValueList, final Map constraints) { - Object value; - switch (fieldType.toLowerCase()) { - case ValidTypeConstants.STRING: - value = getStringValueOrRandom(valueLength, fieldValueList, constraints); - break; - case ValidTypeConstants.INT: + final String fixFieldType = StringUtils.defaultString(fieldType, "string"); + return switch (fixFieldType.toLowerCase()) { + case ValidTypeConstants.STRING -> getStringValueOrRandom(valueLength, fieldValueList, constraints); + case ValidTypeConstants.INT -> { try { - value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).intValueExact(); + yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).intValueExact(); } catch (final ArithmeticException exception) { - value = Integer.MAX_VALUE; + yield Integer.MAX_VALUE; } - break; - case ValidTypeConstants.LONG: + } + case ValidTypeConstants.LONG -> { try { - value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).longValueExact(); + yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).longValueExact(); } catch (final ArithmeticException exception) { - value = Long.MAX_VALUE; + yield Long.MAX_VALUE; } - break; - case ValidTypeConstants.SHORT: + } + case ValidTypeConstants.SHORT -> { try { - value = getIntegerValueOrRandom(valueLength, fieldValueList, constraints).shortValueExact(); + yield getIntegerValueOrRandom(valueLength, fieldValueList, constraints).shortValueExact(); } catch (final ArithmeticException exception) { - value = Short.MAX_VALUE; + yield Short.MAX_VALUE; } - break; - case ValidTypeConstants.DOUBLE: + } + case ValidTypeConstants.DOUBLE -> { try { - value = getDecimalValueOrRandom(valueLength, fieldValueList, constraints).doubleValue(); + yield getDecimalValueOrRandom(valueLength, fieldValueList, constraints).doubleValue(); } catch (final ArithmeticException exception) { - value = Double.MAX_VALUE; + yield Double.MAX_VALUE; } - break; - case ValidTypeConstants.NUMBER: - case ValidTypeConstants.FLOAT: + } + case ValidTypeConstants.NUMBER, ValidTypeConstants.FLOAT -> { try { - value = getDecimalValueOrRandom(valueLength, fieldValueList, constraints).floatValue(); + yield getDecimalValueOrRandom(valueLength, fieldValueList, constraints).floatValue(); } catch (final ArithmeticException exception) { - value = Float.MAX_VALUE; + yield Float.MAX_VALUE; } - break; - case ValidTypeConstants.BYTES: + } + case ValidTypeConstants.BYTES -> { try { - value = getIntegerValueOrRandom(valueLength, Collections.emptyList(), Collections.emptyMap()).byteValueExact(); + yield getIntegerValueOrRandom(valueLength, Collections.emptyList(), Collections.emptyMap()).byteValueExact(); } catch (final ArithmeticException exception) { - value = Byte.MAX_VALUE; + yield Byte.MAX_VALUE; } - break; - case ValidTypeConstants.TIMESTAMP: - case ValidTypeConstants.LONG_TIMESTAMP: - case ValidTypeConstants.STRING_TIMESTAMP: - value = getTimestampValueOrRandom(fieldType, fieldValueList); - break; - case ValidTypeConstants.BOOLEAN: - value = getBooleanValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.ENUM: - value = getEnumValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.INT_DATE: - value = getDateValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.INT_TIME_MILLIS: - value = getTimeMillisValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.LONG_TIME_MICROS: - value = getTimeMicrosValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.LONG_TIMESTAMP_MILLIS: - value = getTimestampMillisValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.LONG_TIMESTAMP_MICROS: - value = getTimestampMicrosValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MILLIS: - value = getLocalTimestampMillisValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MICROS: - value = getLocalTimestampMicrosValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.UUID: - case ValidTypeConstants.STRING_UUID: - value = getUUIDValueOrRandom(fieldValueList); - break; - case ValidTypeConstants.BYTES_DECIMAL: - case ValidTypeConstants.FIXED_DECIMAL: - value = getDecimalValueOrRandom(fieldValueList, constraints); - break; - case ValidTypeConstants.INT_YEAR: - case ValidTypeConstants.INT_MONTH: - case ValidTypeConstants.INT_DAY: - value = getDateValueOrRandom(fieldType, fieldValueList); - break; - case ValidTypeConstants.INT_HOURS: - case ValidTypeConstants.INT_MINUTES: - case ValidTypeConstants.INT_SECONDS: - case ValidTypeConstants.INT_NANOS: - value = getTimeOfDayValueOrRandom(fieldType, fieldValueList); - break; - default: - value = fieldType; - break; - } - - return value; + } + case ValidTypeConstants.TIMESTAMP, ValidTypeConstants.LONG_TIMESTAMP, ValidTypeConstants.STRING_TIMESTAMP -> getTimestampValueOrRandom(fieldType, fieldValueList); + case ValidTypeConstants.BOOLEAN -> getBooleanValueOrRandom(fieldValueList); + case ValidTypeConstants.ENUM -> getEnumValueOrRandom(fieldValueList); + case ValidTypeConstants.INT_DATE -> getDateValueOrRandom(fieldValueList); + case ValidTypeConstants.INT_TIME_MILLIS -> getTimeMillisValueOrRandom(fieldValueList); + case ValidTypeConstants.LONG_TIME_MICROS -> getTimeMicrosValueOrRandom(fieldValueList); + case ValidTypeConstants.LONG_TIMESTAMP_MILLIS -> getTimestampMillisValueOrRandom(fieldValueList); + case ValidTypeConstants.LONG_TIMESTAMP_MICROS -> getTimestampMicrosValueOrRandom(fieldValueList); + case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MILLIS -> getLocalTimestampMillisValueOrRandom(fieldValueList); + case ValidTypeConstants.LONG_LOCAL_TIMESTAMP_MICROS -> getLocalTimestampMicrosValueOrRandom(fieldValueList); + case ValidTypeConstants.UUID, ValidTypeConstants.STRING_UUID -> getUUIDValueOrRandom(fieldValueList); + case ValidTypeConstants.BYTES_DECIMAL, ValidTypeConstants.FIXED_DECIMAL -> getDecimalValueOrRandom(fieldValueList, constraints); + case ValidTypeConstants.INT_YEAR, ValidTypeConstants.INT_MONTH, ValidTypeConstants.INT_DAY -> getDateValueOrRandom(fieldType, fieldValueList); + case ValidTypeConstants.INT_HOURS, ValidTypeConstants.INT_MINUTES, ValidTypeConstants.INT_SECONDS, ValidTypeConstants.INT_NANOS -> getTimeOfDayValueOrRandom(fieldType, fieldValueList); + default -> fieldType; + }; } private BigInteger getIntegerValueOrRandom(final Integer valueLength, final List fieldValueList, final Map constraints) { diff --git a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java index fdbd905e..7ee3bae3 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java +++ b/src/main/java/com/sngular/kloadgen/sampler/KafkaProducerSampler.java @@ -11,21 +11,27 @@ import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutionException; +import com.sngular.kloadgen.common.SchemaRegistryEnum; import com.sngular.kloadgen.exception.KLoadGenException; import com.sngular.kloadgen.loadgen.BaseLoadGenerator; import com.sngular.kloadgen.model.HeaderMapping; import com.sngular.kloadgen.randomtool.generator.StatelessGeneratorTool; +import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory; import com.sngular.kloadgen.serializer.EnrichedRecord; import com.sngular.kloadgen.util.ProducerKeysHelper; import com.sngular.kloadgen.util.PropsKeysHelper; +import com.sngular.kloadgen.util.SchemaRegistryKeyHelper; +import io.apicurio.registry.resolver.SchemaResolverConfig; +import io.apicurio.registry.rest.client.RegistryClient; import io.apicurio.registry.serde.Legacy4ByteIdHandler; import io.apicurio.registry.serde.SerdeConfig; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.apache.commons.lang3.StringUtils; import org.apache.jmeter.config.Arguments; import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient; @@ -33,12 +39,14 @@ import org.apache.jmeter.samplers.SampleResult; import org.apache.jmeter.threads.JMeterContext; import org.apache.jmeter.threads.JMeterContextService; +import org.apache.jmeter.threads.JMeterVariables; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Serializer; +import org.jetbrains.annotations.NotNull; public final class KafkaProducerSampler extends AbstractJavaSamplerClient implements Serializable { @@ -68,22 +76,17 @@ public final class KafkaProducerSampler extends AbstractJavaSamplerClient implem @Override public void setupTest(final JavaSamplerContext context) { props = JMeterContextService.getContext().getProperties(); + final var vars = JMeterContextService.getContext().getVariables(); + generator = SamplerUtil.configureValueGenerator(props); - if (context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA) == null) { - generator = SamplerUtil.configureKeyGenerator(props); - } else { - generator = SamplerUtil.configureValueGenerator(props); - } - - if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY)) - || "true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { + if ("true".equals(vars.get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY)) + || "true".equals(vars.get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { keyMessageFlag = true; - if (!Objects.isNull(JMeterContextService.getContext().getVariables().get(PropsKeysHelper.KEY_SUBJECT_NAME))) { + if (!Objects.isNull(vars.get(PropsKeysHelper.KEY_SUBJECT_NAME))) { keyGenerator = SamplerUtil.configureKeyGenerator(props); } else { - msgKeyType = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE); - msgKeyValue = PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)) - ? Collections.emptyList() : Collections.singletonList(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); + msgKeyType = getMsgKeyType(props, vars); + msgKeyValue = getMsgKeyValue(props, vars); } } else { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); @@ -102,11 +105,12 @@ public void setupTest(final JavaSamplerContext context) { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getParameter(ProducerConfig.CLIENT_ID_CONFIG)); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); - props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, calculateKeyProperty(props, vars)); props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT); - producer = new KafkaProducer<>(props, (Serializer) Class.forName((String) props.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance(), - (Serializer) Class.forName((String) props.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).getConstructor().newInstance()); + producer = new KafkaProducer<>(props, + getSerializerInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props, context), + getSerializerInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props, context)); } catch (final KafkaException | ClassNotFoundException ex) { getNewLogger().error(ex.getMessage(), ex); } catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) { @@ -114,6 +118,75 @@ public void setupTest(final JavaSamplerContext context) { } } + @NotNull + private Serializer getSerializerInstance(final String keySerializerClassConfig, final Properties props, final JavaSamplerContext context) + throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException { + final String url = props.getProperty(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL); + final Map properties = SamplerUtil.setupSchemaRegistryAuthenticationProperties(context.getJMeterContext().getVariables()); + + Serializer serializer; + if (props.getProperty(keySerializerClassConfig).contains("apicurio")) { + properties.putAll(getStrategyInfo(SchemaRegistryEnum.APICURIO, props)); + serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.APICURIO, url, properties); + } else if (props.getProperty(keySerializerClassConfig).contains("confluent")) { + properties.putAll(getStrategyInfo(SchemaRegistryEnum.CONFLUENT, props)); + serializer = (Serializer) createInstance(Class.forName(props.getProperty(keySerializerClassConfig)), SchemaRegistryEnum.CONFLUENT, url, properties); + } else { + serializer = (Serializer) Class.forName(props.getProperty(keySerializerClassConfig)).getConstructor().newInstance(); + } + return serializer; + } + + private Map getStrategyInfo(final SchemaRegistryEnum schemaRegistryEnum, final Properties props) { + + return switch (schemaRegistryEnum) { + case APICURIO -> Map.of(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY, props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY), + "reference.subject.name.strategy", props.get(SchemaResolverConfig.ARTIFACT_RESOLVER_STRATEGY)); + case CONFLUENT -> Map.of(ProducerKeysHelper.VALUE_NAME_STRATEGY, props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY), + "reference.subject.name.strategy", props.get(ProducerKeysHelper.VALUE_NAME_STRATEGY)); + }; + } + + private Object createInstance(final Class classToGenerate, final SchemaRegistryEnum schemaRegistryEnum, final String url, final Map properties) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + return switch (schemaRegistryEnum) { + case APICURIO -> classToGenerate.getConstructor(RegistryClient.class) + .newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties)); + case CONFLUENT -> classToGenerate.getConstructor(SchemaRegistryClient.class, Map.class) + .newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties), properties); + }; + } + + private String calculateKeyProperty(final Properties props, final JMeterVariables vars) { + String result = vars.get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY); + if (Objects.isNull(result)) { + result = props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerKeysHelper.KEY_SERIALIZER_CLASS_CONFIG_DEFAULT); + } + return result; + } + + private String getMsgKeyType(final Properties props, final JMeterVariables vars) { + String result = props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, null); + if (Objects.isNull(result)) { + result = vars.get(PropsKeysHelper.KEY_TYPE); + } + return result; + } + + @NotNull + private List getMsgKeyValue(final Properties props, final JMeterVariables vars) { + + final List result = new ArrayList<>(); + + if (PropsKeysHelper.MSG_KEY_VALUE.equalsIgnoreCase(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)) + || Objects.nonNull(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE))) { + result.add(props.getProperty(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE)); + } else if (Objects.nonNull(vars.get(PropsKeysHelper.KEY_VALUE))) { + result.add(vars.get(PropsKeysHelper.KEY_VALUE)); + } + return result; + } + @Override public void teardownTest(final JavaSamplerContext context) { if (Objects.nonNull(producer)) { @@ -132,6 +205,9 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) { final var sampleResult = new SampleResult(); sampleResult.sampleStart(); final var jMeterContext = JMeterContextService.getContext(); + if (Objects.isNull(generator)) { + throw new KLoadGenException("Error initializing Generator"); + } final var messageVal = generator.nextMessage(); final var kafkaHeaders = safeGetKafkaHeaders(jMeterContext); diff --git a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java index 4c25c2d8..1065b603 100644 --- a/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java +++ b/src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java @@ -33,7 +33,6 @@ import com.sngular.kloadgen.util.PropsKeysHelper; import com.sngular.kloadgen.util.SchemaRegistryKeyHelper; import io.apicurio.registry.resolver.SchemaResolverConfig; -import io.apicurio.registry.serde.SerdeConfig; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import org.apache.avro.SchemaParseException; import org.apache.commons.lang3.ObjectUtils; @@ -50,9 +49,13 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class SamplerUtil { + private static final Logger LOG = LoggerFactory.getLogger(SamplerUtil.class); + private static final StatelessGeneratorTool STATELESS_GENERATOR_TOOL = new StatelessGeneratorTool(); private static final Set JSON_TYPE_SET = Set.of("json-schema", "json"); @@ -100,62 +103,64 @@ public static Arguments getCommonDefaultParameters() { return defaultParameters; } - public static Properties setupCommonProperties(final JavaSamplerContext context) { - final Properties props = new Properties(); - - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) { - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY)); - } else if ("true".equals(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { - props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE)); - props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE)); - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) { - props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE)); + /* public static Properties setupCommonProperties(final JavaSamplerContext context) { + final Properties props = new Properties(); + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) { + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY)); + } else if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) { + props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE)); + props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, context.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE)); + if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) { + props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE)); + } + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY)); + } else { + props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE); } - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY)); - } else { - props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE); - } - - if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) { - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - } - - props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG)); - props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG)); - props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG)); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG)); - props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG)); - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG)); - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); - props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM)); - final String schemaRegistryNameValue = JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME); - final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG); - if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) { - props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue); - props.put(SchemaResolverConfig.REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL)); - props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL)); - } else { - props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue); - final String schemaRegistryURL = JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL); - if (StringUtils.isNotBlank(schemaRegistryURL)) { - props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL); + if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) { + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); } - } - final Iterator parameters = context.getParameterNamesIterator(); - parameters.forEachRemaining(parameter -> { - if (parameter.startsWith("_")) { - props.put(parameter.substring(1), context.getParameter(parameter)); + props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG)); + props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG)); + props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG)); + props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG)); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG)); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG)); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM)); + + final String schemaRegistryNameValue = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME); + final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG); + if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) { + props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue); + props.put(SchemaResolverConfig.REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL)); + props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL)); + } else { + props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue); + final String schemaRegistryURL = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL); + if (StringUtils.isNotBlank(schemaRegistryURL)) { + props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL); + } } - }); - verifySecurity(context, props); + final Iterator parameters = context.getParameterNamesIterator(); + parameters.forEachRemaining(parameter -> { + if (parameter.startsWith("_")) { + props.put(parameter.substring(1), context.getParameter(parameter)); + } + }); - return props; - } + verifySecurity(context, props); + + return props; + } + + */ private static String propertyOrDefault(final String property, final String defaultToken, final String valueToSent) { return defaultToken.equals(property) ? valueToSent : property; @@ -208,33 +213,33 @@ public static Arguments getCommonConsumerDefaultParameters() { return defaultParameters; } - public static void setupConsumerDeserializerProperties(final Properties props) { - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) { - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY)); + public static void setupConsumerDeserializerProperties(final Properties props, final JavaSamplerContext context) { + if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY))) { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_DESERIALIZER_CLASS_PROPERTY)); } else { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) { - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY)); + if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY))) { + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.VALUE_DESERIALIZER_CLASS_PROPERTY)); } else { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); } } - public static void setupConsumerSchemaRegistryProperties(final Properties props) { - final Map originals = new HashMap<>(); - setupSchemaRegistryAuthenticationProperties(JavaSamplerContext.getJMeterVariables(), originals); + public static void setupConsumerSchemaRegistryProperties(final Properties props, final JavaSamplerContext context) { + final Map originals = setupSchemaRegistryAuthenticationProperties(context.getJMeterVariables()); props.putAll(originals); - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) { - props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY)); + if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY))) { + props.put(ProducerKeysHelper.VALUE_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.VALUE_NAME_STRATEGY)); } - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) { - props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, JavaSamplerContext.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY)); + if (Objects.nonNull(context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY))) { + props.put(ProducerKeysHelper.KEY_NAME_STRATEGY, context.getJMeterVariables().get(ProducerKeysHelper.KEY_NAME_STRATEGY)); } } - private static void setupSchemaRegistryAuthenticationProperties(final JMeterVariables context, final Map props) { + static Map setupSchemaRegistryAuthenticationProperties(final JMeterVariables context) { + final Map props = new HashMap<>(); if (Objects.nonNull(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) { final SchemaRegistryAdapter schemaRegistryManager = SchemaRegistryManagerFactory.getSchemaRegistry(context.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME)); @@ -251,14 +256,15 @@ private static void setupSchemaRegistryAuthenticationProperties(final JMeterVari } } } + return props; } public static Properties setupCommonConsumerProperties(final JavaSamplerContext context) { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - setupConsumerDeserializerProperties(props); - setupConsumerSchemaRegistryProperties(props); + setupConsumerDeserializerProperties(props, context); + setupConsumerSchemaRegistryProperties(props, context); props.put(ConsumerConfig.SEND_BUFFER_CONFIG, context.getParameter(ConsumerConfig.SEND_BUFFER_CONFIG)); props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); @@ -270,14 +276,14 @@ public static Properties setupCommonConsumerProperties(final JavaSamplerContext props.put(CommonClientConfigs.CLIENT_ID_CONFIG, context.getParameter(CommonClientConfigs.CLIENT_ID_CONFIG)); - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA))) { - props.put(PropsKeysHelper.VALUE_SCHEMA, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA)); + if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA))) { + props.put(PropsKeysHelper.VALUE_SCHEMA, context.getJMeterVariables().get(PropsKeysHelper.VALUE_SCHEMA)); } - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA))) { - props.put(PropsKeysHelper.KEY_SCHEMA, JavaSamplerContext.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA)); + if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA))) { + props.put(PropsKeysHelper.KEY_SCHEMA, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA)); } - if (Objects.nonNull(JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) { - props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, JavaSamplerContext.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL)); + if (Objects.nonNull(context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL))) { + props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL)); } props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, context.getParameter(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); @@ -378,8 +384,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props) Objects.requireNonNullElse(jMeterVariables.get(PropsKeysHelper.VALUE_SERIALIZER_CLASS_PROPERTY), ProducerKeysHelper.VALUE_SERIALIZER_CLASS_CONFIG_DEFAULT)); if (Objects.nonNull(jMeterVariables.get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME))) { - final Map originals = new HashMap<>(); - setupSchemaRegistryAuthenticationProperties(jMeterVariables, originals); + final Map originals = setupSchemaRegistryAuthenticationProperties(jMeterVariables); props.putAll(originals); @@ -390,6 +395,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props) if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) { generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES)); } else { + LOG.error(exc.getMessage(), exc); throw exc; } } diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java b/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java new file mode 100644 index 00000000..bdc55802 --- /dev/null +++ b/src/main/java/com/sngular/kloadgen/schemaregistry/SchemaRegistryFactory.java @@ -0,0 +1,26 @@ +package com.sngular.kloadgen.schemaregistry; + +import java.util.List; +import java.util.Map; + +import com.sngular.kloadgen.common.SchemaRegistryEnum; +import com.sngular.kloadgen.util.JMeterHelper; +import io.apicurio.registry.rest.client.RegistryClientFactory; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; + +public final class SchemaRegistryFactory { + + private SchemaRegistryFactory() { + } + + public static Object getSchemaRegistryClient(final SchemaRegistryEnum typeEnum, final String url, final Map properties) { + return switch (typeEnum) { + case APICURIO -> RegistryClientFactory.create(url); + case CONFLUENT -> new CachedSchemaRegistryClient(List.of(JMeterHelper.checkPropertyOrVariable(url)), 1000, + List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties); + }; + } +} diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java index ad990e80..26250864 100644 --- a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java +++ b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ApicurioSchemaRegistry.java @@ -10,9 +10,11 @@ import java.util.Objects; import com.google.protobuf.Message; +import com.sngular.kloadgen.common.SchemaRegistryEnum; import com.sngular.kloadgen.common.SchemaTypeEnum; import com.sngular.kloadgen.exception.KLoadGenException; import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter; +import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory; import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioAbstractParsedSchemaMetadata; import com.sngular.kloadgen.schemaregistry.adapter.impl.ApicurioSchemaMetadata; import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema; @@ -20,7 +22,6 @@ import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter; import io.apicurio.registry.resolver.SchemaParser; import io.apicurio.registry.rest.client.RegistryClient; -import io.apicurio.registry.rest.client.RegistryClientFactory; import io.apicurio.registry.rest.client.exception.RestClientException; import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; import io.apicurio.registry.rest.v2.beans.SearchedArtifact; @@ -43,13 +44,13 @@ public String getSchemaRegistryUrlKey() { @Override public void setSchemaRegistryClient(final String url, final Map properties) { - this.schemaRegistryClient = RegistryClientFactory.create(url); + this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties); } @Override public void setSchemaRegistryClient(final Map properties) { final String url = Objects.toString(properties.get(this.getSchemaRegistryUrlKey()), ""); - this.schemaRegistryClient = RegistryClientFactory.create(url); + this.schemaRegistryClient = (RegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties); } @Override diff --git a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java index fd36e771..b2b047c6 100644 --- a/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java +++ b/src/main/java/com/sngular/kloadgen/schemaregistry/impl/ConfluentSchemaRegistry.java @@ -3,27 +3,23 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Map; +import com.sngular.kloadgen.common.SchemaRegistryEnum; import com.sngular.kloadgen.exception.KLoadGenException; import com.sngular.kloadgen.parsedschema.ParsedSchema; import com.sngular.kloadgen.schemaregistry.SchemaRegistryAdapter; +import com.sngular.kloadgen.schemaregistry.SchemaRegistryFactory; import com.sngular.kloadgen.schemaregistry.adapter.impl.AbstractParsedSchemaAdapter; import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseParsedSchema; import com.sngular.kloadgen.schemaregistry.adapter.impl.BaseSchemaMetadata; import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentAbstractParsedSchemaMetadata; import com.sngular.kloadgen.schemaregistry.adapter.impl.ConfluentSchemaMetadata; import com.sngular.kloadgen.schemaregistry.adapter.impl.SchemaMetadataAdapter; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import lombok.extern.slf4j.Slf4j; -import org.apache.jmeter.threads.JMeterContextService; @Slf4j public final class ConfluentSchemaRegistry implements SchemaRegistryAdapter { @@ -37,15 +33,13 @@ public String getSchemaRegistryUrlKey() { @Override public void setSchemaRegistryClient(final String url, final Map properties) { - this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000, - List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties); + this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties); } @Override public void setSchemaRegistryClient(final Map properties) { final String url = properties.get(this.getSchemaRegistryUrlKey()).toString(); - this.schemaRegistryClient = new CachedSchemaRegistryClient(List.of(checkPropertyOrVariable(url)), 1000, - List.of(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()), properties); + this.schemaRegistryClient = (SchemaRegistryClient) SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties); } @@ -89,16 +83,4 @@ public BaseParsedSchema getSchemaBySubjec throw new KLoadGenException(e.getMessage()); } } - - private String checkPropertyOrVariable(final String textToCheck) { - final String result; - if (textToCheck.matches("\\$\\{__P\\(.*\\)}")) { - result = JMeterContextService.getContext().getProperties().getProperty(textToCheck.substring(6, textToCheck.length() - 2)); - } else if (textToCheck.matches("\\$\\{\\w*}")) { - result = JMeterContextService.getContext().getVariables().get(textToCheck.substring(2, textToCheck.length() - 1)); - } else { - result = textToCheck; - } - return result; - } } diff --git a/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java b/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java index 0d63c161..cfb29501 100644 --- a/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java +++ b/src/test/java/com/sngular/kloadgen/extractor/extractors/protobuf/ProtobufApicurioExtractorTest.java @@ -6,10 +6,13 @@ import com.sngular.kloadgen.extractor.extractors.Extractor; import com.sngular.kloadgen.model.FieldValueMapping; +import com.sngular.kloadgen.parsedschema.ParsedSchema; import com.sngular.kloadgen.testutil.FileHelper; import com.squareup.wire.schema.Location; import com.squareup.wire.schema.internal.parser.ProtoFileElement; import com.squareup.wire.schema.internal.parser.ProtoParser; +import io.apicurio.registry.utils.protobuf.schema.FileDescriptorUtils; +import io.apicurio.registry.utils.protobuf.schema.ProtobufSchema; import org.apache.jmeter.threads.JMeterContext; import org.apache.jmeter.threads.JMeterContextService; import org.apache.jmeter.threads.JMeterVariables; @@ -23,7 +26,7 @@ class ProtobufApicurioExtractorTest { private final FileHelper fileHelper = new FileHelper(); - private final Extractor protoBufApicurioExtractor = new ProtoBufApicurioExtractor(); + private final Extractor protoBufApicurioExtractor = new ProtoBufApicurioExtractor(); private final Location location = Location.get("", ""); @@ -42,8 +45,9 @@ public void setUp() { @DisplayName("Test Extractor with simple proto file") void testFlatProperties() throws Exception { final String testFile = fileHelper.getContent("/proto-files/easyTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(3) @@ -59,8 +63,9 @@ void testFlatProperties() throws Exception { @DisplayName("Test Extractor with data structure map and array") void testEmbeddedTypes() throws Exception { final String testFile = fileHelper.getContent("/proto-files/embeddedTypeTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(2) .containsExactlyInAnyOrder( @@ -74,8 +79,9 @@ void testEmbeddedTypes() throws Exception { @DisplayName("Test Extractor with data structure enums and collections") void testEnumType() throws Exception { final String testFile = fileHelper.getContent("/proto-files/enumTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(3) .containsExactlyInAnyOrder( @@ -92,8 +98,9 @@ void testEnumType() throws Exception { @DisplayName("Test Extractor with data structure Any of") void testOneOfsType() throws Exception { final String testFile = fileHelper.getContent("/proto-files/oneOfTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(4) .contains( @@ -112,8 +119,9 @@ void testOneOfsType() throws Exception { @DisplayName("Test Extractor with complex structure") void testComplexProto() throws Exception { final String testFile = fileHelper.getContent("/proto-files/complexTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(13) .containsExactlyInAnyOrder( @@ -137,8 +145,9 @@ void testComplexProto() throws Exception { @DisplayName("Test Extractor with real proto") void testProvided() throws Exception { final String testFile = fileHelper.getContent("/proto-files/providedTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(32) .containsExactlyInAnyOrder( @@ -181,8 +190,9 @@ void testProvided() throws Exception { @DisplayName("Test Extractor with data structure maps") void testMap() throws Exception { final String testFile = fileHelper.getContent("/proto-files/mapTest.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(7) .containsExactlyInAnyOrder( @@ -200,8 +210,9 @@ void testMap() throws Exception { @DisplayName("Test Extractor with multi types") void completeTest() throws Exception { final String testFile = fileHelper.getContent("/proto-files/completeProto.proto"); - final ProtoFileElement schema = ProtoParser.Companion.parse(location, testFile); - final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(schema); + final ProtoFileElement protoFileElement = ProtoParser.Companion.parse(location, testFile); + final ProtobufSchema schema = new ProtobufSchema(FileDescriptorUtils.protoFileToFileDescriptor(protoFileElement), protoFileElement); + final List fieldValueMappingList = protoBufApicurioExtractor.processSchema(new ParsedSchema(schema)); Assertions.assertThat(fieldValueMappingList) .hasSize(11) .containsExactlyInAnyOrder( diff --git a/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java b/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java index c5bcead9..1ac0c82f 100644 --- a/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java +++ b/src/test/java/com/sngular/kloadgen/processor/ProtobufSchemaProcessorTest.java @@ -94,7 +94,9 @@ void testProtobufGoogleTypes() throws IOException { final List fieldValueMappingList = List.of( FieldValueMapping.builder().fieldName("id").fieldType("Int32Value").required(true).isAncestorRequired(true).build(), FieldValueMapping.builder().fieldName("occurrence_id").fieldType("StringValue").fieldValueList("Isabel").required(true).isAncestorRequired(true).build(), - FieldValueMapping.builder().fieldName("load_number").fieldType("Int32Value").required(true).isAncestorRequired(true).build()); + FieldValueMapping.builder().fieldName("load_number").fieldType("Int32Value").required(true).isAncestorRequired(true).build(), + FieldValueMapping.builder().fieldName("date").fieldType("DateValue").required(true).isAncestorRequired(true).build(), + FieldValueMapping.builder().fieldName("timeofday").fieldType("TimeOfDateValue").required(true).isAncestorRequired(true).build()); final SchemaProcessor protobufSchemaProcessor = new SchemaProcessor(); protobufSchemaProcessor.processSchema(SchemaTypeEnum.PROTOBUF, new ParsedSchema(testFile, SchemaTypeEnum.PROTOBUF.name()), confluentBaseSchemaMetadata, fieldValueMappingList); @@ -112,9 +114,11 @@ void testProtobufGoogleTypes() throws IOException { Assertions.assertThat(message).isNotNull().isInstanceOf(EnrichedRecord.class); Assertions.assertThat(message.getGenericRecord()).isNotNull(); Assertions.assertThat(secondValue).isEqualTo("Isabel"); - Assertions.assertThat(assertKeys).hasSize(3).containsExactlyInAnyOrder("abc.Incident.id", + Assertions.assertThat(assertKeys).hasSize(5).containsExactlyInAnyOrder("abc.Incident.id", "abc.Incident.occurrence_id", - "abc.Incident.load_number"); + "abc.Incident.load_number", + "abc.Incident.date", + "abc.Incident.timeofday"); } @Test diff --git a/src/test/resources/proto-files/googleTypesTest.proto b/src/test/resources/proto-files/googleTypesTest.proto index d173da93..54824855 100644 --- a/src/test/resources/proto-files/googleTypesTest.proto +++ b/src/test/resources/proto-files/googleTypesTest.proto @@ -14,5 +14,7 @@ message Incident { .google.protobuf.Int32Value id = 1; .google.protobuf.StringValue occurrence_id = 2; .google.protobuf.StringValue load_number = 3; - + .google.type.Date date = 4; + .google.type.TimeOfDay timeofday = 5; + } \ No newline at end of file