Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add [DELAY <delay>] argument to NACK command #188

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,6 @@ void getjobCommand(client *c) {
}

/* ENQUEUE job-id-1 job-id-2 ... job-id-N
* NACK job-id-1 job-id-2 ... job-id-N
*
* If the job is active, queue it if job retry != 0.
* If the job is in any other state, do nothing.
Expand All @@ -846,12 +845,8 @@ void getjobCommand(client *c) {
* NOTE: Even jobs with retry set to 0 are enqueued! Be aware that
* using this command may violate the at-most-once contract.
*
* Return the number of jobs actually move from active to queued state.
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment. */
void enqueueGenericCommand(client *c, int nack) {
* Return the number of jobs actually move from active to queued state. */
void enqueueCommand(client *c) {
int j, enqueued = 0;

if (validateJobIDs(c,c->argv+1,c->argc-1) == C_ERR) return;
Expand All @@ -861,20 +856,54 @@ void enqueueGenericCommand(client *c, int nack) {
job *job = lookupJob(c->argv[j]->ptr);
if (job == NULL) continue;

if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,nack) == C_OK)
if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,0) == C_OK)
enqueued++;
}
addReplyLongLong(c,enqueued);
}

/* See enqueueGenericCommand(). */
void enqueueCommand(client *c) {
enqueueGenericCommand(c,0);
}

/* See enqueueGenericCommand(). */
/* NACK job-id-1 job-id-2 ... job-id-N [DELAY <delay>]
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment.
*
* Return the number of jobs actually move from active to (delayed) queued state.
*/
void nackCommand(client *c) {
enqueueGenericCommand(c,1);
long long delay = 0;
int j, enqueued = 0;
int arg_count = c->argc;

if (arg_count > 2) {
char *opt = c->argv[arg_count-2]->ptr;
if (!strcasecmp(opt,"delay")) {
int retval = getLongLongFromObject(c->argv[arg_count-1],&delay);
if (retval != C_OK || delay <= 0) {
addReplyError(c,"DELAY must be a number greater than zero");
return;
}
arg_count -= 2;
}
}

if (validateJobIDs(c,c->argv+1,arg_count-1) == C_ERR) return;

for (j = 1; j < arg_count; j++) {
char *opt = c->argv[j]->ptr;
job *job = lookupJob(opt);
if (job == NULL) continue;

clusterBroadcastQueued(job, CLUSTERMSG_FLAG0_INCR_NACKS);
job->num_nacks++;

if (job->state == JOB_STATE_ACTIVE) {
updateJobRequeueTime(job,server.mstime+delay*1000);
enqueued++;
}
}

addReplyLongLong(c,enqueued);
}

/* DEQUEUE job-id-1 job-id-2 ... job-id-N
Expand Down