-
Disclaimer: I'm new to Kafka Streams so maybe this isn't related to this specific implementation. I'm able to forward messages from one topic to another and filter events as well. For example: using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-app";
config.BootstrapServers = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092";
config.SecurityProtocol = SecurityProtocol.SaslSsl;
config.SaslUsername = "<<SaslUsername>>";
config.SaslPassword = "<<SaslPassword>>";
config.SaslMechanism = SaslMechanism.Plain;
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>("topic1")
.FilterNot((k, v) => v.Contains("test"))
.To("topic2");
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) =>
{
stream.Dispose();
};
await stream.StartAsync(); However, when I do any operation that involves tables or grouping/counting/aggregating the program exists after 5 seconds (I think this is related to the default value for For example: using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-app";
config.BootstrapServers = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092";
config.SecurityProtocol = SecurityProtocol.SaslSsl;
config.SaslUsername = "<<SaslUsername>>";
config.SaslPassword = "<<SaslPassword>>";
config.SaslMechanism = SaslMechanism.Plain;
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>("topic1")
.GroupBy((k, v) => v)
.Count()
.ToStream()
.To("topic2");
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) =>
{
stream.Dispose();
};
await stream.StartAsync(); Or for example: using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-app";
config.BootstrapServers = "pkc-n00kk.us-east-1.aws.confluent.cloud:9092";
config.SecurityProtocol = SecurityProtocol.SaslSsl;
config.SaslUsername = "<<SaslUsername>>";
config.SaslPassword = "<<SaslPassword>>";
config.SaslMechanism = SaslMechanism.Plain;
StreamBuilder builder = new StreamBuilder();
builder.Table("topic_9", InMemory<string, string>.As("test-store"))
.ToStream()
.To("topic_10");
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) =>
{
stream.Dispose();
};
await stream.StartAsync(); Notes: Any idea why this is happening and how to fix this? Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 6 replies
-
Hi @yornstei , Could you forward your logs when the issue happen please ? Kr |
Beta Was this translation helpful? Give feedback.
Hi @yornstei ,
Could you forward your logs when the issue happen please ?
Kr