PartitionAssigned/Revoked handler #228
-
Is there a way to register a custom PartitionsAssigned/PartitionsRevoked handler for a consumer/stream? |
Beta Was this translation helpful? Give feedback.
Answered by
LGouellec
Jan 10, 2023
Replies: 2 comments 1 reply
-
Hi @AndreasKorn , What is the final use case ? What do you want to do ? Best regards, |
Beta Was this translation helpful? Give feedback.
1 reply
-
Hi @AndreasKorn , You can try something like that : public class MyOwnConsumerListener : IConsumerRebalanceListener
{
private readonly IConsumerRebalanceListener wrapper;
public MyOwnConsumerListener(IConsumerRebalanceListener wrapper)
{
this.wrapper = wrapper;
}
public void PartitionsAssigned(IConsumer<byte[], byte[]> consumer, List<TopicPartition> partitions)
{
wrapper.PartitionsAssigned(consumer, partitions);
// do what you want
}
public void PartitionsRevoked(IConsumer<byte[], byte[]> consumer, List<TopicPartitionOffset> partitions)
{
wrapper.PartitionsRevoked(consumer, partitions);
// do what you want
}
public void PartitionsLost(IConsumer<byte[], byte[]> consumer, List<TopicPartitionOffset> partitions)
{
wrapper.PartitionsLost(consumer, partitions);
// do what you want
}
}
public class MyOwnKafkaClientSupplier : IKafkaSupplier
{
private readonly DefaultKafkaClientSupplier internalSupplier;
public MyOwnKafkaClientSupplier(IStreamConfig configuration)
{
internalSupplier = new DefaultKafkaClientSupplier(new KafkaLoggerAdapter(configuration), configuration);
}
public IConsumer<byte[], byte[]> GetConsumer(ConsumerConfig config,
IConsumerRebalanceListener rebalanceListener)
=> internalSupplier.GetConsumer(config, new MyOwnConsumerListener(rebalanceListener));
public IProducer<byte[], byte[]> GetProducer(ProducerConfig config)
=> internalSupplier.GetProducer(config);
public IAdminClient GetAdmin(AdminClientConfig config)
=> internalSupplier.GetAdmin(config);
public IConsumer<byte[], byte[]> GetGlobalConsumer(ConsumerConfig config)
=> internalSupplier.GetGlobalConsumer(config);
public StreamMetricsRegistry MetricsRegistry
{
get => internalSupplier.MetricsRegistry;
set => internalSupplier.MetricsRegistry = value;
}
}
var config = new StreamConfig();
// ....
KafkaStream stream = new KafkaStream(topology, config, new MyOwnKafkaClientSupplier(config));
By keep in mind that your own consumer rebalance listener must be fast. Don't make lazy operation inside. |
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
LGouellec
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @AndreasKorn ,
You can try something like that :