-
Hi, I recently started using this package and need some help. I've set up a topic, which I then filter into another topic:
This works fine. Now I want to group the messages by key, reduce them, and output the result to another topic:
This doesn't seem to be working, as there are no messages in finalTopic. Could you point out where I am going wrong? Basically, how do I get the results from the KTable into a topic? Thanks
-
Hi, that's strange.
builder.Stream<string, string>("tempTopic").GroupByKey().Reduce(new MyReducer()).ToStream().To("finalTopic");
Example:
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-reduce";
var builder = new StreamBuilder();
builder
    .Stream<string, string>("tempTopic")
    .GroupByKey()
    .Reduce(new MyReducer())
    .ToStream()
    .To("finalTopic");
var topology = builder.Build();
using (var driver = new TopologyTestDriver(topology, config))
{
    var input = driver.CreateInputTopic<string, string>("tempTopic");
    var output = driver.CreateOutputTopic<string, string>("finalTopic");
    input.PipeInput("test", "1");
    input.PipeInput("test", "100");
    // TODO: read the value from the output topic
}
Do you have any records in the output topic?
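For reference, here is a rough sketch of what a grouped reduce would be expected to emit for the two records piped above. It does not use the package at all: a plain Dictionary stands in for the KTable state, and `ReduceSketch`, `Combine`, and `Replay` are hypothetical names invented for this illustration (the real MyReducer is not shown in the thread). It assumes the usual Kafka Streams semantics, where the first value for a key becomes the aggregate as-is and every later record produces an updated aggregate downstream; with record caching enabled, intermediate updates may be coalesced.

```csharp
using System;
using System.Collections.Generic;

public static class ReduceSketch
{
    // Hypothetical reducer logic: append the new value to the running aggregate.
    public static string Combine(string aggregate, string next) => aggregate + "," + next;

    // Replays (key, value) records and returns the update expected downstream after each one.
    public static List<KeyValuePair<string, string>> Replay(
        IEnumerable<KeyValuePair<string, string>> records)
    {
        // Plays the role of the state store behind the KTable.
        var state = new Dictionary<string, string>();
        var emitted = new List<KeyValuePair<string, string>>();
        foreach (var record in records)
        {
            // The first value for a key is stored unchanged; later values are combined.
            state[record.Key] = state.TryGetValue(record.Key, out var current)
                ? Combine(current, record.Value)
                : record.Value;
            emitted.Add(new KeyValuePair<string, string>(record.Key, state[record.Key]));
        }
        return emitted;
    }

    public static void Main()
    {
        var input = new[]
        {
            new KeyValuePair<string, string>("test", "1"),
            new KeyValuePair<string, string>("test", "100"),
        };
        foreach (var update in Replay(input))
        {
            Console.WriteLine($"{update.Key} -> {update.Value}");
        }
    }
}
```

Under these assumptions, finalTopic should receive one record per input record for the key "test", the second carrying the combined value. If the output topic stays empty in the test driver, the problem is more likely in how the topology is wired than in the reducer itself.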
-
Thanks for the reply. I have now tested both parts individually and they work, but putting them together on the same builder doesn't work.
var config = new StreamConfig<StringSerDes, StringSerDes>()
{
    ApplicationId = "test-reduction"
};
StreamBuilder builder = new StreamBuilder();
builder
    .Stream<string, string>("myTopic")
    .Filter((key, value) =>
    {
        return key == "1";
    })
    .To("tempTopic");
builder.Stream<string, string>("tempTopic")
    .GroupByKey()
    .Reduce(new MediumEventMessageReducer())
    .ToStream()
    .To("finalTopic");
var topology = builder.Build();
using (var driver = new TopologyTestDriver(topology, config))
{
    var input = driver.CreateInputTopic<string, string>("myTopic");
    var output1 = driver.CreateInputTopic<string, string>("tempTopic");
    var output = driver.CreateOuputTopic<string, string>("finalTopic");
    input.PipeInput("1", "Once");
    input.PipeInput("2", "Once");
    input.PipeInput("1", "Twice");
    input.PipeInput("3", "Once");
    input.PipeInput("1", "Thrice");
    input.PipeInput("2", "Twice");
    var list = output.ReadKeyValueList().Select(r => KeyValuePair.Create(r.Message.Key, r.Message.Value)).ToList();
    foreach (var item in list)
    {
        Console.WriteLine(item);
    }
    Assert.IsNotNull("x");
}
Must be something I'm doing? By the way, your TopologyTestDriver doesn't have a CreateOutputTopic; it's spelt CreateOuputTopic. Not sure if that's intentional.
-
If you run your topology against a real cluster, do you also get no records in the output topic?
-
First of all, thank you very much for your efforts. Really great work. I am probably experiencing the same issue, so I'm continuing in this thread. My stream worked with the RC1 package but no longer does.
-
Release 1.2.1 is available: https://github.com/LGouellec/kafka-streams-dotnet/releases/tag/v1.2.1