-
I want to build a global ktable from a stream. var config = new StreamConfig
{
ApplicationId = "test",
BootstrapServers = "localhost:9092",
SchemaRegistryUrl = "http://localhost:8081",
AutoRegisterSchemas = true,
DefaultKeySerDes = new StringSerDes(),
DefaultValueSerDes = new AvroSerdes<ComplexType>()
};
StreamBuilder builder = new();
IKTable<string, ComplexType> ktable = builder
.Stream<string, ComplexType>("inputTopic")
.GroupByKey()
.Aggregate<ComplexType>(
() => null!,
// let's imagine a complex aggregation
(key, value, aggregation) => value,
// I really dont need this in memory, can I skip this part?
InMemory.As<string, ComplexType>("storename"));
IGlobalKTable<string, ComplexType> gkTable = builder
.GlobalTable<string, ComplexType>(
// do I need to "guess" the topic name? is there any other way?
$"{config.ApplicationId}-storename-changelog");
record ComplexType(string some, string values); The guessing part, doesn't feel right to me, but I can't find any help on this online. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
Hi @pitermarx, Unfortunately, you can't consume a topic and built a global store manually (you can do it in Kafka Streams JAVA). It's not recommanded to use the changelog topic because it's an internal topic and we don't manage it all. So regarding your use case, it could look like this : StreamBuilder builder = new();
IKTable<string, ComplexType> ktable = builder
.Stream<string, ComplexType>("inputTopic")
.GroupByKey()
.Aggregate<ComplexType>(
() => null!,
// let's imagine a complex aggregation
(key, value, aggregation) => value)
.ToStream()
.To("output");
IGlobalKTable<string, ComplexType> gkTable = builder.GlobalTable<string, ComplexType>("output"); Best regards, |
Beta Was this translation helpful? Give feedback.
Hi @pitermarx,
Unfortunately, you can't consume a topic and built a global store manually (you can do it in Kafka Streams JAVA).
With Streamiz, you have to consume your topic, aggregate, forward the result of your aggregation into an another topic and use this topic as a globalKtable.
It's not recommanded to use the changelog topic because it's an internal topic and we don't manage it all.
So regarding your use case, it could look like this :