Skip to content

Commit

Permalink
Merge pull request #2019 from zendesk/ben/issue_2016
Browse files Browse the repository at this point in the history
potential fix for #2016
  • Loading branch information
osheroff authored Jul 5, 2023
2 parents 3ab28c6 + c20370c commit 800a489
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,18 @@ public CallbackCompleter(InflightMessageList inflightMessages, Position position

public void markCompleted() {
inflightMessages.freeSlot(messageID);
if(isTXCommit) {
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);
InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

if (message != null) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;
if (message != null && this.isTXCommit) {
context.setPosition(message.position);
long currentTime = System.currentTimeMillis();
long endToEndLatency = currentTime - message.eventTimeMS;

messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);
messagePublishTimer.update(currentTime - message.sendTimeMS, TimeUnit.MILLISECONDS);
messageLatencyTimer.update(Math.max(0L, endToEndLatency - 500L), TimeUnit.MILLISECONDS);

if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
}
if (endToEndLatency > metricsAgeSloMs) {
messageLatencySloViolationCount.inc();
}
}
}
Expand Down Expand Up @@ -84,9 +82,7 @@ public final void push(RowMap r) throws Exception {

long messageID = inflightMessages.waitForSlot();

if(r.isTXCommit()) {
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
}
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);

CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

Expand Down

0 comments on commit 800a489

Please sign in to comment.