(inspired heavily by msavin:sjobs)
Run scheduled tasks with the simple jobs queue made just for Meteor. With tight MongoDB integration and fibers-based timing functions, this package is quick, reliable and effortless to use.
- Jobs run on one server at a time
- Jobs run predictably and consecutively
- Job timers are super-efficient
- Jobs are stored in MongoDB
- No third party dependencies
It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the quick start guide and the full API documentation below.
This package has an API inspired by msavin:sjobs and in some cases can be a drop-in replacement. If you're coming from msavin:sjobs, read about the potentially breaking API differences. If any of these differences make this package unsuitable for you, please let me know and I'll consider fixing it.
The main difference between this package and msavin:sjobs is that this package doesn't continuously poll the job queue. Instead, it sets a single timer for the next due job. This means that most of the time this package is doing absolutely nothing, whereas msavin:sjobs can use significant CPU even when idle. It also means that jobs are executed closer to their due date, instead of potentially late due to the polling interval.
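As a rough illustration of the single-timer idea (not the package's actual implementation), the delay for that one timer could be computed as below. The `PendingJob` shape and the `nextTimerDelay` name are hypothetical; the 24-hour cap follows the note in the version history about capping timeouts.

```typescript
// Hypothetical shape of a stored job; only the due date matters here.
interface PendingJob {
  name: string;
  due: Date;
}

// Assumption: long waits are capped at 24 hours, and the timer is simply
// re-evaluated when the shorter timeout fires.
const MAX_TIMEOUT_MS = 24 * 60 * 60 * 1000;

// Returns how long to sleep before the next job is due, or null if the
// queue is empty and no timer is needed at all.
function nextTimerDelay(jobs: PendingJob[], now: Date): number | null {
  if (jobs.length === 0) return null;
  const earliest = Math.min(...jobs.map(job => job.due.getTime()));
  const delay = Math.max(0, earliest - now.getTime()); // overdue jobs run immediately
  return Math.min(delay, MAX_TIMEOUT_MS);
}
```

With no pending jobs the function returns null, which corresponds to the package doing nothing at all until a new job is scheduled.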
Unfortunately, I found the job queue system in msavin:sjobs too fundamentally built-in to modify and create a PR, so it was easier to write my own package.
First, install the package, and import if necessary:
meteor add wildhart:jobs
import { Jobs } from 'meteor/wildhart:jobs'
Then, write your background jobs like you would write your methods:
Jobs.register({
"sendReminder": function (to, message) {
var call = HTTP.put("http://www.magic.com/sendEmail", {
to: to,
message: message
});
if (call.statusCode === 200) {
this.success();
} else {
this.reschedule({in: {minutes: 5}});
}
}
});
Finally, schedule a background job like you would call a method:
Jobs.run("sendReminder", "[email protected]", "The future is here!");
One more thing: the call above schedules the job to run at the moment the function is called. However, you can delay it by passing in a special configuration object at the end:
Jobs.run("sendReminder", "[email protected]", "The future is here!", {
in: {
days: 3,
},
on: {
hour: 9,
minute: 42
},
priority: 9999999999
});
The configuration object supports date, in, on, and priority, all of which are completely optional; see Jobs.run.
With version 1.0.18 we introduced a more convenient and strongly typed wrapper Class around our traditional API. You can still use the old API, and even upgrade to this new version with no additional work, then you are free to gradually update your code to the new API.
** Don't forget to copy our new wildhart-jobs.d.ts into your project's @types folder.
Benefits of the new API:
- All job parameters are strongly typed, so in code which schedules a job you will get IDE warnings if the types are incorrect.
- No more scheduling jobs by string name, so no risk of typos.
With the new API, the above code would be replaced with:
import { TypedJob } from "meteor/wildhart:jobs";
export const sendReminderJob = new TypedJob('sendReminders', function(to: string, message: string) {
...
});
Note that the job definition is the only place where you need to refer to the job by its string name.
When scheduling the job, you can reference the class instance directly:
import { sendReminderJob } from './reminders';
sendReminderJob.withArgs("[email protected]", "The future is here!").run({
in: {
days: 3,
},
on: {
hour: 9,
minute: 42
},
priority: 9999999999
});
Almost all of the traditional API can be replaced with this new API:
// as example above
sendReminderJob.withArgs(...).run(configObject);
// equivalent to Jobs.clear('*', 'sendReminders', ...args);
sendReminderJob.clear('*', ...args);
// NEW API equivalent to Jobs.collection.clear({...query, name: 'sendReminders'});
sendReminderJob.clearQuery(query);
// same as Jobs.remove(....), but without having to import "Jobs"
const scheduledJob: JobDocument | false = sendReminderJob.withArgs(...).run(...);
sendReminderJob.remove(scheduledJob);
// or
sendReminderJob.remove(scheduledJob._id);
// equivalent to Jobs.start('sendReminders');
sendReminderJob.start();
// equivalent to Jobs.stop('sendReminders');
sendReminderJob.stop();
// equivalent to Jobs.count('sendReminders', '[email protected]');
sendReminderJob.count('[email protected]');
// equivalent to Jobs.findOne('sendReminders', '[email protected]');
sendReminderJob.findOne('[email protected]');
// NEW API equivalent to Jobs.collection.update({...query, name: 'sendReminders'}, options);
sendReminderJob.update(query, options);
// if you need to query the Jobs collection directly, the original name of the job can be obtained:
sendReminderJob.name; // == 'sendReminders'
Further details of these methods are as per the traditional API below.
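To illustrate why a typed wrapper removes string-name typos and catches wrong argument types, here is a minimal self-contained sketch. It is not the package's TypedJob class: `TypedJobSketch`, the in-memory `scheduled` array, and the placeholder address are all hypothetical stand-ins.

```typescript
// In-memory stand-in for the jobs collection, for illustration only.
interface ScheduledDoc {
  name: string;
  args: unknown[];
}
const scheduled: ScheduledDoc[] = [];

// Hypothetical wrapper: the job name appears once, in the constructor,
// and the argument types are inferred from the handler's signature.
class TypedJobSketch<Args extends unknown[]> {
  readonly name: string;
  private readonly handler: (...args: Args) => void;

  constructor(name: string, handler: (...args: Args) => void) {
    this.name = name;
    this.handler = handler;
  }

  // Arguments are checked against the handler's parameter types.
  withArgs(...args: Args) {
    return {
      run: (): ScheduledDoc => {
        const doc: ScheduledDoc = { name: this.name, args };
        scheduled.push(doc);
        return doc;
      },
    };
  }

  // Run the handler immediately with type-checked arguments.
  execute(...args: Args): void {
    this.handler(...args);
  }
}

const sent: string[] = [];
const sendReminderSketch = new TypedJobSketch(
  'sendReminders',
  (to: string, message: string) => { sent.push(`${to}: ${message}`); },
);

const sketchDoc = sendReminderSketch.withArgs('user@example.com', 'The future is here!').run();
// sendReminderSketch.withArgs(42).run(); // rejected by the type checker
```

Because callers hold a reference to the instance rather than a string, a misspelled job name becomes a compile-time error instead of a job that silently never runs.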
One big caveat of the new API is that to run a job you have to import the code from the file where the job was defined, which by definition should be exposed on the server side only. Therefore, in shared client/server code (e.g. a Meteor Method) if you are used to doing:
if (Meteor.isServer) {
Jobs.run('sendEmail', '[email protected]', 'hello', {in: {days: 1}});
}
You have to be careful not to import the server-side code into the front-end, by using import().then():
if (Meteor.isServer) {
import('../../server/reminderJobs').then(({sendEmailJob}) => {
sendEmailJob.withArgs(...).run(...);
});
}
Jobs.register and Jobs.run are all you need to get started, but that's only the beginning of what the package can do. To explore the rest of the functionality, jump into the documentation:
- Jobs.configure
- Jobs.register
- Jobs.run
- Jobs.execute
- Jobs.reschedule
- Jobs.replicate
- Jobs.start
- Jobs.stop
- Jobs.clear
- Jobs.remove
- Jobs.jobs
- Jobs.collection
- Repeating Jobs
- Async Jobs/Promises
- Bulk Operations
- Version History
Jobs.configure allows you to configure how the package should work. You can configure one option or all of them. Defaults are shown in the code below:
Jobs.configure({
// (milliseconds) specify how long the server could be inactive before another server
// takes on the master role (default = 5min)
maxWait: Number,
// (milliseconds) specify how long after server startup the package should start running
startupDelay: Number,
// determine how to set the serverId - see below. (default = random string)
setServerId: String || Function,
// determine if/how to log the package outputs (default = console.log)
log: Boolean || Function,
// specify if all job queues should start automatically on first launch (default = true)...
// ... after server relaunch the list of paused queues is restored from the database.
autoStart: Boolean,
// whether to mark successful jobs as successful, or remove them,
// otherwise you have to resolve every job with this.success() or this.remove()
defaultCompletion: 'success' | 'remove',
})
setServerId - In a multi-server deployment, jobs are only executed on one server. Each server should have a unique ID so that it knows whether it is in control of the job queue or not. You can provide a function which returns a serverId from somewhere (e.g. from an environment variable) or just use the default of a random string. In a single-server deployment, set this to a static string so that the server knows that it is always in control and can take control more quickly after a reboot.
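The interaction between setServerId and maxWait can be sketched as follows. This is only an illustration of the reasoning above, not the package's code: the `ControlRecord` shape, its field names, and both function names are assumptions.

```typescript
// Hypothetical record describing which server currently controls the queue.
interface ControlRecord {
  serverId: string;
  lastPing: number; // epoch milliseconds of the controlling server's last sign of life
}

// Resolve the setServerId option: a fixed string, a function, or a random default.
function resolveServerId(setServerId?: string | (() => string)): string {
  if (typeof setServerId === 'function') return setServerId();
  if (typeof setServerId === 'string') return setServerId;
  return Math.random().toString(36).slice(2);
}

// Decide whether this server may run jobs right now.
function isInControl(
  record: ControlRecord | undefined,
  myId: string,
  now: number,
  maxWait: number,
): boolean {
  if (!record) return true;                  // nobody has claimed the queue yet
  if (record.serverId === myId) return true; // this server already holds control
  return now - record.lastPing > maxWait;    // the other server has gone quiet
}
```

Under these assumptions, a rebooted server with a static ID matches the stored record and resumes immediately, while a server with a fresh random ID has to wait out maxWait before taking over.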
Jobs.register allows you to register a function for a job.
Jobs.register({
sendEmail: function (to, content) {
var send = Magic.sendEmail(to, content);
if (send) {
this.success();
} else {
this.reschedule({in: {minutes: 5}});
}
},
sendReminder: function (userId, content) {
var doc = Reminders.insert({
to: userId,
content: content
})
if (doc) {
this.remove();
} else {
this.reschedule({in: {minutes: 5}});
}
}
});
// or NEW API:
const sendEmail = new TypedJob('sendEmail', function(to: string, content: EmailDoc) {
...
});
const sendReminder = new TypedJob('sendReminder', function(to: string, content: ReminderContent) {
...
});
Each job is bound with a set of functions to give you maximum control over how the job runs:
- this.document - access the job document
- this.success() - tell the queue the job is completed
- this.failure() - tell the queue the job failed
- this.reschedule(config) - tell the queue to schedule the job for a future date
- this.remove() - remove the job from the queue
- this.replicate(config) - create a copy of the job with a different due date provided by config (returns the new jobId)
Each job must be resolved with success, failure, reschedule, and/or remove.
See Repeating Jobs and Async Jobs/Promises
Jobs.run allows you to schedule a job to run. You call it just like you would call a method, by specifying the job name and its arguments. At the end, you can pass in a special configuration object. Otherwise, it will be scheduled to run as soon as possible.
var jobDoc = Jobs.run("sendReminder", "[email protected]", "The future is here!", {
in: {
days: 3,
},
on: {
hour: 9,
minute: 42
},
priority: 9999999999,
singular: true
});
// or NEW API:
sendReminderJob.withArgs("[email protected]", "The future is here!").run(...);
Jobs.run returns a jobDoc.
The configuration object supports the following inputs:
- in - Object
  - The in parameter will schedule the job at a later time, using the current time and your inputs to calculate the due time.
- on - Object
  - The on parameter overrides the current time with your inputs.
- in and on - Object
  - The supported fields for in and on can be used in singular and/or plural versions:
    - millisecond, second, minute, hour, day, month, and year
    - milliseconds, seconds, minutes, hours, days, months, and years
  - The date object will be updated in the order that is specified. This means that if it is year 2017, and you set in to one year but on to 2019, the year 2019 will be the final result. However, if you set on to 2019 and in to one year, then the year 2020 will be the final result.
- priority - Number
  - The default priority for each job is 0
  - If you set it to a positive integer, it will run ahead of other jobs.
  - If you set it to a negative integer, it will only run after all the zero or positive jobs have completed.
- date - Date
  - Provide your own date. This stacks with the in and on operators, and will be applied before they perform their operations.
- unique - Boolean
  - If a job is marked as unique, it will only be scheduled if no other job exists with the same arguments
- singular - Boolean
  - If a job is marked as singular, it will only be scheduled if no other job is pending with the same arguments
- awaitAsync - Boolean
  - If an async job is run with awaitAsync: true, then no other job of the same name will start until the running job has completed.
- callback - Function
  - Run a callback function after scheduling the job
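The ordering rule for date, in and on can be made concrete with a small sketch. This is an approximation written from the description above, not the package's implementation; `computeDueDate` and its types are hypothetical, and it assumes month values follow JavaScript's zero-based Date convention.

```typescript
type Unit = 'millisecond' | 'second' | 'minute' | 'hour' | 'day' | 'month' | 'year';
type Fields = Partial<Record<Unit | `${Unit}s`, number>>;

interface ScheduleConfig {
  date?: Date;
  in?: Fields;
  on?: Fields;
}

// Getter/setter pair for each unit, using the local-time Date methods.
const accessors: Record<Unit, { get: (d: Date) => number; set: (d: Date, v: number) => void }> = {
  millisecond: { get: d => d.getMilliseconds(), set: (d, v) => { d.setMilliseconds(v); } },
  second: { get: d => d.getSeconds(), set: (d, v) => { d.setSeconds(v); } },
  minute: { get: d => d.getMinutes(), set: (d, v) => { d.setMinutes(v); } },
  hour: { get: d => d.getHours(), set: (d, v) => { d.setHours(v); } },
  day: { get: d => d.getDate(), set: (d, v) => { d.setDate(v); } },
  month: { get: d => d.getMonth(), set: (d, v) => { d.setMonth(v); } },
  year: { get: d => d.getFullYear(), set: (d, v) => { d.setFullYear(v); } },
};

function computeDueDate(config: ScheduleConfig, now: Date = new Date()): Date {
  // date (if given) is the starting point, applied before in and on.
  const due = new Date((config.date ?? now).getTime());
  // in and on are applied in the order they appear in the config object.
  for (const key of Object.keys(config) as (keyof ScheduleConfig)[]) {
    if (key === 'date') continue;
    const fields = config[key];
    if (!fields) continue;
    for (const [name, value] of Object.entries(fields)) {
      if (value === undefined) continue;
      // Accept both singular and plural field names.
      const { get, set } = accessors[name.replace(/s$/, '') as Unit];
      // in adds to the current value; on overwrites it.
      set(due, key === 'in' ? get(due) + value : value);
    }
  }
  return due;
}
```

For example, starting from a date in 2017, `{in: {years: 1}, on: {year: 2019}}` ends in 2019, while `{on: {year: 2019}, in: {years: 1}}` ends in 2020.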
Jobs.execute allows you to run a job ahead of its due date. It can only work on jobs that have not been resolved.
Jobs.execute(doc) // or (doc._id)
// or NEW API
sendReminderJob.execute(doc); // or (doc._id)
Jobs.reschedule allows you to reschedule a job. It can only work on jobs that have not been resolved.
Jobs.reschedule(job, { // or (job._id)
in: {
minutes: 5
},
priority: 9999999
});
// or NEW API
sendReminderJob.reschedule(job, {...}); // or (job._id, {...});
The configuration is passed in as the second argument, and it supports the same inputs as Jobs.run.
Jobs.replicate allows you to replicate a job.
var jobId = Jobs.replicate(job, { // or (job._id, {...
in: {
minutes: 5
}
})
// or NEW API
sendReminderJob.replicate(job, {...}); // or (job._id, {...});
Jobs.replicate returns a jobId.
Jobs.start allows you to start all the queues. This runs automatically unless autoStart is set to false. If you call the function with no arguments, it will start all the queues. If you pass in a String, it will start a queue with that name. If you pass in an Array, it will start all the queues named in the array.
// Start all the queues
Jobs.start()
// Start just one queue
Jobs.start("sendReminder")
// or NEW API
sendReminderJob.start();
// Start multiple queues
Jobs.start(["sendReminder", "sendEmail"])
Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified.
Jobs.stop allows you to stop all the queues. If you call the function with no arguments, it will stop all the queues. If you pass in a String, it will stop a queue with that name. If you pass in an Array, it will stop all the queues named in the array.
// Stop all the queues
Jobs.stop()
// Stop just one queue
Jobs.stop("sendReminder")
// or NEW API
sendReminderJob.stop();
// Stop multiple queues
Jobs.stop(["sendReminder", "sendEmail"])
Unlike msavin:sjobs, this function can be called on any server and whichever server is currently in control of the job queue will be notified.
If you need to stop all jobs via mongo use:
mongo> db.jobs_dominator_3.update({_id:"dominatorId"}, {$set: {pausedJobs: ['*']}});
The in-control server should observe the change and stop instantly. Use {$unset: {pausedJobs: 1}} or {$set: {pausedJobs: []}} to start all the queues again.
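A hedged sketch of how a pausedJobs list like the one above might be interpreted: '*' pauses everything, as in the mongo command, and individual queue names pause just those queues. The `isQueuePaused` helper is hypothetical, and the per-name behaviour is an assumption based on Jobs.stop accepting a name.

```typescript
// Returns true if the named queue should not run any jobs.
function isQueuePaused(pausedJobs: string[] | undefined, jobName: string): boolean {
  if (!pausedJobs) return false; // field unset: every queue is running
  return pausedJobs.includes('*') || pausedJobs.includes(jobName);
}
```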
Jobs.clear allows you to clear all or some of the jobs in your database.
var count = Jobs.clear(state, jobName, ...arguments, callback);
e.g:
count = Jobs.clear(); // remove all completed jobs (success or failure)
count = Jobs.clear('*'); // remove all jobs
count = Jobs.clear('failure', 'sendEmail', '[email protected]', function(err, count) {console.log(err, count)});
// or NEW API
count = sendEmailJob.clear('failure', '[email protected]', ...);
Parameters:
- state for selecting a job state (either pending, success, failure, or * to select all of them), or omit to select all except pending jobs.
- jobName to only remove jobs with a specific name.
- provide arguments to match jobs only with the same arguments.
- callback to provide a callback function with error and result parameters, where result is the number of jobs removed.
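One way to picture how these parameters combine is as a MongoDB-style selector. The sketch below is illustrative only: `buildClearSelector` is a hypothetical helper, and the field names state, name and arguments are assumptions about the job document rather than something confirmed here.

```typescript
type JobState = 'pending' | 'success' | 'failure' | '*';

// Build a selector from the same positional parameters that Jobs.clear takes
// (minus the callback).
function buildClearSelector(
  state?: JobState,
  jobName?: string,
  ...args: unknown[]
): Record<string, unknown> {
  const selector: Record<string, unknown> = {};
  if (state === undefined) {
    // Omitted state: everything except pending jobs.
    selector.state = { $in: ['success', 'failure'] };
  } else if (state !== '*') {
    selector.state = state;
  }
  if (jobName !== undefined) selector.name = jobName;
  // Match each supplied argument by position.
  args.forEach((arg, index) => {
    selector[`arguments.${index}`] = arg;
  });
  return selector;
}
```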
Jobs.remove allows you to remove a job from the collection.
var success = Jobs.remove(doc); // or (doc._id)
// or NEW API
sendEmailJob.remove(doc); // or (doc._id)
Jobs.jobs gives access to an object of defined job functions:
var jobNames = Object.keys(Jobs.jobs); // ['sendEmail', 'sendReminder']
var nJobTypes = jobNames.length; // 2
Jobs.collection allows you to access the MongoDB collection where the jobs are stored. Ideally, you should not require interaction with the database directly.
Repeating jobs can be created by using this.reschedule() in the job function, e.g.:
Jobs.register({
processMonthlyPayments() {
this.reschedule({in: {months: 1}});
processPayments();
},
});
Jobs.run('processMonthlyPayments', {singular: true});
Since this package doesn't keep a job history (compared with msavin:sjobs), you can use this.reschedule() indefinitely without polluting the jobs database, instead of having to use this.replicate() followed by this.remove().
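The singular: true option matters in the example above because Jobs.run is typically called on every server start; without it, each restart would add another copy of the repeating job. A minimal sketch of the unique and singular checks as described under Jobs.run, using an in-memory array and hypothetical names (`StoredJob`, `maySchedule`):

```typescript
interface StoredJob {
  name: string;
  args: unknown[];
  state: 'pending' | 'executing' | 'success' | 'failure';
}

// Two jobs match when they share a name and identical arguments.
function sameJob(job: StoredJob, name: string, args: unknown[]): boolean {
  return job.name === name && JSON.stringify(job.args) === JSON.stringify(args);
}

// Returns true if a new job may be scheduled under the given options.
function maySchedule(
  existing: StoredJob[],
  name: string,
  args: unknown[],
  options: { unique?: boolean; singular?: boolean } = {},
): boolean {
  // unique: refuse if any matching job exists, whatever its state.
  if (options.unique && existing.some(job => sameJob(job, name, args))) return false;
  // singular: refuse only if a matching job is still pending.
  if (options.singular && existing.some(job => job.state === 'pending' && sameJob(job, name, args))) return false;
  return true;
}
```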
The job function can use async/await or return a promise:
Jobs.register({
async asyncJob(...args) {
await new Promise(resolve => Meteor.setTimeout(() => resolve(0), 4000));
this.remove();
},
promiseJob(...args) {
return new Promise(resolve => Meteor.setTimeout(() => {
this.remove();
resolve(0);
}, 8000));
},
});
This defers the error message 'Job was not resolved with success, failure, reschedule or remove' until the promise resolves. Note that:
- While jobs are executing their status is set to 'executing'.
- Other jobs of the same type will still run when scheduled while asynchronous jobs are executing, unless the running job was configured with awaitAsync: true, in which case the pending job will wait until the previous job of that name has completed.
- Asynchronous code may need to be wrapped in Meteor.bindEnvironment().
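The awaitAsync rule in the list above boils down to a simple check before starting a job. The following sketch uses hypothetical names (`ExecutingJob`, `canStart`) and is not taken from the package source:

```typescript
interface ExecutingJob {
  name: string;
  awaitAsync?: boolean;
}

// A pending job may start unless a job of the same name is currently
// executing and was scheduled with awaitAsync: true.
function canStart(candidateName: string, executing: ExecutingJob[]): boolean {
  return !executing.some(job => job.name === candidateName && job.awaitAsync === true);
}
```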
The job queue prevents a large number of jobs of a single type from dominating the queue, so feel free to use this package to safely schedule bulk operations, e.g. sending 1000s of emails. Although it may take some time to send all of these emails, any other jobs which are scheduled to run while they are being sent will still be run on time. Run each operation as its own job (e.g. 1000 separate "sendSingleEmail" jobs rather than a single "send1000Emails" job). The job queue will run all 1000 "sendSingleEmail" jobs in sequence, but after each job it will check if any other jobs need to run first.
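One possible way to get this behaviour, shown purely as an assumption about the mechanism rather than the package's actual algorithm, is to prefer a due job of a different name from the one that just ran. The `QueuedJob` shape and `pickNext` function are hypothetical.

```typescript
interface QueuedJob {
  name: string;
  due: number; // epoch milliseconds
}

// Pick the next job to run: among jobs that are already due, prefer the
// earliest one whose name differs from the job that just finished, and fall
// back to the earliest due job otherwise.
function pickNext(jobs: QueuedJob[], now: number, lastName?: string): QueuedJob | undefined {
  const due = jobs
    .filter(job => job.due <= now)
    .sort((a, b) => a.due - b.due); // filter() returned a copy, so the input is untouched
  return due.find(job => job.name !== lastName) ?? due[0];
}
```

With this rule a single "dailyReport" job that becomes due while hundreds of "sendSingleEmail" jobs are queued waits for at most one more email before it runs.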
If any of these differences make this package unsuitable for you, please let me know and I'll consider fixing.
- This package doesn't keep a job history.
- failed jobs are not retried, unless they have already been rescheduled.
- The Job configuration object doesn't support the data attribute - I never found any use for this.
- The following Jobs.configure() options are not available or different:
  - interval - this package doesn't regularly query the job queue for due jobs, instead it sets a timer for the next job.
  - getDate
  - disableDevelopmentMode
  - remoteCollection
  - autoStart - only relevant on first launch. On relaunch the list of paused queues is restored from the database.
- The following Jobs.configure() options have additional options:
  - setServerId can be a String as well as a Function
  - log can be a Boolean as well as a Function
- In a job function, this.set() and this.get() are not provided - I never found any use for this.
- In a job function, this.success() and this.failure() do not take a result parameter - this package doesn't keep a job history
- singular jobs only check for pending jobs of the same name, so they can be run again even if a previous job failed.
- Jobs.start() and Jobs.stop() can be called on any server and whichever server is in control of the job queue will be notified.
- Jobs.cancel() doesn't exist. Just remove it with Jobs.remove() - I don't see the point in keeping old jobs lying around.
- Jobs.clear() can take additional argument parameters to only delete jobs matching those arguments.
- Jobs.jobs doesn't exist in msavin:sjobs
- Added new strongly-typed API.
- Added awaitAsync: true option for async jobs. Fixes #14.
- Fixed not accepting startUpDelay in Jobs.configure. Fixes #13.
- Removed typescript version constraint.
- Added the 'defaultCompletion' Jobs.configure option. Suggested in #10.
- Better support for Async Jobs/Promises. Fixes #7.
- While jobs are executing their status is set to 'executing'.
- Capped timeout to 24 hours to avoid node limit. Fixes #5.
- Fix bug when using months to set the job time.
- Add return values to this.remove() etc within job.
- Fix observer/timeout race condition executing same job twice
- Prevent console.logging done jobs.
- Allow job queue to be paused while in executing loop.
- Fix bug when logging observer.
- Jobs which result in an error but have already been rescheduled will still run again at the rescheduled time.
- Access to the list of defined job types with Jobs.jobs.
- Prevent a single job type from dominating the job queue (e.g. bulk email sending).
- First release.