From 81961031a832eff7c6f2802bf12c0eff76be61d4 Mon Sep 17 00:00:00 2001 From: Paul Cioanca Date: Mon, 10 Jun 2024 11:40:12 +0300 Subject: [PATCH] feat: introduce output argument to `.cancel()` and `.failWithoutRetry()` to mark jobs as failed without implicitly retrying --- .github/workflows/ci.yml | 1 + src/manager.js | 13 +++++++++++-- src/plans.js | 29 ++++++++++++++++++++++++++++- test/cancelTest.js | 20 +++++++++++++++++++- test/failureTest.js | 18 ++++++++++++++++++ test/resumeTest.js | 2 +- types.d.ts | 3 +++ 7 files changed, 81 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 81de1875..d537131c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,6 +19,7 @@ jobs: image: postgres env: POSTGRES_PASSWORD: postgres + POSTGRES_INITDB_ARGS: "-c max_connections=300" options: >- --health-cmd pg_isready --health-interval 10s diff --git a/src/manager.js b/src/manager.js index 29727f03..744e8495 100644 --- a/src/manager.js +++ b/src/manager.js @@ -59,6 +59,7 @@ class Manager extends EventEmitter { this.cancelJobsCommand = plans.cancelJobs(config.schema) this.resumeJobsCommand = plans.resumeJobs(config.schema) this.failJobsCommand = plans.failJobs(config.schema) + this.failJobsWithoutRetryCommand = plans.failJobsWithoutRetry(config.schema) this.getJobByIdCommand = plans.getJobById(config.schema) this.getArchivedJobByIdCommand = plans.getArchivedJobById(config.schema) this.subscribeCommand = plans.subscribe(config.schema) @@ -71,6 +72,7 @@ class Manager extends EventEmitter { this.cancel, this.resume, this.fail, + this.failWithoutRetry, this.fetch, this.fetchCompleted, this.work, @@ -552,10 +554,17 @@ class Manager extends EventEmitter { return this.mapCompletionResponse(ids, result) } - async cancel (id, options = {}) { + async failWithoutRetry (id, data, options = {}) { + const db = options.db || this.db + const ids = this.mapCompletionIdArg(id, 'fail') + const result = await db.executeSql(this.failJobsWithoutRetryCommand, [ids, this.mapCompletionDataArg(data)]) + return this.mapCompletionResponse(ids, result) + } + + async cancel (id, data, options = {}) { const db = options.db || this.db const ids = this.mapCompletionIdArg(id, 'cancel') - const result = await db.executeSql(this.cancelJobsCommand, [ids]) + const result = await db.executeSql(this.cancelJobsCommand, [ids, this.mapCompletionDataArg(data)]) return this.mapCompletionResponse(ids, result) } diff --git a/src/plans.js b/src/plans.js index 73dc0014..1846c06d 100644 --- a/src/plans.js +++ b/src/plans.js @@ -29,6 +29,7 @@ module.exports = { cancelJobs, resumeJobs, failJobs, + failJobsWithoutRetry, insertJob, insertJobs, getTime, @@ -478,6 +479,31 @@ function failJobs (schema) { ` } +function failJobsWithoutRetry (schema) { + return ` + WITH results AS ( + UPDATE ${schema}.job + SET state = '${states.failed}'::${schema}.job_state, + completedOn = now(), + output = $2::jsonb + WHERE id IN (SELECT UNNEST($1::uuid[])) + AND state < '${states.completed}' + RETURNING * + ), completion_jobs as ( + INSERT INTO ${schema}.job (name, data, keepUntil) + SELECT + '${COMPLETION_JOB_PREFIX}' || name, + ${buildJsonCompletionObject(true)}, + ${keepUntilInheritance} + FROM results + WHERE state = '${states.failed}' + AND NOT name LIKE '${COMPLETION_JOB_PREFIX}%' + AND on_complete + ) + SELECT COUNT(*) FROM results + ` +} + function expire (schema) { return ` WITH results AS ( @@ -509,7 +535,8 @@ function cancelJobs (schema) { with results as ( UPDATE ${schema}.job SET completedOn = now(), - state = '${states.cancelled}' + state = '${states.cancelled}', + output = $2::jsonb WHERE id IN (SELECT UNNEST($1::uuid[])) AND state < '${states.completed}' RETURNING 1 diff --git a/test/cancelTest.js b/test/cancelTest.js index 6bff1d6d..01d0174a 100644 --- a/test/cancelTest.js +++ b/test/cancelTest.js @@ -72,11 +72,29 @@ describe('cancel', function () { const jobId = await boss.send('will_cancel', null, { startAfter: 1 }) - await boss.cancel(jobId, { db }) + await boss.cancel(jobId, null, { db }) const job = await boss.getJobById(jobId) assert(job && job.state === 'cancelled') assert.strictEqual(called, true) }) + + it('should cancel a pending job, populating job output if provided', async function () { + const queue = 'cancel-data-batch' + + const boss = this.test.boss = await helper.start(this.test.bossConfig) + await boss.send(queue) + + const jobId = await boss.send('will_cancel', null, { startAfter: 1 }) + + const cancellationData = { msg: 'i am cancelled' } + + await boss.cancel(jobId, cancellationData) + + const job = await boss.getJobById(jobId) + + assert(job && job.state === 'cancelled') + assert.strictEqual(job.output.msg, cancellationData.msg) + }) }) diff --git a/test/failureTest.js b/test/failureTest.js index 6b66dd4f..31bcb9dd 100644 --- a/test/failureTest.js +++ b/test/failureTest.js @@ -208,4 +208,22 @@ describe('failure', function () { assert(job) }) + + it('should accept a payload and not retry', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + + const failPayload = { message: 'i am a failed job' } + + const jobId = await boss.send(queue, null, { onComplete: true, retryLimit: 20 }) + + await boss.failWithoutRetry(jobId, failPayload) + + const job = await boss.getJobById(jobId) + + assert.strictEqual(job.state, 'failed') + assert.strictEqual(job.retrycount, 0) + assert.strictEqual(job.retrylimit, 20) + assert.strictEqual(job.output.message, failPayload.message) + }) }) diff --git a/test/resumeTest.js b/test/resumeTest.js index a6d5797b..8f636ed7 100644 --- a/test/resumeTest.js +++ b/test/resumeTest.js @@ -47,7 +47,7 @@ describe('cancel', function () { } } - await boss.cancel(jobId, { db }) + await boss.cancel(jobId, null, { db }) const job = await boss.getJobById(jobId, { db }) diff --git a/types.d.ts b/types.d.ts index 2ffab407..fea3df83 100644 --- a/types.d.ts +++ b/types.d.ts @@ -352,6 +352,7 @@ declare class PgBoss extends EventEmitter { fetchCompleted(name: string, batchSize: number, options: PgBoss.FetchOptions): Promise[] | null>; cancel(id: string, options?: PgBoss.ConnectionOptions): Promise; + cancel(id: string, data: object, options?: PgBoss.ConnectionOptions): Promise; cancel(ids: string[], options?: PgBoss.ConnectionOptions): Promise; resume(id: string, options?: PgBoss.ConnectionOptions): Promise; @@ -365,6 +366,8 @@ declare class PgBoss extends EventEmitter { fail(id: string, data: object, options?: PgBoss.ConnectionOptions): Promise; fail(ids: string[], options?: PgBoss.ConnectionOptions): Promise; + failWithoutRetry(id: string, data: object, options?: PgBoss.ConnectionOptions): Promise; + getQueueSize(name: string, options?: object): Promise; getJobById(id: string, options?: PgBoss.ConnectionOptions): Promise;