You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Currently, handling of invalid messages with the available DLQ does only work if the strategy has a direct processing chain and does not include batching. Consider the following:
0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message wise -> 5) produce -> 6) commit
All (individual) strategies after the batch step (2 & 4) cannot raise an InvalidMessage exception to trigger the DLQ, because any exception would dump the current batch of messages. Furthermore, if we need the faulty message in the DLQ, the strategy needs some kind of routing producer, which routes valid messages to one and invalid messages to the DLQ topic.
To avoid this, I would like the StreamProcessor to expose it’s handle_invalid_message to the strategy factory, like it does with commit. This would introduce considerably changes to the API but also allow strategies to pass invalid messages directly to the dlq handler, utilizing the configured policy and buffered messages, while avoiding complex routing logic and replicating the DLQ handling.
The text was updated successfully, but these errors were encountered:
On further thought I think the following changes would be necessary to make this work:
inject 'handle_invalid_message' to the strategy factory
adjust the signature of 'create_with_partitions'
adjust the task strategies to pass 'InvalidMessage' exceptions to the provided handler
Which would move the responsibility of triggering the DLQ-logic from the processor to the strategies, but keep it consistent for batch and "simple" processing.
To put individual messages from batches to the DLQ (step 2 in the example above) the strategy would also need to pass the handle to the function. Thus, another breaking change or we could simply add a new 'RunTaskOnBatch' strategy which covers that.
Currently, handling of invalid messages with the available DLQ does only work if the strategy has a direct processing chain and does not include batching. Consider the following:
All (individual) strategies after the batch step (2 & 4) cannot raise an
InvalidMessage
exception to trigger the DLQ, because any exception would dump the current batch of messages. Furthermore, if we need the faulty message in the DLQ, the strategy needs some kind of routing producer, which routes valid messages to one and invalid messages to the DLQ topic.To avoid this, I would like the
StreamProcessor
to expose it’shandle_invalid_message
to the strategy factory, like it does withcommit
. This would introduce considerably changes to the API but also allow strategies to pass invalid messages directly to the dlq handler, utilizing the configured policy and buffered messages, while avoiding complex routing logic and replicating the DLQ handling.The text was updated successfully, but these errors were encountered: