Skip to content

Commit

Permalink
feat(retry backoff): Adding retry with backoff (#6)
Browse files Browse the repository at this point in the history
* feat(retry backoff): Adding retry with backoff

* dropped retry func

* use main branch
  • Loading branch information
thoven87 authored Sep 14, 2024
1 parent 34e144d commit a455c8f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
10 changes: 6 additions & 4 deletions Sources/JobsRedis/RedisJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,15 @@ public final class RedisJobQueue: JobQueueDriver {
/// - Returns: Job ID
@discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID {
let jobInstanceID = JobID(delayUntil: options.delayUntil)

try await self.set(jobId: jobInstanceID, buffer: buffer)

_ = try await self.redisConnectionPool.wrappedValue.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get()
try await self.addToQueue(jobInstanceID, buffer: buffer)
return jobInstanceID
}

private func addToQueue(_ jobId: JobID, buffer: ByteBuffer) async throws {
try await self.set(jobId: jobId, buffer: buffer)
_ = try await self.redisConnectionPool.wrappedValue.lpush(jobId.redisKey, into: self.configuration.queueKey).get()
}

/// Flag job is done
///
/// Removes job id from processing queue
Expand Down
41 changes: 40 additions & 1 deletion Tests/JobsRedisTests/RedisJobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ final class RedisJobsTests: XCTestCase {
)
),
numWorkers: numWorkers,
logger: logger
logger: logger,
options: .init(
maximumBackoff: 0.01,
maxJitter: 0.01,
minJitter: 0.0
)
)

return try await withThrowingTaskGroup(of: Void.self) { group in
Expand Down Expand Up @@ -176,6 +181,40 @@ final class RedisJobsTests: XCTestCase {
}
}

func testErrorRetryAndThenSucceed() async throws {
let jobIdentifer = JobIdentifier<Int>(#function)
let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2)
let currentJobTryCount: NIOLockedValueBox<Int> = .init(0)
struct FailedError: Error {}
try await self.testJobQueue(numWorkers: 1) { jobQueue in
jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in
defer {
currentJobTryCount.withLockedValue {
$0 += 1
}
}
expectation.fulfill()
if currentJobTryCount.withLockedValue({ $0 }) == 0 {
throw FailedError()
}
}
try await jobQueue.push(id: jobIdentifer, parameters: 0)

await self.fulfillment(of: [expectation], timeout: 5)
try await Task.sleep(for: .milliseconds(200))

let failedJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.failedQueueKey).get()
XCTAssertEqual(failedJobs, 0)

let pendingJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.queueKey).get()
XCTAssertEqual(pendingJobs, 0)

let processingJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.processingQueueKey).get()
XCTAssertEqual(processingJobs, 0)
}
XCTAssertEqual(currentJobTryCount.withLockedValue { $0 }, 2)
}

func testJobSerialization() async throws {
struct TestJobParameters: Codable {
let id: Int
Expand Down

0 comments on commit a455c8f

Please sign in to comment.