Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): Reset ackAttempts on successful SQL ack #4479

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class SqlQueueConfiguration {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class SqlQueueProperties {
*/
var ackTimeout: Duration = Duration.ofMinutes(2)

/**
* When set to true, the number of ack attempts in the message metadata will be reset to 0
* when the message is acked. This is to avoid having this count grow over time for long-lived
* messages that risk being dropped when the max attempts is reached.
*/
var resetAttemptsOnAck: Boolean = true

/**
* The length of time in seconds that a message with a lock set on the queue table has to
* be moved to the unacked table, signifying that it is actively being processed. Messages
Expand Down
59 changes: 49 additions & 10 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SqlQueue(
private val lockTtlSeconds: Int,
private val mapper: ObjectMapper,
private val serializationMigrator: Optional<SerializationMigrator>,
private val resetAttemptsOnAck: Boolean = true,
override val ackTimeout: Duration = Duration.ofMinutes(5),
override val deadMessageHandlers: List<DeadMessageCallback>,
override val canPollMany: Boolean = true,
Expand Down Expand Up @@ -354,7 +355,7 @@ class SqlQueue(
atTime(ackTimeout)
},
maxAttempts = message.getAttribute<MaxAttemptsAttribute>()?.maxAttempts ?: 0,
ackCallback = this::ackMessage.partially1(fingerprint)
ackCallback = this::ackMessage.partially1(fingerprint) // partial application, still needs an up-to-date version of the message
)
)
} catch (e: Exception) {
Expand Down Expand Up @@ -441,7 +442,7 @@ class SqlQueue(
.filterNot { toRelease.contains(it.queueId) }
.forEach {
fire(MessageProcessing(it.message, it.scheduledTime, clock.instant()))
callback(it.message, it.ackCallback)
callback(it.message, { it.ackCallback(it.message) })
}
}

Expand Down Expand Up @@ -700,7 +701,8 @@ class SqlQueue(
if (ackAttemptsAttribute.ackAttempts >= Queue.maxRetries ||
(maxAttempts > 0 && attempts > maxAttempts)
) {
log.warn("Message $fingerprint with payload $message exceeded max ack retries")
log.warn("Message $fingerprint with payload $message exceeded max ack retries," +
" moving to DLQ attempts=$attempts acks=$acks maxAttempts=$maxAttempts")
dlq = true
}
} catch (e: Exception) {
Expand Down Expand Up @@ -842,25 +844,62 @@ class SqlQueue(
}
}

private fun ackMessage(fingerprint: String) {
private fun ackMessage(fingerprint: String, message: Message) {
if (log.isDebugEnabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon my ignorance, is this required? If debug logging isn't enabled, log.debug won't do anything anyway, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If fingerprint is big, we'll still spend non-trivial compute resources building the string to log even though log.debug doesn't actually log it.

log.debug("Acking message $fingerprint")
}
withPool(poolName) {
withRetry(WRITE) {
jooq.deleteFrom(unackedTable)
.where(fingerprintField.eq(fingerprint))
.execute()
}

withRetry(WRITE) {
jooq.update(messagesTable)
.set(updatedAtField, clock.millis())
.where(fingerprintField.eq(fingerprint))
.execute()
if (resetAttemptsOnAck) {
resetAcksAndMarkUpdated(fingerprint, message)
} else {
markMessageUpdatedOptBody(fingerprint, null) // only set updatedAt
}
}

fire(MessageAcknowledged)
}

/**
 * Marks the message row identified by [fingerprint] as updated by setting its
 * `updatedAt` field to the current clock time.
 *
 * When [message] is non-null, its JSON serialization is also written to the body
 * field; when it is null, only `updatedAt` is touched.
 *
 * @param fingerprint fingerprint used to select the row in the messages table
 * @param message replacement body, or null to leave the stored body unchanged
 * @return whatever `withRetry` returns for the update block
 */
private fun markMessageUpdatedOptBody(fingerprint: String, message: Message?) =
  withRetry(WRITE) {
    val touchOnly = jooq.update(messagesTable)
      .set(updatedAtField, clock.millis())
    // Chain on the step returned by set() instead of relying on the builder
    // being mutated in place by a call whose result is discarded.
    val update = if (message != null) {
      touchOnly.set(bodyField, mapper.writeValueAsString(message))
    } else {
      touchOnly
    }
    update.where(fingerprintField.eq(fingerprint))
      .execute()
  }

/**
 * Marks the message as updated and, if its ackAttempts counter is non-zero,
 * resets that counter to 0 and persists the modified message body.
 *
 * This avoids messages gradually growing their ackAttempts value over time
 * due to sporadic failures until they are dropped. Since the message is being
 * acked, the counter is cleared here.
 *
 * When the counter is already 0 the message is unchanged, so only the
 * `updatedAt` field is written and the stored body is left as is.
 *
 * @param fingerprint fingerprint of the message being acked
 * @param message the message being acked; its [AckAttemptsAttribute] is mutated
 *   in place when a reset is needed
 */
private fun resetAcksAndMarkUpdated(fingerprint: String, message: Message) {
  val ackAttemptsAttribute = (message.getAttribute() ?: AckAttemptsAttribute())
  val updateMessage = ackAttemptsAttribute.ackAttempts > 0
  if (updateMessage) {
    log.info(
      "Resetting ackAttempts for message {}, was {}", // at INFO since this is somewhat unusual
      fingerprint, ackAttemptsAttribute.ackAttempts
    )
    ackAttemptsAttribute.ackAttempts = 0
    message.setAttribute(ackAttemptsAttribute)
  }
  // Only rewrite the body when the attribute actually changed; otherwise passing
  // null makes the helper touch updatedAt alone and skips a needless
  // serialization and body write on every ack.
  markMessageUpdatedOptBody(fingerprint, if (updateMessage) message else null)
}

private fun deleteAll(fingerprint: String) {
withRetry(WRITE) {
jooq.deleteFrom(queueTable)
Expand Down Expand Up @@ -963,6 +1002,6 @@ class SqlQueue(
val message: Message,
val expiry: Long,
val maxAttempts: Int,
val ackCallback: () -> Unit
val ackCallback: (Message) -> Unit
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2023 Apple, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.q.sql

import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
import com.netflix.spinnaker.q.AckAttemptsAttribute
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.reset
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.junit.platform.runner.JUnitPlatform
import org.junit.runner.RunWith
import java.util.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please avoid star imports



@RunWith(JUnitPlatform::class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With PRs like #4484, we're trying to modernize our JUnit usage. I tried this on my local machine, and the tests still execute without this line. Here is the diff I used:

diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
index e7bb78f82..0ff10042a 100644
--- a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
+++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
@@ -29,12 +29,8 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.junit.platform.runner.JUnitPlatform
-import org.junit.runner.RunWith
 import java.util.*
 
-
-@RunWith(JUnitPlatform::class)
 class SqlAckQueueTest : Spek({
   describe("both values of resetAttemptsOnAck") {
     // map of resetAttemptsOnAck to expected number of ackAttempts still on the message after ack

class SqlAckQueueTest : Spek({
describe("both values of resetAttemptsOnAck") {
// map of resetAttemptsOnAck to expected number of ackAttempts still on the message after ack
val flagToAckAttempts = mapOf(
true to 0,
false to 1
)

// check both values of resetAttemptsOnAck
flagToAckAttempts.forEach { resetFlag, expectedAckAttempts ->
val testDescription = "SqlQueue with resetAttemptsOnAck = $resetFlag"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this in the test output (with no local changes). Here's what I see:
image


given(testDescription) {
  // Assigned in startQueue() (run via beforeGroup); read through startedQueue().
  var queue: SqlQueue? = null
  val clock = MutableClock()
  val deadMessageHandler: DeadMessageCallback = mock()
  val publisher: EventPublisher = mock()

  val testDb = SqlTestUtil.initTcMysqlDatabase()
  val jooq = testDb.context

  fun resetMocks() = reset(deadMessageHandler, publisher)

  fun stopQueue() {
    SqlTestUtil.cleanupDb(jooq)
  }

  fun startQueue() {
    queue = createQueueWithResetAck(clock, deadMessageHandler, publisher, resetFlag) // from SqlQueueTest
  }

  // Fails with a descriptive message instead of a bare NPE if the queue was never started.
  fun startedQueue(): SqlQueue = checkNotNull(queue) { "queue has not been started" }

  // Kotlin's assert() only checks when JVM assertions are enabled (-ea), so
  // throw AssertionError explicitly to make the check unconditional.
  fun assertAckAttempts(expected: Int, actual: Int) {
    if (actual != expected) {
      throw AssertionError("expected ackAttempts=$expected but was $actual")
    }
  }

  describe("message is dropped once then retried successfully") {
    beforeGroup(::startQueue)
    afterGroup(::stopQueue)
    afterGroup(::resetMocks)

    given("a test message") {
      val message = TestMessage("a")

      on("pushing a message that gets dropped") {
        with(startedQueue()) {
          push(message)
          poll { _, _ -> } // do not ack the message after de-queuing it
          clock.incrementBy(ackTimeout)

          retry() // make it available again
          clock.incrementBy(ackTimeout)
        }
      }

      it("has an ackAttempt count of 1 upon retrying") {
        with(startedQueue()) {
          poll { msg, ack ->
            val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute())
            assertAckAttempts(1, ackAttemptsAttribute.ackAttempts)
            ack() // *now* we acknowledge the message
            push(msg) // and re-enqueue it (as with a RunTask returning RUNNING)
          }
        }
      }

      val validatesAckAttrAfterAckdRetry = if (expectedAckAttempts == 0) {
        "has reset its ackAttempts upon being ack'd"
      } else {
        "still has ackAttempts=1 even after being ack'd"
      }
      it(validatesAckAttrAfterAckdRetry) {
        with(startedQueue()) {
          // The ack callback is intentionally unused: this poll only inspects the message.
          poll { msg, _ ->
            val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute())
            assertAckAttempts(expectedAckAttempts, ackAttemptsAttribute.ackAttempts)
          }
        }
      }
    }
  }
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ private val createQueueNoPublisher = { clock: Clock,

private fun createQueue(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?): SqlQueue {
publisher: EventPublisher?): SqlQueue =
createQueueWithResetAck(clock, deadLetterCallback, publisher, true)

internal fun createQueueWithResetAck(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?,
resetAttemptsOnAck: Boolean): SqlQueue {
return SqlQueue(
queueName = "test",
schemaVersion = 1,
Expand All @@ -56,6 +62,7 @@ private fun createQueue(clock: Clock,
)
},
serializationMigrator = Optional.empty(),
resetAttemptsOnAck = resetAttemptsOnAck,
ackTimeout = Duration.ofSeconds(60),
deadMessageHandlers = listOf(deadLetterCallback),
publisher = publisher ?: (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class SqlOrcaQueueConfiguration : SqlQueueConfiguration() {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class SqlQueueShovelConfiguration {
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = properties.resetAttemptsOnAck,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class SqlRedisQueueShovelConfiguration {
lockTtlSeconds = sqlQueueProperties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
resetAttemptsOnAck = sqlQueueProperties.resetAttemptsOnAck,
deadMessageHandlers = emptyList(),
publisher = publisher,
sqlRetryProperties = sqlQueueProperties.retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class SqlTestConfig {
1,
mapper,
Optional.empty(),
true,
Duration.ofSeconds(1),
emptyList(),
true,
Expand Down