Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(queue): Reset ackAttempts on successful SQL ack #4479

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2023 Apple, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.q.sql

import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
import com.netflix.spinnaker.q.AckAttemptsAttribute
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.reset
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.junit.platform.runner.JUnitPlatform
import org.junit.runner.RunWith
import java.util.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please avoid star imports



@RunWith(JUnitPlatform::class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With PRs like #4484, we're trying to modernize junit things. I tried on my local machine and the tests still execute without this line. Here are the diffs I used:

diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
index e7bb78f82..0ff10042a 100644
--- a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
+++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlAckQueueTest.kt
@@ -29,12 +29,8 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.junit.platform.runner.JUnitPlatform
-import org.junit.runner.RunWith
 import java.util.*
 
-
-@RunWith(JUnitPlatform::class)
 class SqlAckQueueTest : Spek({
   describe("both values of resetAttemptsOnAck") {
     // map of resetAttemptsOnAck to expected number of ackAttempts still on the message after ack

class SqlAckQueueTest : Spek({
describe("both values of resetAttemptsOnAck") {
// map of resetAttemptsOnAck to expected number of ackAttempts still on the message after ack
val flagToAckAttempts = mapOf(
true to 0,
false to 1
)

// check both values of resetAttemptsOnAck
flagToAckAttempts.forEach { resetFlag, expectedAckAttempts ->
val testDescription = "SqlQueue with resetAttemptsOnAck = $resetFlag"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this in the test output (with no local changes). Here's what I see:
image


given(testDescription) {
var queue: SqlQueue? = null
val clock = MutableClock()
val deadMessageHandler: DeadMessageCallback = mock()
val publisher: EventPublisher = mock()

val testDb = SqlTestUtil.initTcMysqlDatabase()
val jooq = testDb.context

fun resetMocks() = reset(deadMessageHandler, publisher)

fun stopQueue() {
SqlTestUtil.cleanupDb(jooq)
}

fun startQueue() {
queue = createQueueWithResetAck(clock, deadMessageHandler, publisher, resetFlag) // from SqlQueueTest
}

describe("message is dropped once then retried successfully") {
beforeGroup(::startQueue)
afterGroup(::stopQueue)
afterGroup(::resetMocks)

given("a test message") {
val message = TestMessage("a")

on("pushing a message that gets dropped") {
with(queue!!) {
push(message)
poll { _, _ -> } // do not ack the message after de-queuing it
clock.incrementBy(ackTimeout)

retry() // make it available again
clock.incrementBy(ackTimeout)
}
}

it("has an ackAttempt count of 1 upon retrying") {
with(queue!!) {
poll { msg, ack ->
val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute())
assert(ackAttemptsAttribute.ackAttempts == 1)
ack() // *now* we acknowledge the message
push(msg) // and re-enqueue it (as with a RunTask returning RUNNING)
}
}
}

val validatesAckAttrAfterAckdRetry = if (expectedAckAttempts == 0) {
"has reset its ackAttempts upon being ack'd"
} else {
"still has ackAttempts=1 even after being ack'd"
}
it(validatesAckAttrAfterAckdRetry) {
with(queue!!) {
poll { msg, ack ->
val ackAttemptsAttribute = (msg.getAttribute() ?: AckAttemptsAttribute())
assert(ackAttemptsAttribute.ackAttempts == expectedAckAttempts)
}
}
}
}
}
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ private val createQueueNoPublisher = { clock: Clock,

private fun createQueue(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?): SqlQueue {
publisher: EventPublisher?): SqlQueue =
createQueueWithResetAck(clock, deadLetterCallback, publisher, true)

internal fun createQueueWithResetAck(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?,
resetAttemptsOnAck: Boolean): SqlQueue {
return SqlQueue(
queueName = "test",
schemaVersion = 1,
Expand All @@ -56,7 +62,7 @@ private fun createQueue(clock: Clock,
)
},
serializationMigrator = Optional.empty(),
resetAttemptsOnAck = true,
resetAttemptsOnAck = resetAttemptsOnAck,
ackTimeout = Duration.ofSeconds(60),
deadMessageHandlers = listOf(deadLetterCallback),
publisher = publisher ?: (
Expand Down