-
Hey @LGouellec, I am running three streamiz applications on a 3 node cluster, and would like to change the Also, I tried to change the strategy through the consumer configs, The strategy is first set in _consumerConfig and later with the AddConsumerConfig in _internalConsumerConfig which are unioned at the end and the duplication occurs. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Hi @ognenbek, The But there is no impact if the consumer use the range partition assignor now. So I can be change the setter to public for the next release. As a workaround, you can wrap your own stream config and set the assignment strategy like this implementation : public class MyStreamConfig<KS, VS> : IStreamConfig
where KS : ISerDes, new()
where VS : ISerDes, new()
{
private StreamConfig<KS, VS> wrapperConfig = new();
public IStreamConfig Clone()
=> wrapperConfig.Clone();
public void AddConfig(string key, string value)
=> wrapperConfig.AddConfig(key, value);
public void AddAdminConfig(string key, string value)
=> wrapperConfig.AddAdminConfig(key, value);
public void AddConsumerConfig(string key, string value)
=> wrapperConfig.AddConsumerConfig(key, value);
public void AddProducerConfig(string key, string value)
=> wrapperConfig.AddProducerConfig(key, value);
public ProducerConfig ToProducerConfig()
=> wrapperConfig.ToProducerConfig();
public ProducerConfig ToProducerConfig(string clientId)
=> wrapperConfig.ToProducerConfig(clientId);
public ConsumerConfig ToConsumerConfig()
{
var config = wrapperConfig.ToConsumerConfig();
config.PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range;
return config;
}
public ConsumerConfig ToConsumerConfig(string clientId)
{
var config = wrapperConfig.ToConsumerConfig(clientId);
config.PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range;
return config;
}
public ConsumerConfig ToGlobalConsumerConfig(string clientId)
=> wrapperConfig.ToGlobalConsumerConfig(clientId);
public AdminClientConfig ToAdminConfig(string clientId)
=> wrapperConfig.ToAdminConfig(clientId);
public dynamic Get(string key)
=> wrapperConfig.Get(key);
public Action<string, RocksDbOptions> RocksDbConfigHandler
{
get => wrapperConfig.RocksDbConfigHandler;
set => wrapperConfig.RocksDbConfigHandler = value;
}
public Func<Exception, ExceptionHandlerResponse> InnerExceptionHandler
{
get => wrapperConfig.InnerExceptionHandler;
set => wrapperConfig.InnerExceptionHandler = value;
}
public Func<ProcessorContext, ConsumeResult<byte[], byte[]>, Exception, ExceptionHandlerResponse>
DeserializationExceptionHandler
{
get => wrapperConfig.DeserializationExceptionHandler;
set => wrapperConfig.DeserializationExceptionHandler = value;
}
public Func<DeliveryReport<byte[], byte[]>, ExceptionHandlerResponse> ProductionExceptionHandler
{
get => wrapperConfig.ProductionExceptionHandler;
set => wrapperConfig.ProductionExceptionHandler = value;
}
public int? MaxPollIntervalMs
{
get => wrapperConfig.MaxPollIntervalMs;
set => wrapperConfig.MaxPollIntervalMs = value;
}
public long MaxPollRecords
{
get => wrapperConfig.MaxPollRecords;
set => wrapperConfig.MaxPollRecords = value;
}
public long MaxPollRestoringRecords
{
get => wrapperConfig.MaxPollRestoringRecords;
set => wrapperConfig.MaxPollRestoringRecords = value;
}
public long PollMs
{
get => wrapperConfig.PollMs;
set => wrapperConfig.PollMs = value;
}
public long CommitIntervalMs
{
get => wrapperConfig.CommitIntervalMs;
set => wrapperConfig.CommitIntervalMs = value;
}
public TimeSpan TransactionTimeout
{
get => wrapperConfig.TransactionTimeout;
set => wrapperConfig.TransactionTimeout = value;
}
public string TransactionalId
{
get => wrapperConfig.TransactionalId;
set => wrapperConfig.TransactionalId = value;
}
public string ApplicationId
{
get => wrapperConfig.ApplicationId;
set => wrapperConfig.ApplicationId = value;
}
public string ClientId
{
get => wrapperConfig.ClientId;
set => wrapperConfig.ClientId = value;
}
public int NumStreamThreads
{
get => wrapperConfig.NumStreamThreads;
set => wrapperConfig.NumStreamThreads = value;
}
public ISerDes DefaultKeySerDes
{
get => wrapperConfig.DefaultKeySerDes;
set => wrapperConfig.DefaultKeySerDes = value;
}
public ISerDes DefaultValueSerDes
{
get => wrapperConfig.DefaultValueSerDes;
set => wrapperConfig.DefaultValueSerDes = value;
}
public ITimestampExtractor DefaultTimestampExtractor
{
get => wrapperConfig.DefaultTimestampExtractor;
set => wrapperConfig.DefaultTimestampExtractor = value;
}
public ProcessingGuarantee Guarantee
{
get => wrapperConfig.Guarantee;
set => wrapperConfig.Guarantee = value;
}
public string BootstrapServers
{
get => wrapperConfig.BootstrapServers;
set => wrapperConfig.BootstrapServers = value;
}
public long MaxTaskIdleMs
{
get => wrapperConfig.MaxTaskIdleMs;
set => wrapperConfig.MaxTaskIdleMs = value;
}
public long BufferedRecordsPerPartition
{
get => wrapperConfig.BufferedRecordsPerPartition;
set => wrapperConfig.BufferedRecordsPerPartition = value;
}
public bool FollowMetadata
{
get => wrapperConfig.FollowMetadata;
set => wrapperConfig.FollowMetadata = value;
}
public string StateDir
{
get => wrapperConfig.StateDir;
set => wrapperConfig.StateDir = value;
}
public int ReplicationFactor
{
get => wrapperConfig.ReplicationFactor;
set => wrapperConfig.ReplicationFactor = value;
}
public long WindowStoreChangelogAdditionalRetentionMs
{
get => wrapperConfig.WindowStoreChangelogAdditionalRetentionMs;
set => wrapperConfig.WindowStoreChangelogAdditionalRetentionMs = value;
}
public IOffsetCheckpointManager OffsetCheckpointManager
{
get => wrapperConfig.OffsetCheckpointManager;
set => wrapperConfig.OffsetCheckpointManager = value;
}
public ILoggerFactory Logger
{
get => wrapperConfig.Logger;
set => wrapperConfig.Logger = value;
}
public long MetricsIntervalMs
{
get => wrapperConfig.MetricsIntervalMs;
set => wrapperConfig.MetricsIntervalMs = value;
}
public Action<IEnumerable<Sensor>> MetricsReporter
{
get => wrapperConfig.MetricsReporter;
set => wrapperConfig.MetricsReporter = value;
}
public bool ExposeLibrdKafkaStats
{
get => wrapperConfig.ExposeLibrdKafkaStats;
set => wrapperConfig.ExposeLibrdKafkaStats = value;
}
public MetricsRecordingLevel MetricsRecording
{
get => wrapperConfig.MetricsRecording;
set => wrapperConfig.MetricsRecording = value;
}
public long StartTaskDelayMs
{
get => wrapperConfig.StartTaskDelayMs;
set => wrapperConfig.StartTaskDelayMs = value;
}
public bool ParallelProcessing
{
get => wrapperConfig.ParallelProcessing;
set => wrapperConfig.ParallelProcessing = value;
}
public int MaxDegreeOfParallelism
{
get => wrapperConfig.MaxDegreeOfParallelism;
set => wrapperConfig.MaxDegreeOfParallelism = value;
}
public void AddMiddleware(IStreamMiddleware item)
=> wrapperConfig.AddMiddleware(item);
public void ClearMiddleware()
=> wrapperConfig.ClearMiddleware();
public bool RemoveMiddleware(IStreamMiddleware item)
=> wrapperConfig.RemoveMiddleware(item);
public IEnumerable<IStreamMiddleware> Middlewares { get; }
} |
Beta Was this translation helpful? Give feedback.
Hi @ognenbek,
The
PartitionAssignmentStrategy
setter is private now because Streamiz enforce to use the cooperative rebalancing protocol.For more information : https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/
But there is no impact if the consumer use the range partition assignor now. So I can be change the setter to public for the next release.
As a workaround, you can wrap your own stream config and set the assignment strategy like this implementation :