diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
index d223feb90..416a729a3 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs
@@ -30,6 +30,14 @@ public interface IConsumerConfigurationBuilder
///
IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IEnumerable partitions);
+ ///
+ /// Explicitly defines the topic, partitions and offsets that will be used to read the messages
+ ///
+ /// Topic name
+ /// The partition offset dictionary [Partition ID, Offset]
+ ///
+ IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary partitionOffsets);
+
///
/// Sets the topics that will be used to read the messages, the partitions will be automatically assigned
///
diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
index 3b4177c05..9b0ee6e37 100644
--- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
@@ -1,12 +1,11 @@
-namespace KafkaFlow.Configuration
+namespace KafkaFlow.Configuration;
+
+/// SaslOauthbearerMethod enum values
+public enum SaslOauthbearerMethod
{
- /// SaslOauthbearerMethod enum values
- public enum SaslOauthbearerMethod
- {
- /// Default
- Default,
+ /// Default
+ Default,
- /// Oidc
- Oidc,
- }
+ /// Oidc
+ Oidc,
}
diff --git a/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs b/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs
new file mode 100644
index 000000000..5a3a17756
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/Configuration/TopicPartitionOffsets.cs
@@ -0,0 +1,16 @@
+using System.Collections.Generic;
+
+namespace KafkaFlow.Configuration;
+
+public class TopicPartitionOffsets
+{
+ public TopicPartitionOffsets(string name, IDictionary partitionOffsets)
+ {
+ this.Name = name;
+ this.PartitionOffsets = partitionOffsets;
+ }
+
+ public string Name { get; }
+
+ public IDictionary PartitionOffsets { get; }
+}
diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
index 67eb2570b..0359999d0 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
@@ -12,6 +12,7 @@ public ConsumerConfiguration(
Confluent.Kafka.ConsumerConfig consumerConfig,
IReadOnlyList topics,
IReadOnlyList manualAssignPartitions,
+ IReadOnlyList manualAssignPartitionOffsets,
string consumerName,
ClusterConfiguration clusterConfiguration,
bool managementDisabled,
@@ -48,6 +49,7 @@ public ConsumerConfiguration(
this.AutoCommitInterval = autoCommitInterval;
this.Topics = topics ?? throw new ArgumentNullException(nameof(topics));
this.ManualAssignPartitions = manualAssignPartitions ?? throw new ArgumentNullException(nameof(manualAssignPartitions));
+ this.ManualAssignPartitionOffsets = manualAssignPartitionOffsets ?? throw new ArgumentNullException(nameof(manualAssignPartitionOffsets));
this.ConsumerName = consumerName ?? Guid.NewGuid().ToString();
this.ClusterConfiguration = clusterConfiguration;
this.ManagementDisabled = managementDisabled;
@@ -76,6 +78,8 @@ public ConsumerConfiguration(
public IReadOnlyList ManualAssignPartitions { get; }
+ public IReadOnlyList ManualAssignPartitionOffsets { get; }
+
public string ConsumerName { get; }
public ClusterConfiguration ClusterConfiguration { get; }
diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
index 304fe2d34..cc2a849d2 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
@@ -11,6 +11,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
{
private readonly List _topics = new();
private readonly List _topicsPartitions = new();
+ private readonly List _topicsPartitionOffsets = new();
private readonly List> _statisticsHandlers = new();
private readonly List _pendingOffsetsStatisticsHandlers = new();
@@ -60,6 +61,12 @@ public IConsumerConfigurationBuilder ManualAssignPartitions(string topicName, IE
return this;
}
+ public IConsumerConfigurationBuilder ManualAssignPartitionOffsets(string topicName, IDictionary partitionOffsets)
+ {
+ _topicsPartitionOffsets.Add(new TopicPartitionOffsets(topicName, partitionOffsets));
+ return this;
+ }
+
public IConsumerConfigurationBuilder WithConsumerConfig(Confluent.Kafka.ConsumerConfig config)
{
_consumerConfig = config;
@@ -259,6 +266,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
consumerConfigCopy,
_topics,
_topicsPartitions,
+ _topicsPartitionOffsets,
_name,
clusterConfiguration,
_disableManagement,
diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
index e8cd02ab4..57142c933 100644
--- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
@@ -29,6 +29,11 @@ public interface IConsumerConfiguration
///
IReadOnlyList ManualAssignPartitions { get; }
+ ///
+ /// Gets the topic partition offsets to manually assign
+ ///
+ IReadOnlyList ManualAssignPartitionOffsets { get; }
+
///
/// Gets the consumer name
///
diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs
index 7f478f6e5..e78674d9e 100644
--- a/src/KafkaFlow/Consumers/Consumer.cs
+++ b/src/KafkaFlow/Consumers/Consumer.cs
@@ -273,11 +273,16 @@ private void EnsureConsumer()
if (this.Configuration.ManualAssignPartitions.Any())
{
- this.ManualAssign(this.Configuration.ManualAssignPartitions);
+ this.ManualAssignPartitions(this.Configuration.ManualAssignPartitions);
+ }
+
+ if (this.Configuration.ManualAssignPartitionOffsets.Any())
+ {
+ this.ManualAssignPartitionOffsets(this.Configuration.ManualAssignPartitionOffsets);
}
}
- private void ManualAssign(IEnumerable topics)
+ private void ManualAssignPartitions(IEnumerable topics)
{
var partitions = topics
.SelectMany(
@@ -289,6 +294,19 @@ private void ManualAssign(IEnumerable topics)
this.FirePartitionsAssignedHandlers(_consumer, partitions);
}
+ private void ManualAssignPartitionOffsets(IEnumerable topics)
+ {
+ var partitionOffsets = topics
+ .SelectMany(
+ topic => topic.PartitionOffsets.Select(
+ partitionOffset => new Confluent.Kafka.TopicPartitionOffset(
+ topic.Name, new Partition(partitionOffset.Key), new Offset(partitionOffset.Value))))
+ .ToList();
+
+ _consumer.Assign(partitionOffsets);
+ this.FirePartitionsAssignedHandlers(_consumer, partitionOffsets.Select(x => x.TopicPartition).ToList());
+ }
+
private void FirePartitionsAssignedHandlers(
IConsumer consumer,
List partitions)
diff --git a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs
index 43c47600a..adb5c9db5 100644
--- a/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs
+++ b/tests/KafkaFlow.IntegrationTests/ConsumerTest.cs
@@ -1,7 +1,10 @@
using System;
+using System.Collections.Generic;
using System.Linq;
+using System.Text;
using System.Threading.Tasks;
using AutoFixture;
+using Confluent.Kafka;
using global::Microsoft.Extensions.DependencyInjection;
using global::Microsoft.VisualStudio.TestTools.UnitTesting;
using KafkaFlow.Consumers;
@@ -9,6 +12,7 @@
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
using KafkaFlow.IntegrationTests.Core.Producers;
+using KafkaFlow.Serializer;
namespace KafkaFlow.IntegrationTests;
@@ -157,4 +161,62 @@ public void AddConsumer_WithSharedConsumerConfig_ConsumersAreConfiguratedIndepen
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGroupId)));
Assert.IsNotNull(consumers.FirstOrDefault(x => x.GroupId.Equals(Bootstrapper.ProtobufGzipGroupId)));
}
+
+ [TestMethod]
+ public async Task ManualAssignPartitionOffsetsTest()
+ {
+ // Arrange
+ var producer = _provider.GetRequiredService>();
+ var messages = _fixture
+ .Build()
+ .Without(m => m.Offset)
+ .CreateMany(10).ToList();
+
+ messages.ForEach(m => producer.Produce(m.Id.ToString(), m, null, report => DeliveryHandler(report, messages)));
+
+ foreach (var message in messages)
+ {
+ await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(message);
+ }
+
+ var endOffset = MessageStorage.GetOffsetTrack();
+ MessageStorage.Clear();
+
+ // Act
+ var serviceProviderHelper = new ServiceProviderHelper();
+
+ await serviceProviderHelper.GetServiceProviderAsync(
+ consumerConfig =>
+ {
+ consumerConfig.ManualAssignPartitionOffsets(Bootstrapper.OffsetTrackerTopicName, new Dictionary { { 0, endOffset - 4 } })
+ .WithGroupId("ManualAssignPartitionOffsetsTest")
+ .WithBufferSize(100)
+ .WithWorkersCount(10)
+ .AddMiddlewares(
+ middlewares => middlewares
+ .AddDeserializer()
+ .AddTypedHandlers(
+ handlers => handlers.AddHandler()));
+ }, null);
+
+ // Assert
+ for (var i = 0; i < 5; i++)
+ {
+ await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(messages[i], false);
+ }
+
+ for (var i = 5; i < 10; i++)
+ {
+ await MessageStorage.AssertOffsetTrackerMessageNotReceivedAsync(messages[i]);
+ }
+
+ await serviceProviderHelper.StopBusAsync();
+ }
+
+ private static void DeliveryHandler(DeliveryReport report, List messages)
+ {
+ var key = Encoding.UTF8.GetString(report.Message.Key);
+ var message = messages.First(m => m.Id.ToString() == key);
+ message.Offset = report.Offset;
+ }
}
diff --git a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
index 5e317cc8e..730db3743 100644
--- a/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
+++ b/tests/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
@@ -30,6 +30,7 @@ internal static class Bootstrapper
internal const string AvroGroupId = "consumer-avro";
internal const string JsonGroupId = "consumer-json";
internal const string NullGroupId = "consumer-null";
+ internal const string OffsetTrackerGroupId = "consumer-offset-tracker";
private const string ProtobufTopicName = "test-protobuf";
@@ -43,6 +44,7 @@ internal static class Bootstrapper
private const string AvroTopicName = "test-avro";
private const string NullTopicName = "test-null";
private const string DefaultParamsTopicName = "test-default-params";
+ internal const string OffsetTrackerTopicName = "test-offset-tracker";
private static readonly Lazy s_lazyProvider = new(SetupProvider);
@@ -203,6 +205,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.CreateTopicIfNotExists(ProtobufGzipTopicName, 2, 1)
.CreateTopicIfNotExists(ProtobufGzipTopicName2, 2, 1)
.CreateTopicIfNotExists(NullTopicName, 1, 1)
+ .CreateTopicIfNotExists(OffsetTrackerTopicName, 1, 1)
.CreateTopicIfNotExists(DefaultParamsTopicName)
.AddConsumer(
consumer => consumer
@@ -270,6 +273,22 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler()
)))
+ .AddConsumer(
+ consumer => consumer
+ .Topic(OffsetTrackerTopicName)
+ .WithGroupId(OffsetTrackerGroupId)
+ .WithBufferSize(100)
+ .WithWorkersCount(10)
+ .WithAutoOffsetReset(AutoOffsetReset.Latest)
+ .AddMiddlewares(
+ middlewares => middlewares
+ .AddDeserializer()
+ .AddTypedHandlers(
+ handlers =>
+ handlers
+ .WithHandlerLifetime(InstanceLifetime.Singleton)
+ .AddHandler()
+ )))
.AddConsumer(
consumer => consumer
.Topics(GzipTopicName)
@@ -325,6 +344,12 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.AddMiddlewares(
middlewares => middlewares
.AddSerializer()))
+ .AddProducer(
+ producer => producer
+ .DefaultTopic(OffsetTrackerTopicName)
+ .AddMiddlewares(
+ middlewares => middlewares
+ .AddSerializer()))
.AddProducer(
producer => producer
.DefaultTopic(JsonGzipTopicName)
diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs
index 40fc634e7..c7d296ca8 100644
--- a/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs
+++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs
@@ -18,6 +18,8 @@ internal static class MessageStorage
private static readonly ConcurrentBag<(long, int)> s_versions = new();
private static readonly ConcurrentBag s_byteMessages = new();
private static readonly ConcurrentBag s_nullMessages = new();
+ private static readonly ConcurrentBag s_offsetTrackerMessages = new();
+ private static long s_offsetTrack;
public static void Add(ITestMessage message)
{
@@ -34,6 +36,12 @@ public static void Add(TestProtoMessage message)
{
s_protoMessages.Add(message);
}
+
+ public static void Add(OffsetTrackerMessage message)
+ {
+ s_offsetTrackerMessages.Add(message);
+ s_offsetTrack = Math.Max(message.Offset, s_offsetTrack);
+ }
public static void Add(byte[] message)
{
@@ -139,6 +147,35 @@ public static async Task AssertNullMessageAsync()
await Task.Delay(100).ConfigureAwait(false);
}
}
+
+ public static async Task AssertOffsetTrackerMessageNotReceivedAsync(OffsetTrackerMessage message, bool assertInStore = true)
+ {
+ var start = DateTime.Now;
+
+ while (!s_offsetTrackerMessages.Any(x => x.Id == message.Id && x.Offset == message.Offset))
+ {
+ if (DateTime.Now.Subtract(start).Seconds > TimeoutSec)
+ {
+ if (assertInStore)
+ {
+ Assert.Fail("Message (OffsetTrackerMessage) not received");
+ }
+ return;
+ }
+
+ await Task.Delay(100).ConfigureAwait(false);
+ }
+
+ if (!assertInStore)
+ {
+ Assert.Fail("Message (OffsetTrackerMessage) received when it should not have been.");
+ }
+ }
+
+ public static long GetOffsetTrack()
+ {
+ return s_offsetTrack;
+ }
public static List<(long ticks, int version)> GetVersions()
{
@@ -151,5 +188,7 @@ public static void Clear()
s_testMessages.Clear();
s_byteMessages.Clear();
s_protoMessages.Clear();
+ s_offsetTrackerMessages.Clear();
+ s_offsetTrack = 0;
}
}
diff --git a/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs b/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs
new file mode 100644
index 000000000..4d514b4df
--- /dev/null
+++ b/tests/KafkaFlow.IntegrationTests/Core/Handlers/OffsetTrackerMessageHandler.cs
@@ -0,0 +1,14 @@
+using System.Threading.Tasks;
+using KafkaFlow.IntegrationTests.Core.Messages;
+
+namespace KafkaFlow.IntegrationTests.Core.Handlers;
+
+internal class OffsetTrackerMessageHandler : IMessageHandler
+{
+ public Task Handle(IMessageContext context, OffsetTrackerMessage message)
+ {
+ message.Offset = context.ConsumerContext.Offset;
+ MessageStorage.Add(message);
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs b/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs
new file mode 100644
index 000000000..1bb456bb7
--- /dev/null
+++ b/tests/KafkaFlow.IntegrationTests/Core/Messages/OffsetTrackerMessage.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace KafkaFlow.IntegrationTests.Core.Messages;
+
+internal class OffsetTrackerMessage
+{
+ public Guid Id { get; set; }
+ public long Offset { get; set; }
+}
\ No newline at end of file
diff --git a/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs b/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs
new file mode 100644
index 000000000..033fb7a26
--- /dev/null
+++ b/tests/KafkaFlow.IntegrationTests/Core/Producers/OffsetTrackerProducer.cs
@@ -0,0 +1,6 @@
+namespace KafkaFlow.IntegrationTests.Core.Producers;
+
+public class OffsetTrackerProducer
+{
+
+}
\ No newline at end of file
diff --git a/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs
new file mode 100644
index 000000000..7d96a282f
--- /dev/null
+++ b/tests/KafkaFlow.IntegrationTests/Core/ServiceProviderHelper.cs
@@ -0,0 +1,104 @@
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using KafkaFlow.Configuration;
+using KafkaFlow.IntegrationTests.Core.Producers;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Hosting;
+using Polly;
+
+namespace KafkaFlow.IntegrationTests.Core;
+
+public class ServiceProviderHelper
+{
+ private bool _isPartitionAssigned;
+ private IKafkaBus _bus;
+
+ public async Task GetServiceProviderAsync(
+ Action consumerConfiguration,
+ Action producerConfiguration,
+ Action builderConfiguration = null,
+ Action configureGlobalEvents = null)
+ {
+ if (consumerConfiguration == null && producerConfiguration == null)
+ {
+ throw new ArgumentException("At least one of the configurations must be provided");
+ }
+
+ var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) =>
+ {
+ cluster.WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';'));
+
+ if (consumerConfiguration != null)
+ {
+ cluster.AddConsumer(builder =>
+ {
+ consumerConfiguration(builder);
+ builder.WithPartitionsAssignedHandler((_, _) => { _isPartitionAssigned = true; });
+ });
+ }
+
+ if (producerConfiguration != null)
+ {
+ cluster.AddProducer(producerConfiguration);
+ }
+ };
+
+ clusterBuilderAction += (_, cluster) => { builderConfiguration?.Invoke(cluster); };
+
+ var builder = Host
+ .CreateDefaultBuilder()
+ .ConfigureAppConfiguration(
+ (_, config) =>
+ {
+ config
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile(
+ "conf/appsettings.json",
+ false,
+ true)
+ .AddEnvironmentVariables();
+ })
+ .ConfigureServices((context, services) =>
+ services.AddKafka(
+ kafka =>
+ {
+ kafka
+ .UseLogHandler()
+ .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); });
+
+ if (configureGlobalEvents != null)
+ {
+ kafka.SubscribeGlobalEvents(configureGlobalEvents);
+ }
+ }))
+ .UseDefaultServiceProvider(
+ (_, options) =>
+ {
+ options.ValidateScopes = true;
+ options.ValidateOnBuild = true;
+ });
+
+ var host = builder.Build();
+ _bus = host.Services.CreateKafkaBus();
+ await _bus.StartAsync();
+
+ await WaitForPartitionAssignmentAsync();
+
+ return host.Services;
+ }
+
+ public async Task StopBusAsync()
+ {
+ await _bus.StopAsync();
+ }
+
+ private async Task WaitForPartitionAssignmentAsync()
+ {
+ await Policy
+ .HandleResult(isAvailable => !isAvailable)
+ .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
+ .ExecuteAsync(() => Task.FromResult(_isPartitionAssigned));
+ }
+}
\ No newline at end of file
diff --git a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
index 366a456cb..0f2ddf19e 100644
--- a/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
+++ b/tests/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
@@ -1,6 +1,5 @@
using System;
using System.IO;
-using System.Linq;
using System.Threading.Tasks;
using AutoFixture;
using Confluent.Kafka;
@@ -12,11 +11,8 @@
using KafkaFlow.IntegrationTests.Core.Middlewares;
using KafkaFlow.IntegrationTests.Core.Producers;
using KafkaFlow.Serializer;
-using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Hosting;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Polly;
namespace KafkaFlow.IntegrationTests;
@@ -25,8 +21,6 @@ public class GlobalEventsTest
{
private readonly Fixture _fixture = new();
private string _topic;
- private bool _isPartitionAssigned;
- private IKafkaBus _bus;
[TestInitialize]
public void Setup()
@@ -66,6 +60,7 @@ public async Task OnStopping_RegisterMultipleOnStoppingCallbacks_AllAreCalled()
var countOnStopping = 0;
// Act
+ var serviceProviderHelper = new ServiceProviderHelper();
await this.GetServiceProviderAsync(
observers => { },
this.ConfigureConsumer,
@@ -74,9 +69,10 @@ await this.GetServiceProviderAsync(
{
cluster.OnStopping(_ => countOnStopping++);
cluster.OnStopping(_ => countOnStopping++);
- });
+ },
+ serviceProviderHelper);
- await _bus?.StopAsync();
+ await serviceProviderHelper.StopBusAsync();
// Assert
Assert.AreEqual(ExpectedOnStoppingCount, countOnStopping);
@@ -270,11 +266,7 @@ private void ConfigureConsumer(IConsumerConfigurationBuilder consumerConfigur
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer()
- .Add())
- .WithPartitionsAssignedHandler((_, _) =>
- {
- _isPartitionAssigned = true;
- });
+ .Add());
}
private void ConfigureProducer(IProducerConfigurationBuilder producerConfigurationBuilder)
@@ -289,65 +281,20 @@ private async Task GetServiceProviderAsync(
Action configureGlobalEvents,
Action consumerConfiguration,
Action producerConfiguration,
- Action builderConfiguration = null)
+ Action builderConfiguration = null,
+ ServiceProviderHelper serviceProviderHelper = null)
{
- _isPartitionAssigned = false;
-
- var clusterBuilderAction = (HostBuilderContext context, IClusterConfigurationBuilder cluster) =>
- {
- cluster
- .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';'))
- .CreateTopicIfNotExists(_topic, 1, 1)
- .AddProducer(producerConfiguration)
- .AddConsumer(consumerConfiguration);
- };
-
- clusterBuilderAction += (_, cluster) =>
- {
- builderConfiguration?.Invoke(cluster);
- };
-
- var builder = Host
- .CreateDefaultBuilder()
- .ConfigureAppConfiguration(
- (_, config) =>
- {
- config
- .SetBasePath(Directory.GetCurrentDirectory())
- .AddJsonFile(
- "conf/appsettings.json",
- false,
- true)
- .AddEnvironmentVariables();
- })
- .ConfigureServices((context, services) =>
- services.AddKafka(
- kafka => kafka
- .UseLogHandler()
- .AddCluster(cluster => { clusterBuilderAction.Invoke(context, cluster); })
- .SubscribeGlobalEvents(configureGlobalEvents)))
- .UseDefaultServiceProvider(
- (_, options) =>
- {
- options.ValidateScopes = true;
- options.ValidateOnBuild = true;
- });
-
- var host = builder.Build();
- _bus = host.Services.CreateKafkaBus();
- _bus.StartAsync().GetAwaiter().GetResult();
-
- await this.WaitForPartitionAssignmentAsync();
-
- return host.Services;
- }
-
- private async Task WaitForPartitionAssignmentAsync()
- {
- await Policy
- .HandleResult(isAvailable => !isAvailable)
- .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
- .ExecuteAsync(() => Task.FromResult(_isPartitionAssigned));
+ serviceProviderHelper ??= new ServiceProviderHelper();
+ return await serviceProviderHelper.GetServiceProviderAsync(
+ consumerConfiguration,
+ producerConfiguration,
+ cluster =>
+ {
+ cluster.CreateTopicIfNotExists(_topic, 1, 1);
+ builderConfiguration?.Invoke(cluster);
+ },
+ configureGlobalEvents
+ );
}
private class TriggerErrorMessageMiddleware : IMessageMiddleware
diff --git a/website/docs/guides/consumers/add-consumers.md b/website/docs/guides/consumers/add-consumers.md
index eb8c1333b..5cf11971e 100644
--- a/website/docs/guides/consumers/add-consumers.md
+++ b/website/docs/guides/consumers/add-consumers.md
@@ -79,6 +79,27 @@ services.AddKafka(kafka => kafka
);
```
+## Manual Partition Offset Assignment
+
+The client application can specify the offsets to start consuming from per partition for topics manually using the `ManualAssignPartitionOffsets()` method.
+
+
+```csharp
+using KafkaFlow;
+using KafkaFlow.Serializer;
+using Microsoft.Extensions.DependencyInjection;
+
+services.AddKafka(kafka => kafka
+ .AddCluster(cluster => cluster
+ .WithBrokers(new[] { "localhost:9092" })
+ .AddConsumer(consumer => consumer
+ .ManualAssignPartitionOffsets("topic-name", new Dictionary { { 0, 100 }, { 1, 120 } })
+ ...
+ )
+ )
+);
+```
+
## Offset Strategy
You can configure the Offset Strategy for a consumer group in case the Consumer Group has no offset stored in Kafka.