Skip to content

Commit

Permalink
#332 fix generator creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jemacineiras committed Jan 29, 2024
1 parent addc4df commit a94d288
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ private Serializer getSerializerInstance(final String keySerializerClassConfig,
};
}

private Object createInstance(final Class<?> aClass, final SchemaRegistryEnum schemaRegistryEnum, final String url, final Map<String, ?> properties)
private Object createInstance(final Class<?> classToGenerate, final SchemaRegistryEnum schemaRegistryEnum, final String url, final Map<String, ?> properties)
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
return switch (schemaRegistryEnum) {
case APICURIO -> aClass.getConstructor(RegistryClient.class)
case APICURIO -> classToGenerate.getConstructor(RegistryClient.class)
.newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.APICURIO, url, properties));
case CONFLUENT -> aClass.getConstructor(SchemaRegistryClient.class, Map.class)
case CONFLUENT -> classToGenerate.getConstructor(SchemaRegistryClient.class, Map.class)
.newInstance(SchemaRegistryFactory.getSchemaRegistryClient(SchemaRegistryEnum.CONFLUENT, url, properties), properties);
};
}
Expand Down Expand Up @@ -205,6 +205,9 @@ public SampleResult runTest(final JavaSamplerContext javaSamplerContext) {
final var sampleResult = new SampleResult();
sampleResult.sampleStart();
final var jMeterContext = JMeterContextService.getContext();
if (Objects.isNull(generator)) {
throw new KLoadGenException("Error initializing Generator");
}
final var messageVal = generator.nextMessage();
final var kafkaHeaders = safeGetKafkaHeaders(jMeterContext);

Expand Down
106 changes: 56 additions & 50 deletions src/main/java/com/sngular/kloadgen/sampler/SamplerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.sngular.kloadgen.util.PropsKeysHelper;
import com.sngular.kloadgen.util.SchemaRegistryKeyHelper;
import io.apicurio.registry.resolver.SchemaResolverConfig;
import io.apicurio.registry.serde.SerdeConfig;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.ObjectUtils;
Expand All @@ -50,9 +49,13 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SamplerUtil {

private static final Logger LOG = LoggerFactory.getLogger(SamplerUtil.class);

private static final StatelessGeneratorTool STATELESS_GENERATOR_TOOL = new StatelessGeneratorTool();

private static final Set<String> JSON_TYPE_SET = Set.of("json-schema", "json");
Expand Down Expand Up @@ -100,62 +103,64 @@ public static Arguments getCommonDefaultParameters() {
return defaultParameters;
}

public static Properties setupCommonProperties(final JavaSamplerContext context) {
final Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
} else if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE));
props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, context.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE));
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE));
/* public static Properties setupCommonProperties(final JavaSamplerContext context) {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY))) {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
} else if ("true".equals(context.getJMeterVariables().get(PropsKeysHelper.SIMPLE_KEYED_MESSAGE_KEY))) {
props.put(PropsKeysHelper.MESSAGE_KEY_KEY_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_TYPE));
props.put(PropsKeysHelper.MESSAGE_KEY_KEY_VALUE, context.getJMeterVariables().get(PropsKeysHelper.KEY_VALUE));
if (Objects.nonNull(context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE))) {
props.put(PropsKeysHelper.KEY_SCHEMA_TYPE, context.getJMeterVariables().get(PropsKeysHelper.KEY_SCHEMA_TYPE));
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
} else {
props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE);
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getJMeterVariables().get(PropsKeysHelper.KEY_SERIALIZER_CLASS_PROPERTY));
} else {
props.put(PropsKeysHelper.SCHEMA_KEYED_MESSAGE_KEY, Boolean.FALSE);
}
if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
}

props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG));
props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG));
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG));
props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG));
props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG));
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM));

final String schemaRegistryNameValue = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG);
if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) {
props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue);
props.put(SchemaResolverConfig.REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
} else {
props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue);
final String schemaRegistryURL = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
if (StringUtils.isNotBlank(schemaRegistryURL)) {
props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL);
if (Objects.nonNull(context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))) {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
}
}
final Iterator<String> parameters = context.getParameterNamesIterator();
parameters.forEachRemaining(parameter -> {
if (parameter.startsWith("_")) {
props.put(parameter.substring(1), context.getParameter(parameter));
props.put(ProducerConfig.ACKS_CONFIG, context.getParameter(ProducerConfig.ACKS_CONFIG));
props.put(ProducerConfig.SEND_BUFFER_CONFIG, context.getParameter(ProducerConfig.SEND_BUFFER_CONFIG));
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, context.getParameter(ProducerConfig.RECEIVE_BUFFER_CONFIG));
props.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getParameter(ProducerConfig.BATCH_SIZE_CONFIG));
props.put(ProducerConfig.LINGER_MS_CONFIG, context.getParameter(ProducerConfig.LINGER_MS_CONFIG));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, context.getParameter(ProducerConfig.BUFFER_MEMORY_CONFIG));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, context.getParameter(ProducerConfig.COMPRESSION_TYPE_CONFIG));
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, context.getParameter(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
props.put(ProducerKeysHelper.SASL_MECHANISM, context.getParameter(ProducerKeysHelper.SASL_MECHANISM));
final String schemaRegistryNameValue = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_NAME);
final String enableSchemaRegistrationValue = context.getParameter(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG);
if (SchemaRegistryKeyHelper.SCHEMA_REGISTRY_APICURIO.equalsIgnoreCase(schemaRegistryNameValue)) {
props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, enableSchemaRegistrationValue);
props.put(SchemaResolverConfig.REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, context.getJMeterVariables().get(SchemaResolverConfig.REGISTRY_URL));
} else {
props.put(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG, enableSchemaRegistrationValue);
final String schemaRegistryURL = context.getJMeterVariables().get(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL);
if (StringUtils.isNotBlank(schemaRegistryURL)) {
props.put(SchemaRegistryKeyHelper.SCHEMA_REGISTRY_URL, schemaRegistryURL);
}
}
});
verifySecurity(context, props);
final Iterator<String> parameters = context.getParameterNamesIterator();
parameters.forEachRemaining(parameter -> {
if (parameter.startsWith("_")) {
props.put(parameter.substring(1), context.getParameter(parameter));
}
});
return props;
}
verifySecurity(context, props);
return props;
}
*/

private static String propertyOrDefault(final String property, final String defaultToken, final String valueToSent) {
return defaultToken.equals(property) ? valueToSent : property;
Expand Down Expand Up @@ -390,6 +395,7 @@ public static BaseLoadGenerator configureValueGenerator(final Properties props)
if (Objects.nonNull(props.get(SchemaRegistryKeyHelper.ENABLE_AUTO_SCHEMA_REGISTRATION_CONFIG))) {
generator.setUpGenerator(jMeterVariables.get(PropsKeysHelper.VALUE_SCHEMA), (List<FieldValueMapping>) jMeterVariables.getObject(PropsKeysHelper.VALUE_SCHEMA_PROPERTIES));
} else {
LOG.error(exc.getMessage(), exc);
throw exc;
}
}
Expand Down

0 comments on commit a94d288

Please sign in to comment.