-
Hi @oesebus,

Regarding your topology, I suspect the GroupBy operation is not being given the correct value serdes. Could you try this one?

var mergedRawDiscussionKTable =
    KS0Keyed
        .Merge(KS1Keyed)
        .Merge(KS2Keyed)
        .Merge(KS3Keyed)
        .Merge(KS4Keyed)
        .GroupByKey<StringSerDes, StoryBuilderSerDes>()
        .Count(InMemory<string, long>.As("DiscussionCountState"));

mergedRawDiscussionKTable.ToStream().Peek((k, v) => Console.WriteLine($" Key {k} | Timestamp : {v}"));

Regards,
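An alternative, which the exception message itself suggests, is to change the default value serdes in StreamConfig instead of passing the serdes at the GroupByKey call. A minimal sketch, assuming StoryBuilderSerDes is your own serdes for StoryBuilder with a parameterless constructor; the ApplicationId and BootstrapServers values are placeholders, and this has not been run against your topology:

```csharp
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

// Assumption: StoryBuilderSerDes is the application's serdes for StoryBuilder
// (the same type used in GroupByKey<StringSerDes, StoryBuilderSerDes>()).
// The generic arguments set the default key serdes and default value serdes.
var config = new StreamConfig<StringSerDes, StoryBuilderSerDes>
{
    ApplicationId = "discussion-count-app",  // placeholder value
    BootstrapServers = "localhost:9092"      // placeholder value
};
```

With this default in place, operators that are not given explicit serdes should fall back to StoryBuilderSerDes for values. Passing the serdes explicitly on GroupByKey is the narrower change if other parts of the topology carry different value types.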
-
Link: #163
-
IKStream<string, XtBridgeEvent> KS0 = builder.Stream("XtBridgeEvent", new StringSerDes(), new XtBridgeEventSerDes());
IKStream<string, XtMediaAssetOrchestratorCommand> KS1 = builder.Stream("XtMediaAssetOrchestratorCommand", new StringSerDes(), new XtMediaAssetOrchestratorCommandSerDes());
IKStream<string, XtMediaAssetOrchestratorEvent> KS2 = builder.Stream("XtMediaAssetOrchestratorEvent", new StringSerDes(), new XtMediaAssetOrchestratorEventSerDes());
IKStream<string, XtBridgeCommand> KS3 = builder.Stream("XtBridgeCommand", new StringSerDes(), new XtBridgeCommandSerDes());
IKStream<string, RecorderGroupQueryCommand> KS4 = builder.Stream("RecorderGroupQueryCommand", new StringSerDes(), new RecorderGroupQueryCommandSerDes());
Filtering Operations
var KS0Keyed = KS0
    .Filter((_, v) => v.SpecificCase is not XtBridgeEvent.SpecificOneofCase.None)
    .SelectKey((_, v) => v.Headers.DiscussionId)
    .MapValues((v) => StoryBuilder.Map(v));
var KS1Keyed = KS1
    .Filter((_, v) => v.SpecificCase is not XtMediaAssetOrchestratorCommand.SpecificOneofCase.None)
    .SelectKey((_, v) => v.Headers.DiscussionId)
    .MapValues((v) => StoryBuilder.Map(v));
var KS2Keyed = KS2
    .Filter((_, v) => v.SpecificCase is not XtMediaAssetOrchestratorEvent.SpecificOneofCase.None)
    .SelectKey((_, v) => v.Headers.DiscussionId)
    .MapValues((v) => StoryBuilder.Map(v));
var KS3Keyed = KS3
    .Filter((_, v) => v.SpecificCase is not XtBridgeCommand.SpecificOneofCase.None)
    .SelectKey((_, v) => v.Headers.DiscussionId)
    .MapValues((v) => StoryBuilder.Map(v));
var KS4Keyed = KS4
    .Filter((_, v) => v.SpecificCase is not RecorderGroupQueryCommand.SpecificOneofCase.None)
    .SelectKey((_, v) => v.Headers.DiscussionId)
    .MapValues((v) => StoryBuilder.Map(v));
Merging Operation + grouping + Count
var mergedRawDiscussionKTable = KS0Keyed
    .Merge(KS1Keyed)
    .Merge(KS2Keyed)
    .Merge(KS3Keyed)
    .Merge(KS4Keyed)
    .GroupBy<string,StringSerDes>((k,_) => k)
    .Count(InMemory<string, long>.As("DiscussionCountState"));

mergedRawDiscussionKTable.ToStream().Peek((k, v) => Console.WriteLine($" Key {k} | Timestamp : {v}"));
Error =>
Inner Exception handling => Streamiz.Kafka.Net.Errors.StreamsException: stream-task[0|0]|processor[KSTREAM-SINK-0000000026]- The value serdes (Streamiz.Kafka.Net.SerDes.StringSerDes) is not compatible to the actual value (Oesebus.EVS.KafkaService.Application.Core.Models.StoryBuilder) for this processor. Change the default value serdes in StreamConfig or provide correct Serdes via method parameters(using the DSL)
   at Streamiz.Kafka.Net.Processors.SinkProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.KStreamFilterProcessor`2.Process(K key, V value)
Any idea why I receive this error?
Thanks a lot for your help.