Skip to content

Commit

Permalink
Add [DELAY <delay>] argument to NACK command
Browse files Browse the repository at this point in the history
  • Loading branch information
eadam-withings committed Jun 10, 2016
1 parent 0192ba7 commit 8037f0c
Showing 1 changed file with 44 additions and 15 deletions.
59 changes: 44 additions & 15 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,6 @@ void getjobCommand(client *c) {
}

/* ENQUEUE job-id-1 job-id-2 ... job-id-N
* NACK job-id-1 job-id-2 ... job-id-N
*
* If the job is active, queue it if job retry != 0.
* If the job is in any other state, do nothing.
Expand All @@ -846,12 +845,8 @@ void getjobCommand(client *c) {
* NOTE: Even jobs with retry set to 0 are enqueued! Be aware that
* using this command may violate the at-most-once contract.
*
* Return the number of jobs actually move from active to queued state.
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment. */
void enqueueGenericCommand(client *c, int nack) {
* Return the number of jobs actually move from active to queued state. */
void enqueueCommand(client *c) {
int j, enqueued = 0;

if (validateJobIDs(c,c->argv+1,c->argc-1) == C_ERR) return;
Expand All @@ -861,20 +856,54 @@ void enqueueGenericCommand(client *c, int nack) {
job *job = lookupJob(c->argv[j]->ptr);
if (job == NULL) continue;

if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,nack) == C_OK)
if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,0) == C_OK)
enqueued++;
}
addReplyLongLong(c,enqueued);
}

/* See enqueueGenericCommand(). */
void enqueueCommand(client *c) {
enqueueGenericCommand(c,0);
}

/* See enqueueGenericCommand(). */
/* NACK job-id-1 job-id-2 ... job-id-N [DELAY <delay>]
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment.
*
* Return the number of jobs actually move from active to (delayed) queued state.
*/
void nackCommand(client *c) {
enqueueGenericCommand(c,1);
long long delay = 0;
int j, enqueued = 0;
int arg_count = c->argc;

if (arg_count > 2) {
char *opt = c->argv[arg_count-2]->ptr;
if (!strcasecmp(opt,"delay")) {
int retval = getLongLongFromObject(c->argv[arg_count-1],&delay);
if (retval != C_OK || delay <= 0) {
addReplyError(c,"DELAY must be a number greater than zero");
return;
}
arg_count -= 2;
}
}

if (validateJobIDs(c,c->argv+1,arg_count-1) == C_ERR) return;

for (j = 1; j < arg_count; j++) {
char *opt = c->argv[j]->ptr;
job *job = lookupJob(opt);
if (job == NULL) continue;

clusterBroadcastQueued(job, CLUSTERMSG_FLAG0_INCR_NACKS);
job->num_nacks++;

if (job->state == JOB_STATE_ACTIVE) {
updateJobRequeueTime(job,server.mstime+delay*1000);
enqueued++;
}
}

addReplyLongLong(c,enqueued);
}

/* DEQUEUE job-id-1 job-id-2 ... job-id-N
Expand Down

0 comments on commit 8037f0c

Please sign in to comment.