-
Hello. I have a left join where the left stream can receive a message (L-message) every 5 seconds and the right stream receives a message (R-message) every 2 seconds. The right stream's messages can also be published with some delay, so I set a grace period of 10 seconds.
My expectation was that there would be two messages in the output topic: one for the L-message and another once the R-message appeared. However, I can only see the final message produced from both streams. Thanks!
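To make the window part of the question concrete, here is a minimal sketch of the usual Kafka Streams join-window rule: a right record is joinable with a left record when its timestamp lies between `leftTs - beforeMs` and `leftTs + afterMs`. The class and method names below are hypothetical helpers for illustration only, not part of Streamiz, and the timestamps are made-up values that follow the 5-second and 2-second cadence described above. The grace period is not modelled here.

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration; this is NOT a Streamiz API.
public static class JoinWindowSketch
{
    // A right record joins a left record when its timestamp falls in
    // [leftTs - beforeMs, leftTs + afterMs] (both bounds inclusive).
    public static bool IsJoinable(long leftTs, long rightTs, long beforeMs, long afterMs)
        => rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;

    // Returns the right-side timestamps that fall inside the window of one left record.
    public static long[] JoinableRightTimestamps(long leftTs, long[] rightTs, long beforeMs, long afterMs)
        => rightTs.Where(r => IsJoinable(leftTs, r, beforeMs, afterMs)).ToArray();
}
```

For a left record at 5000 ms and right records at 0, 2000, 4000, 6000 and 8000 ms, `beforeMs = 0, afterMs = 2000` selects only the record at 6000 ms, while `beforeMs = 2000, afterMs = 0` selects only the record at 4000 ms.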
-
Hi @ppilev, Can you enable debug logging, reprocess the records that should produce the output you expect, and post the logs here, please? Best regards,
-
Hello. I tried to create a simplified version of the app I'm testing with, but now my secondary stream's data doesn't appear in the joined stream at all. I have two topics -
The C# code setting up the streams:
var products = builder.Stream<TopicKey, Message<Product>>(
    "products",
    new JsonSerDes<TopicKey>(), new JsonSerDes<Message<Product>>(),
    new MessageTimestampExtractor<Message<Product>>(m => m.Data.LastUpdated));
var details = builder.Stream<TopicKey, Message<ProductDetail>>(
    "product-details",
    new JsonSerDes<TopicKey>(), new JsonSerDes<Message<ProductDetail>>(),
    new MessageTimestampExtractor<Message<ProductDetail>>(m => m.Data.LastUpdated));
var windowOptions = new CustomWindowsOptions(0, 2000L, 10000L, JoinWindowOptions.DEFAULT_RETENTION_MS);
products
    .LeftJoin<Message<ProductDetail>, ProductWithDetail, JsonSerDes<Message<ProductDetail>>, JsonSerDes<ProductWithDetail>>(
        details,
        (productMessage, detailMessage) => new ProductWithDetail(productMessage.Data, detailMessage?.Data),
        windowOptions)
    .To<JsonSerDes<TopicKey>, JsonSerDes<ProductWithDetail>>("product-with-detail");
My stream config:
var config = new StreamConfig
{
    ApplicationId = "streamiz-processor-products",
    BootstrapServers = "localhost:29092",
    ReplicationFactor = 1,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    CommitIntervalMs = 1000,
    Logger = LoggerFactory.Create(builder => builder
        .SetMinimumLevel(LogLevel.Debug)
        .AddConsole()
        .AddLog4Net(new Log4NetProviderOptions()))
};
The debug logs:
Thanks!
-
@LGouellec, var windowOptions = new CustomWindowsOptions(2000L, 0L, 10000L, JoinWindowOptions.DEFAULT_RETENTION_MS); this config however doesn't seem to satisfy the second part of the
-
Thanks @ppilev,
-
Hi @ppilev,
Your analysis is brilliant! You are totally right: Kafka Streams (Java) swaps the values of beforeMs and afterMs here.
Sure! If you have written some tests, please feel free to submit a PR and I will try to merge it ASAP for the next release (1.5.0).
Regards,
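The swap mentioned in this reply can be sketched as follows. This is an illustration only, under the assumption that the right-side join processor looks up left records with the two bounds exchanged; the names are hypothetical and are not taken from the Streamiz or Kafka Streams source. The point of the sketch is that the mirrored check selects exactly the same record pairs as the left-side check.

```csharp
using System;

// Hypothetical illustration of the beforeMs/afterMs swap; NOT library code.
public static class MirroredWindowSketch
{
    // Left-side view: a left record looks for right records in
    // [leftTs - beforeMs, leftTs + afterMs].
    public static bool LeftSideMatch(long leftTs, long rightTs, long beforeMs, long afterMs)
        => leftTs - beforeMs <= rightTs && rightTs <= leftTs + afterMs;

    // Right-side view: a right record looks for left records, so the two
    // bounds are exchanged (assumption: this mirrors the swap discussed above).
    public static bool RightSideMatch(long rightTs, long leftTs, long beforeMs, long afterMs)
    {
        long joinBeforeMs = afterMs;  // swapped
        long joinAfterMs = beforeMs;  // swapped
        return rightTs - joinBeforeMs <= leftTs && leftTs <= rightTs + joinAfterMs;
    }
}
```

Without the swap, the right-side lookup would search a window shifted in the wrong direction whenever `beforeMs` and `afterMs` differ, which is the case for both window configurations tried in this thread.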