Skip to content

Commit

Permalink
fix(queue): Reset ackAttempts on successful ack
Browse files Browse the repository at this point in the history
Reset ackAttempts from the message metadata when a message is ack'd.
This avoids having this number grow over time for very long-lived
messages that keep getting re-enqueued, with the occasional failure
causing a message to not be acknowledged and eventually get dropped
once 5 acks have been missed in total.
  • Loading branch information
nicolasff committed Jun 20, 2023
1 parent 56c7206 commit 092caf8
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class SqlQueueConfiguration {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class SqlQueueProperties {
*/
var ackTimeout: Duration = Duration.ofMinutes(2)

/**
* When set to true, the number of ack attempts in the message metadata will be reset to 0
* when the message is acked. This is to avoid having this count grow over time for long-lived
* messages that risk being dropped when the max attempts is reached.
*/
var resetAttemptsOnAck: Boolean = true

/**
* The length of time in seconds that a message with a locked set on the queue table has to
* be moved to the unacked table, signifying that it is actively being processed. Messages
Expand Down
59 changes: 49 additions & 10 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SqlQueue(
private val lockTtlSeconds: Int,
private val mapper: ObjectMapper,
private val serializationMigrator: Optional<SerializationMigrator>,
private val resetAttemptsOnAck: Boolean = true,
override val ackTimeout: Duration = Duration.ofMinutes(5),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = true,
Expand Down Expand Up @@ -354,7 +355,7 @@ class SqlQueue(
atTime(ackTimeout)
},
maxAttempts = message.getAttribute<MaxAttemptsAttribute>()?.maxAttempts ?: 0,
ackCallback = this::ackMessage.partially1(fingerprint)
ackCallback = this::ackMessage.partially1(fingerprint) // partial application, still needs an up-to-date version of the message
)
)
} catch (e: Exception) {
Expand Down Expand Up @@ -441,7 +442,7 @@ class SqlQueue(
.filterNot { toRelease.contains(it.queueId) }
.forEach {
fire(MessageProcessing(it.message, it.scheduledTime, clock.instant()))
callback(it.message, it.ackCallback)
callback(it.message, { it.ackCallback(it.message) })
}
}

Expand Down Expand Up @@ -700,7 +701,8 @@ class SqlQueue(
if (ackAttemptsAttribute.ackAttempts >= Queue.maxRetries ||
(maxAttempts > 0 && attempts > maxAttempts)
) {
log.warn("Message $fingerprint with payload $message exceeded max ack retries")
log.warn("Message $fingerprint with payload $message exceeded max ack retries," +
" moving to DLQ attempts=$attempts acks=$acks maxAttempts=$maxAttempts")
dlq = true
}
} catch (e: Exception) {
Expand Down Expand Up @@ -842,25 +844,62 @@ class SqlQueue(
}
}

private fun ackMessage(fingerprint: String) {
private fun ackMessage(fingerprint: String, message: Message) {
if (log.isDebugEnabled) {
log.debug("Acking message $fingerprint")
}
withPool(poolName) {
withRetry(WRITE) {
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}

withRetry(WRITE) {
jooq.update(messagesTable)
.set(updatedAtField, clock.millis())
.where(fingerprintField.eq(fingerprint))
.execute()
if (resetAttemptsOnAck) {
resetAcksAndMarkUpdated(fingerprint, message)
} else {
markMessageUpdatedOptBody(fingerprint, null) // only set updatedAt
}
}

fire(MessageAcknowledged)
}

/**
 * Marks the messages-table row matching [fingerprint] as updated by setting its
 * `updatedAt` field to the current clock time. When [message] is non-null, the row's
 * body is also overwritten with `mapper.writeValueAsString(message)`.
 *
 * The whole statement, including serialization, runs inside `withRetry(WRITE)`.
 *
 * @param fingerprint value compared against `fingerprintField` to select the row.
 * @param message message to write into `bodyField`, or `null` to leave the body untouched.
 * @return the value produced by `withRetry` for this block (the result of `execute()`).
 */
private fun markMessageUpdatedOptBody(fingerprint: String, message: Message?) =
  withRetry(WRITE) {
    val touch = jooq.update(messagesTable)
      .set(updatedAtField, clock.millis())
    // Chain on the step returned by set() instead of discarding it and relying on
    // the builder having been mutated in place.
    val update = if (message != null) {
      touch.set(bodyField, mapper.writeValueAsString(message))
    } else {
      touch
    }
    update.where(fingerprintField.eq(fingerprint))
      .execute()
  }

/**
 * Marks the message as updated and, if its ackAttempts counter is non-zero, resets that
 * counter to 0 and persists the modified message body.
 *
 * This avoids messages gradually growing their ackAttempts value over time due to
 * sporadic failures until they are dropped. When the counter is already 0 the message
 * is unchanged, so only the `updatedAt` field is written and the body is left as is.
 *
 * Note: when a reset happens, [message] itself is mutated (its [AckAttemptsAttribute]
 * is set back to 0) before being written.
 *
 * @param fingerprint fingerprint identifying the row in the messages table.
 * @param message the message being acknowledged.
 */
private fun resetAcksAndMarkUpdated(fingerprint: String, message: Message) {
  val ackAttemptsAttribute = message.getAttribute<AckAttemptsAttribute>() ?: AckAttemptsAttribute()
  val updateMessage = ackAttemptsAttribute.ackAttempts > 0
  if (updateMessage) {
    log.info(
      "Resetting ackAttempts for message {}, was {}", // at INFO since this is somewhat unusual
      fingerprint, ackAttemptsAttribute.ackAttempts
    )
    ackAttemptsAttribute.ackAttempts = 0
    message.setAttribute(ackAttemptsAttribute)
  }
  // Only rewrite the body when the attribute actually changed; otherwise pass null so
  // that just updatedAt is touched and no redundant serialization/body write occurs.
  markMessageUpdatedOptBody(fingerprint, if (updateMessage) message else null)
}

private fun deleteAll(fingerprint: String) {
withRetry(WRITE) {
jooq.deleteFrom(queueTable)
Expand Down Expand Up @@ -963,6 +1002,6 @@ class SqlQueue(
val message: Message,
val expiry: Long,
val maxAttempts: Int,
val ackCallback: () -> Unit
val ackCallback: (Message) -> Unit
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private fun createQueue(clock: Clock,
)
},
serializationMigrator = Optional.empty(),
resetAttemptsOnAck = true,
ackTimeout = Duration.ofSeconds(60),
deadMessageHandlers = listOf(deadLetterCallback),
publisher = publisher ?: (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class SqlOrcaQueueConfiguration : SqlQueueConfiguration() {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class SqlQueueShovelConfiguration {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class SqlRedisQueueShovelConfiguration {
lockTtlSeconds = sqlQueueProperties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = sqlQueueProperties.resetAttemptsOnAck,
deadMessageHandlers = emptyList(),
publisher = publisher,
sqlRetryProperties = sqlQueueProperties.retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class SqlTestConfig {
1,
mapper,
Optional.empty(),
true,
Duration.ofSeconds(1),
emptyList(),
true,
Expand Down

0 comments on commit 092caf8

Please sign in to comment.