From 1849ee481d2aff5d646eda21b3e88b4c96d9cb3a Mon Sep 17 00:00:00 2001 From: Ben Osheroff Date: Sat, 9 Sep 2023 08:57:56 -0700 Subject: [PATCH] Revert "potential fix for #2016" Right idea, but dumb implementation. This reverts commit c20370c98aae16994046f579e99e3610c7b4050d. --- .../producer/AbstractAsyncProducer.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java index f04c842e9..97262b537 100644 --- a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java @@ -30,18 +30,20 @@ public CallbackCompleter(InflightMessageList inflightMessages, Position position public void markCompleted() { inflightMessages.freeSlot(messageID); - InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); + if(isTXCommit) { + InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position); - if (message != null && this.isTXCommit) { - context.setPosition(message.position); - long currentTime = System.currentTimeMillis(); - long endToEndLatency = currentTime - message.eventTimeMS; + if (message != null) { + context.setPosition(message.position); + long currentTime = System.currentTimeMillis(); + long endToEndLatency = currentTime - message.eventTimeMS; - messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS); - messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS); + messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS); + messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS); - if (endToEndLatency > metricsAgeSloMs) { - messageLatencySloViolationCount.inc(); + if (endToEndLatency > metricsAgeSloMs) { + messageLatencySloViolationCount.inc(); + } } } } @@ -81,7 +83,9 @@ public final void push(RowMap r) throws Exception { long messageID = inflightMessages.waitForSlot(); - inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); + if(r.isTXCommit()) { + inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); + } CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);