Skip to content

Commit

Permalink
Changed statement from UPDATE to UPSERT
Browse files Browse the repository at this point in the history
Changed default retry strategy with dynamic delay
Added unique record identifier to retry records
  • Loading branch information
stroiker committed Jun 30, 2024
1 parent c5f3a09 commit b450d32
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 84 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ apply plugin: 'java-library'
apply plugin: 'maven-publish'

group = 'com.stroiker'
version = "1.0.0"

repositories {
mavenCentral()
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
18 changes: 18 additions & 0 deletions src/main/kotlin/com/stroiker/distributed/deduplicator/Utils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.stroiker.distributed.deduplicator

import com.datastax.oss.driver.api.core.ConsistencyLevel
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import java.time.Duration

object Utils {
internal fun CqlSession.getConsistencyLevel(profileName: String): ConsistencyLevel =
DefaultConsistencyLevel.valueOf(
context.configLoader.initialConfig.getProfile(profileName)
.getString(DefaultDriverOption.REQUEST_CONSISTENCY)
)

internal fun CqlSession.getRequestTimeout(profileName: String): Duration =
context.configLoader.initialConfig.getProfile(profileName).getDuration(DefaultDriverOption.REQUEST_TIMEOUT)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.stroiker.distributed.deduplicator.provider
import com.datastax.oss.driver.api.core.ConsistencyLevel.EACH_QUORUM
import com.datastax.oss.driver.api.core.ConsistencyLevel.LOCAL_QUORUM
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile
import com.datastax.oss.driver.api.core.cql.PreparedStatement
Expand All @@ -13,7 +12,8 @@ import com.datastax.oss.driver.api.core.type.DataTypes
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs
import com.datastax.oss.driver.api.querybuilder.QueryBuilder
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable
import com.datastax.oss.driver.api.querybuilder.relation.Relation
import com.stroiker.distributed.deduplicator.Utils.getConsistencyLevel
import com.stroiker.distributed.deduplicator.Utils.getRequestTimeout
import com.stroiker.distributed.deduplicator.exception.DuplicateException
import com.stroiker.distributed.deduplicator.exception.FailedException
import com.stroiker.distributed.deduplicator.exception.RetryException
Expand All @@ -22,7 +22,7 @@ import com.stroiker.distributed.deduplicator.provider.DeduplicationProvider.Reco
import com.stroiker.distributed.deduplicator.provider.DeduplicationProvider.RecordState.RETRY
import com.stroiker.distributed.deduplicator.provider.DeduplicationProvider.RecordState.SUCCESS
import com.stroiker.distributed.deduplicator.strategy.RetryStrategy
import com.stroiker.distributed.deduplicator.strategy.impl.FixedDelayRetryStrategy
import com.stroiker.distributed.deduplicator.strategy.impl.ExponentialDelayRetryStrategy
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -36,8 +36,8 @@ class DeduplicationProvider private constructor(
private val preparedStatementCache = ConcurrentHashMap<String, PreparedStatement>()

init {
when (session.context.configLoader.initialConfig.getProfile(profileName).getString(DefaultDriverOption.REQUEST_CONSISTENCY)) {
LOCAL_QUORUM.name(), EACH_QUORUM.name() -> {}
when (session.getConsistencyLevel(profileName)) {
LOCAL_QUORUM, EACH_QUORUM -> {}
else -> throw UnsupportedOperationException("Only LOCAL_QUORUM or EACH_QUORUM consistency levels are supported. Weaker consistency levels can not guarantee strict deduplication")
}
}
Expand All @@ -48,8 +48,8 @@ class DeduplicationProvider private constructor(
keyspace: String,
ttl: Duration,
block: () -> T
): T = UUID.randomUUID().toString().let { selfRecordUuid ->
retryStrategy.retry {
): T = retryStrategy.retry {
UUID.randomUUID().toString().let { selfRecordUuid ->
insertRecord(key = key, keyspace = keyspace, recordUuid = selfRecordUuid, table = table, ttl = ttl)
val successRecords = getSuccessRecords(key = key, table = table, keyspace = keyspace)
if (successRecords.size > 1) {
Expand Down Expand Up @@ -104,10 +104,12 @@ class DeduplicationProvider private constructor(
}

private fun getSuccessRecords(key: String, table: String, keyspace: String): List<DeduplicationData> =
getSelectStatement(table = table, keyspace = keyspace).bind().setString(KEY_COLUMN, key).let { boundStatement ->
session.execute(boundStatement).map { row -> row.toDeduplicationData() }
.filter { deduplicationData -> deduplicationData.recordState == SUCCESS }
}
getSelectStatement(table = table, keyspace = keyspace).bind()
.setString(KEY_COLUMN, key)
.let { boundStatement ->
session.execute(boundStatement).map { row -> row.toDeduplicationData() }
.filter { deduplicationData -> deduplicationData.recordState == SUCCESS }
}

private fun insertRecord(
key: String,
Expand All @@ -116,9 +118,12 @@ class DeduplicationProvider private constructor(
recordUuid: String,
ttl: Duration
) {
getInsertStatement(table = table, keyspace = keyspace).bind().setString(KEY_COLUMN, key)
getInsertStatement(table = table, keyspace = keyspace).bind()
.setString(KEY_COLUMN, key)
.setString(STATE_COLUMN, SUCCESS.name)
.setString(RECORD_UUID_COLUMN, recordUuid).setInt(TTL, ttl.seconds.toInt()).also { boundStatement ->
.setString(RECORD_UUID_COLUMN, recordUuid)
.setInt(TTL, ttl.seconds.toInt())
.also { boundStatement ->
session.execute(boundStatement).wasApplied().also { applied ->
if (!applied) throw FailedException(key, table, "Insert record wasn't applied")
}
Expand All @@ -134,11 +139,13 @@ class DeduplicationProvider private constructor(
state: RecordState,
ttl: Duration
) {
getUpdateStatement(table = table, keyspace = keyspace).bind()
getUpsertStatement(table = table, keyspace = keyspace).bind()
.setString(KEY_COLUMN, key)
.setUuid(TIME_UUID_COLUMN, timeUuid)
.setString(RECORD_UUID_COLUMN, recordUuid).setString(STATE_COLUMN, state.name)
.setInt(TTL, ttl.seconds.toInt()).also { boundStatement ->
.setString(RECORD_UUID_COLUMN, recordUuid)
.setString(STATE_COLUMN, state.name)
.setInt(TTL, ttl.seconds.toInt())
.also { boundStatement ->
session.execute(boundStatement).wasApplied().also { applied ->
if (!applied) throw FailedException(key, table, "Update record to '$state' wasn't applied")
}
Expand Down Expand Up @@ -172,18 +179,16 @@ class DeduplicationProvider private constructor(
)
}

private fun getUpdateStatement(table: String, keyspace: String): PreparedStatement =
private fun getUpsertStatement(table: String, keyspace: String): PreparedStatement =
preparedStatementCache.computeIfAbsent("u:$keyspace:$table") {
createTableIfNotExist(table = table, keyspace = keyspace)
session.prepare(
QueryBuilder.update(keyspace, table)
QueryBuilder.insertInto(keyspace, table)
.value(KEY_COLUMN, QueryBuilder.bindMarker(KEY_COLUMN))
.value(TIME_UUID_COLUMN, QueryBuilder.bindMarker(TIME_UUID_COLUMN))
.value(RECORD_UUID_COLUMN, QueryBuilder.bindMarker(RECORD_UUID_COLUMN))
.value(STATE_COLUMN, QueryBuilder.bindMarker(STATE_COLUMN))
.usingTtl(QueryBuilder.bindMarker(TTL))
.setColumn(STATE_COLUMN, QueryBuilder.bindMarker(STATE_COLUMN))
.where(
Relation.column(KEY_COLUMN).isEqualTo(QueryBuilder.bindMarker(KEY_COLUMN)),
Relation.column(TIME_UUID_COLUMN).isEqualTo(QueryBuilder.bindMarker(TIME_UUID_COLUMN)),
Relation.column(RECORD_UUID_COLUMN).isEqualTo(QueryBuilder.bindMarker(RECORD_UUID_COLUMN))
)
.build()
.setExecutionProfileName(profileName)
)
Expand Down Expand Up @@ -224,7 +229,12 @@ class DeduplicationProvider private constructor(
.withConfigLoader(DriverConfigLoader.fromClasspath("application.conf"))
.build()
}
private var strategy: RetryStrategy = FixedDelayRetryStrategy(3, Duration.ofMillis(100))
private var strategy: Lazy<RetryStrategy> = lazy {
ExponentialDelayRetryStrategy(
3,
session.value.getRequestTimeout(profileName).multipliedBy(2)
)
}
private var profileName: String = DriverExecutionProfile.DEFAULT_NAME

fun session(session: CqlSession): DeduplicationProviderBuilder {
Expand All @@ -233,7 +243,7 @@ class DeduplicationProvider private constructor(
}

fun strategy(strategy: RetryStrategy): DeduplicationProviderBuilder {
this.strategy = strategy
this.strategy = lazyOf(strategy)
return this
}

Expand All @@ -242,7 +252,7 @@ class DeduplicationProvider private constructor(
return this
}

fun build(): DeduplicationProvider = DeduplicationProvider(session.value, profileName, strategy)
fun build(): DeduplicationProvider = DeduplicationProvider(session.value, profileName, strategy.value)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,27 @@ import com.stroiker.distributed.deduplicator.exception.RetriesExceededException
import com.stroiker.distributed.deduplicator.exception.RetryException
import com.stroiker.distributed.deduplicator.strategy.RetryStrategy
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.math.exp

class ExponentialDelayRetryStrategy(private val times: Int, private val initialDelay: Duration) : RetryStrategy {

private val threadLocalCounter: ThreadLocal<AtomicInteger> = ThreadLocal.withInitial { AtomicInteger(0) }
override fun <T> retry(action: () -> T): T = retry(0, action)

override fun <T> retry(action: () -> T): T =
private fun <T> retry(counter: Int, action: () -> T): T =
runCatching {
action().also { threadLocalCounter.remove() }
action()
}.getOrElse { error ->
if (error is RetryException) {
val retryCounter = threadLocalCounter.get().incrementAndGet()
if (retryCounter <= times) {
Thread.sleep(computeDelayMillis(retryCounter, initialDelay))
retry(action)
if (counter == times) {
throw RetriesExceededException(error.key, error.table)
} else {
threadLocalCounter.remove()
throw throw RetriesExceededException(error.key, error.table)
Thread.sleep(computeDelayMillis(counter))
retry(counter + 1, action)
}
} else {
threadLocalCounter.remove()
throw error
}
throw error
}

private fun computeDelayMillis(retryCount: Int, delay: Duration): Long =
if (retryCount == 0) delay.toMillis() else (delay.toMillis() * exp(retryCount.toDouble())).toLong()
private fun computeDelayMillis(retryCount: Int): Long =
if (retryCount == 0) initialDelay.toMillis() else (initialDelay.toMillis() * exp(retryCount.toDouble())).toLong()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,23 @@ import com.stroiker.distributed.deduplicator.exception.RetriesExceededException
import com.stroiker.distributed.deduplicator.exception.RetryException
import com.stroiker.distributed.deduplicator.strategy.RetryStrategy
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

class FixedDelayRetryStrategy(private val times: Int, private val delay: Duration) : RetryStrategy {

private val threadLocalCounter: ThreadLocal<AtomicInteger> = ThreadLocal.withInitial { AtomicInteger(times) }
override fun <T> retry(action: () -> T): T = retry(times, action)

override fun <T> retry(action: () -> T): T =
private fun <T> retry(counter: Int, action: () -> T): T =
runCatching {
action().also { threadLocalCounter.remove() }
action()
}.getOrElse { error ->
if (error is RetryException) {
if (threadLocalCounter.get().decrementAndGet() >= 0) {
Thread.sleep(delay.toMillis())
retry(action)
} else {
threadLocalCounter.remove()
if (counter == 0) {
throw RetriesExceededException(error.key, error.table)
} else {
Thread.sleep(delay.toMillis())
retry(counter - 1, action)
}
} else {
threadLocalCounter.remove()
throw error
}
throw error
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class DeduplicationProviderTest {
doReturn(mockResultSet).`when`(session).execute(
argThat<Statement<*>> {
when (this) {
is BoundStatement -> this.preparedStatement.query.startsWith("UPDATE", true)
is BoundStatement -> this.preparedStatement.query.contains("INSERT", true) && this.getString(DeduplicationProvider.STATE_COLUMN) == DeduplicationProvider.RecordState.FAILED.name
else -> false
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,34 @@
package com.stroiker.distributed.deduplicator.strategy.impl

import com.datastax.oss.driver.shaded.guava.common.base.Stopwatch
import com.stroiker.distributed.deduplicator.exception.RetriesExceededException
import com.stroiker.distributed.deduplicator.exception.RetryException
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junitpioneer.jupiter.RetryingTest
import java.time.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS

class ExponentialDelayRetryStrategyTest {

private val strategy = ExponentialDelayRetryStrategy(3, Duration.ofMillis(10))

@RetryingTest(maxAttempts = 5, name = "should retries 0 times")
@Test
fun `should retries 0 times`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
strategy.retry { counter++ }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 0..9)
assertEquals(1, counter)
}

@RetryingTest(maxAttempts = 5, name = "should retries 0 times with other error")
@Test
fun `should retries 0 times with other error`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
assertThrows<RuntimeException> { strategy.retry { counter++; throw RuntimeException() } }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 0..9)
assertEquals(1, counter)
}

@RetryingTest(maxAttempts = 5, name = "should retries 3 times with retry error")
@Test
fun `should retries 3 times with retry error`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
assertThrows<RetriesExceededException> { strategy.retry { counter++; throw RetryException("", "") } }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 300..320)
assertEquals(4, counter)
}
}
Original file line number Diff line number Diff line change
@@ -1,43 +1,34 @@
package com.stroiker.distributed.deduplicator.strategy.impl

import com.datastax.oss.driver.shaded.guava.common.base.Stopwatch
import com.stroiker.distributed.deduplicator.exception.RetriesExceededException
import com.stroiker.distributed.deduplicator.exception.RetryException
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junitpioneer.jupiter.RetryingTest
import java.time.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS

class FixedDelayRetryStrategyTest {

private val strategy = FixedDelayRetryStrategy(3, Duration.ofMillis(10))
private val strategy = FixedDelayRetryStrategy(3, Duration.ofMillis(1000))

@RetryingTest(maxAttempts = 5, name = "should retries 0 times")
@Test
fun `should retries 0 times`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
strategy.retry { counter++ }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 0..9)
assertEquals(1, counter)
}

@RetryingTest(maxAttempts = 5, name = "should retries 0 times with other error")
@Test
fun `should retries 0 times with other error`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
assertThrows<RuntimeException> { strategy.retry { counter++; throw RuntimeException() } }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 0..9)
assertEquals(1, counter)
}

@RetryingTest(maxAttempts = 5, name = "should retries 3 times with retry error")
@Test
fun `should retries 3 times with retry error`() {
var counter = 0
val stopwatch = Stopwatch.createStarted()
assertThrows<RetriesExceededException> { strategy.retry { counter++; throw RetryException("", "") } }
assertTrue(stopwatch.elapsed(MILLISECONDS) in 30..39)
assertEquals(4, counter)
}
}

0 comments on commit b450d32

Please sign in to comment.