Skip to content

Commit

Permalink
Optional custom scheduler for KafkaMetrics (#4977)
Browse files Browse the repository at this point in the history
With many instances of KafkaMetrics (corresponding to many instances of Kafka clients corresponding to many consumers/producers/streams), many single-thread pools would be created to check and bind metrics from the Kafka client periodically. This allows users to provide their own scheduler which can be shared among instances, which will reduce the overhead of many single-thread schedulers.

Resolves gh-4976

Co-authored-by: Tommy Ludwig <[email protected]>
  • Loading branch information
vasiliy-sarzhynskyi and shakuzen authored Oct 15, 2024
1 parent 669db8c commit b9d4107
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;

import java.util.concurrent.ScheduledExecutorService;

/**
* Kafka Client metrics binder. This should be closed on application shutdown to clean up
* resources.
Expand All @@ -43,6 +45,21 @@
@NonNullFields
public class KafkaClientMetrics extends KafkaMetrics {

/**
* Kafka {@link Producer} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaProducer producer instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(Producer<?, ?> kafkaProducer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaProducer::metrics, tags, scheduler);
}

/**
* Kafka {@link Producer} metrics binder
* @param kafkaProducer producer instance to be instrumented
Expand All @@ -60,6 +77,21 @@ public KafkaClientMetrics(Producer<?, ?> kafkaProducer) {
super(kafkaProducer::metrics);
}

/**
* Kafka {@link Consumer} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaConsumer consumer instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaConsumer::metrics, tags, scheduler);
}

/**
* Kafka {@link Consumer} metrics binder
* @param kafkaConsumer consumer instance to be instrumented
Expand All @@ -77,6 +109,21 @@ public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer) {
super(kafkaConsumer::metrics);
}

/**
* Kafka {@link AdminClient} metrics binder. The lifecycle of the custom scheduler
* passed is the responsibility of the caller. It will not be shut down when this
* instance is {@link #close() closed}. A scheduler can be shared among multiple
* instances of {@link KafkaClientMetrics} to reduce resource usage by reducing the
* number of threads if there will be many instances.
* @param adminClient instance to be instrumented
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.14.0
*/
public KafkaClientMetrics(AdminClient adminClient, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(adminClient::metrics, tags, scheduler);
}

/**
* Kafka {@link AdminClient} metrics binder
* @param adminClient instance to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
static final String KAFKA_VERSION_TAG_NAME = "kafka.version";
static final String DEFAULT_VALUE = "unknown";

private static final String DEFAULT_SCHEDULER_THREAD_NAME_PREFIX = "micrometer-kafka-metrics";

private static final Set<Class<?>> counterMeasurableClasses = new HashSet<>();

static {
Expand All @@ -96,8 +98,9 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {

private final Duration refreshInterval;

private final ScheduledExecutorService scheduler = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics"));
private final ScheduledExecutorService scheduler;

private final boolean schedulerExternallyManaged;

@Nullable
private Iterable<Tag> commonTags;
Expand All @@ -122,11 +125,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
ScheduledExecutorService scheduler) {
this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL, scheduler, true);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval) {
this(metricsSupplier, extraTags, refreshInterval, createDefaultScheduler(), false);
}

KafkaMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> extraTags,
Duration refreshInterval, ScheduledExecutorService scheduler, boolean schedulerExternallyManaged) {
this.metricsSupplier = metricsSupplier;
this.extraTags = extraTags;
this.refreshInterval = refreshInterval;
this.scheduler = scheduler;
this.schedulerExternallyManaged = schedulerExternallyManaged;
}

@Override
Expand Down Expand Up @@ -295,6 +310,10 @@ private static Class<? extends Measurable> getMeasurableClass(Metric metric) {
}
}

private static ScheduledExecutorService createDefaultScheduler() {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(DEFAULT_SCHEDULER_THREAD_NAME_PREFIX));
}

private Gauge registerGauge(MeterRegistry registry, MetricName metricName, String meterName, Iterable<Tag> tags) {
return Gauge.builder(meterName, this.metrics, toMetricValue(metricName))
.tags(tags)
Expand Down Expand Up @@ -344,7 +363,9 @@ private Meter.Id meterIdForComparison(MetricName metricName) {

@Override
public void close() {
this.scheduler.shutdownNow();
if (!schedulerExternallyManaged) {
this.scheduler.shutdownNow();
}

for (Meter.Id id : registeredMeterIds) {
registry.remove(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.streams.KafkaStreams;

import java.util.concurrent.ScheduledExecutorService;

/**
* Kafka Streams metrics binder. This should be closed on application shutdown to clean up
* resources.
Expand Down Expand Up @@ -58,4 +60,19 @@ public KafkaStreamsMetrics(KafkaStreams kafkaStreams) {
super(kafkaStreams::metrics);
}

/**
* {@link KafkaStreams} metrics binder. The lifecycle of the custom scheduler passed
* is the responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaStreamsMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param kafkaStreams instance to be instrumented
* @param tags additional tags
* @param scheduler customer scheduler to run the task that checks and binds metrics
* @since 1.14.0
*/
public KafkaStreamsMetrics(KafkaStreams kafkaStreams, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaStreams::metrics, tags, scheduler);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (AdminClient adminClient = createAdmin()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(adminClient, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private AdminClient createAdmin() {
Properties adminConfig = new Properties();
adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsConsumerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Consumer<String, String> consumer = createConsumer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(consumer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Consumer<String, String> createConsumer() {
Properties consumerConfig = new Properties();
consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Producer<String, String> producer = createProducer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
metrics = new KafkaClientMetrics(producer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Producer<String, String> createProducer() {
Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class KafkaMetricsTest {

Expand Down Expand Up @@ -68,7 +71,7 @@ void shouldKeepMetersWhenMetricsDoNotChange() {
}

@Test
void closeShouldRemoveAllMeters() {
void closeShouldRemoveAllMetersAndShutdownDefaultScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
Expand All @@ -80,9 +83,35 @@ void closeShouldRemoveAllMeters() {

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
assertThat(isDefaultMetricsSchedulerThreadAlive()).isTrue();

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());
}

@Test
void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() {
// Given
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> {
MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>());
KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
return Collections.singletonMap(metricName, metric);
};
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1);
kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

kafkaMetrics.bindTo(registry);
assertThat(registry.getMeters()).hasSize(1);
await().until(() -> !isDefaultMetricsSchedulerThreadAlive());

kafkaMetrics.close();
assertThat(registry.getMeters()).isEmpty();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}

@Test
Expand Down Expand Up @@ -552,4 +581,13 @@ private KafkaMetric createKafkaMetric(MetricName metricName) {
return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
}

private static boolean isDefaultMetricsSchedulerThreadAlive() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.filter(Thread::isAlive)
.map(Thread::getName)
.anyMatch(name -> name.startsWith("micrometer-kafka-metrics"));
}

}
Loading

0 comments on commit b9d4107

Please sign in to comment.