Skip to content

Commit

Permalink
Auto-flush on process exit (#141)
Browse files Browse the repository at this point in the history
This attaches to the process’s `beforeExit` event (supported in Node.js, Deno, and Bun) when auto-flushing is enabled in order to automatically flush when a program exits. Before, you needed to remember to call `metrics.flush()` manually at the end of your program, which is easy to forget (or not even know about in the first place!).

It also adds a `stop()` method that disables auto-flushing and flushes any remaining buffered metrics.

Fixes #136.
  • Loading branch information
Mr0grog authored Dec 18, 2024
1 parent 06b0bb6 commit 31ababb
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 62 deletions.
65 changes: 49 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ Where `options` is an object and can contain the following:
* `prefix`: Sets a default prefix for all metrics. (optional)
* Use this to namespace your metrics.
* `flushIntervalSeconds`: How often to send metrics to Datadog. (optional)
* This defaults to 15 seconds. Set it to 0 to disable auto-flushing which
means you must call `flush()` manually.
* This defaults to 15 seconds. Set it to `0` to disable auto-flushing (which
means you must call `flush()` manually).
* `site`: Sets the Datadog "site", or server where metrics are sent. (optional)
* Defaults to `datadoghq.com`.
* See more details on setting your site at:
Expand Down Expand Up @@ -197,12 +197,12 @@ metrics.init({

`metrics.gauge(key, value[, tags[, timestamp]])`

Record the current *value* of a metric. The most recent value in
a given flush interval will be recorded. Optionally, specify a set of
tags to associate with the metric. This should be used for sum values
such as total hard disk space, process uptime, total number of active
users, or number of rows in a database table. The optional timestamp
is in milliseconds since 1 Jan 1970 00:00:00 UTC, e.g. from `Date.now()`.
Record the current *value* of a metric. The most recent value since the last
flush will be recorded. Optionally, specify a set of tags to associate with the
metric. This should be used for sum values such as total hard disk space,
process uptime, total number of active users, or number of rows in a database
table. The optional timestamp is in milliseconds since 1 Jan 1970 00:00:00 UTC,
e.g. from `Date.now()`.

Example:

Expand Down Expand Up @@ -284,15 +284,34 @@ metrics.distribution('test.service_time', 0.248);

### Flushing

`metrics.flush()`
By default, datadog-metrics will automatically flush, or send accumulated
metrics to Datadog, at regular intervals, and, in environments that support it,
before your program exits. (However, if you call `process.exit()` to cause a
hard exit, datadog-metrics doesn’t get a chance to flush. In this case, you may
want to call `await metrics.stop()` first.)

You can adjust the interval by using the `flushIntervalSeconds` option. Setting
it to `0` will disable auto-flushing entirely:

```js
// Set auto-flush interval to 10 seconds.
metrics.init({ flushIntervalSeconds: 10 });
```

You can also send accumulated metrics manually at any time by calling
`metrics.flush()`.

Please note that, when calling the `BufferedMetricsLogger` constructor directly,
`flushIntervalSeconds` defaults to `0` instead. When constructing your own
logger this way, you must expicitly opt-in to auto-flushing by setting a
positive value.

Calling `flush` sends any buffered metrics to Datadog and returns a promise.
This function will be called automatically unless you set `flushIntervalSeconds`
to `0`.

It can be useful to trigger a manual flush by calling if you want to
make sure pending metrics have been sent before you quit the application
process, for example.
#### `metrics.flush()`

Sends any buffered metrics to Datadog and returns a promise. By default,
`flush()` will be called for you automatically unless you set
`flushIntervalSeconds` to `0` (see above for more details).

⚠️ This method used to take two callback arguments for handling successes and
errors. That form is deprecated and will be removed in a future update:
Expand All @@ -318,6 +337,18 @@ metrics.flush()
.catch((error) => console.log('Flush error:', error)) ;
```

#### `metrics.stop(options)`

Stops auto-flushing (if enabled) and flushes any currently buffered metrics.
This is mainly useful if you want to manually clean up and send remaining
metrics before hard-quitting your program (usually by calling `process.exit()`).
Returns a promise for the result of the flush.

Takes an optional object with properties:
* `flush` (boolean) Whether to flush any remaining metrics after stopping.
Defaults to `true`.


## Logging

Datadog-metrics uses the [debug](https://github.com/visionmedia/debug)
Expand All @@ -344,7 +375,9 @@ TBD

**New Features:**

TBD
* When auto-flushing is enabled, metrics are now also flushed before the process exits. In previous versions, you needed to do this manually by calling `metrics.flush()` at the every end of your program.

You will still need to flush manually if you set `flushIntervalSeconds` to `0` or you are quitting your program by calling `process.exit()` [(which interrupts a variety of operations)](https://nodejs.org/docs/latest/api/process.html#processexitcode).

**Deprecations:**

Expand Down
7 changes: 7 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ function callOnSharedLogger(func) {
// compiler that this satisfies the types. :(
return (...args) => {
if (sharedLogger === null) {
// Special case: don't make a new logger just to stop it.
// @ts-expect-error TypeScript compiler can't figure this one out.
if (func === BufferedMetricsLogger.prototype.stop) {
return Promise.resolve(undefined);
}

init();
}
return func.apply(sharedLogger, args);
Expand All @@ -44,6 +50,7 @@ function callOnSharedLogger(func) {
module.exports = {
init,
flush: callOnSharedLogger(BufferedMetricsLogger.prototype.flush),
stop: callOnSharedLogger(BufferedMetricsLogger.prototype.stop),
gauge: callOnSharedLogger(BufferedMetricsLogger.prototype.gauge),
increment: callOnSharedLogger(BufferedMetricsLogger.prototype.increment),
histogram: callOnSharedLogger(BufferedMetricsLogger.prototype.histogram),
Expand Down
87 changes: 70 additions & 17 deletions lib/loggers.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const Counter = require('./metrics').Counter;
const Histogram = require('./metrics').Histogram;
const Distribution = require('./metrics').Distribution;

const supportsProcessExit = typeof process !== 'undefined'
&& typeof process.once === 'function';

/**
* @typedef {object} AggregatorType Buffers metrics to send.
* @property {(
Expand Down Expand Up @@ -103,6 +106,9 @@ class BufferedMetricsLogger {
opts.site = opts.site || opts.apiHost;
}

this.performAutoFlush = this.performAutoFlush.bind(this);
this.handleProcessExit = this.handleProcessExit.bind(this);

/** @private */
this.aggregator = opts.aggregator || new Aggregator(opts.defaultTags);
/** @private @type {ReporterType} */
Expand All @@ -117,34 +123,27 @@ class BufferedMetricsLogger {
/** @private */
this.prefix = opts.prefix || '';
/** @private */
this.flushIntervalSeconds = opts.flushIntervalSeconds;
/** @private */
this.histogramOptions = opts.histogram;

/** @private */
this.onError = null;
if (typeof opts.onError === 'function') {
/** @private */
this.onError = opts.onError;
} else if (opts.onError != null) {
throw new TypeError('The `onError` option must be a function');
}

if (this.flushIntervalSeconds) {
logDebug('Auto-flushing every %d seconds', this.flushIntervalSeconds);
/** @private */
this.flushTimer = null;
/** @private */
this.flushIntervalSeconds = 0;
if (opts.flushIntervalSeconds < 0) {
throw new TypeError(`flushIntervalSeconds must be >= 0 (got: ${opts.flushIntervalSeconds})`);
} else {
logDebug('Auto-flushing is disabled');
this.flushIntervalSeconds = opts.flushIntervalSeconds;
}

const autoFlushCallback = () => {
this.flush();
if (this.flushIntervalSeconds) {
const interval = this.flushIntervalSeconds * 1000;
const tid = setTimeout(autoFlushCallback, interval);
// Let the event loop exit if this is the only active timer.
if (tid.unref) tid.unref();
}
};

autoFlushCallback();
this.start();
}

/**
Expand Down Expand Up @@ -326,6 +325,60 @@ class BufferedMetricsLogger {

return result;
}

/**
* Start auto-flushing metrics.
*/
start() {
if (this.flushTimer) {
logDebug('Auto-flushing is already enabled');
} else if (this.flushIntervalSeconds > 0) {
logDebug('Auto-flushing every %d seconds', this.flushIntervalSeconds);
if (supportsProcessExit) {
process.once('beforeExit', this.handleProcessExit);
}
this.performAutoFlush();
} else {
logDebug('Auto-flushing is disabled');
}
}

/**
* Stop auto-flushing metrics. By default, this will also flush any
* currently buffered metrics. You can leave them in the buffer and not
* flush by setting the `flush` option to `false`.
* @param {Object} [options]
* @param {boolean} [options.flush] Whether to flush before returning.
* Defaults to true.
* @returns {Promise}
*/
async stop(options) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
if (supportsProcessExit) {
process.off('beforeExit', this.handleProcessExit);
}
if (!options || options.flush) {
await this.flush();
}
}

/** @private */
performAutoFlush() {
this.flush();
if (this.flushIntervalSeconds) {
const interval = this.flushIntervalSeconds * 1000;
this.flushTimer = setTimeout(this.performAutoFlush, interval);
// Let the event loop exit if this is the only active timer.
if (this.flushTimer.unref) this.flushTimer.unref();
}
}

/** @private */
async handleProcessExit() {
logDebug('Auto-flushing before process exits...');
this.flush();
}
}

module.exports = {
Expand Down
5 changes: 5 additions & 0 deletions test-other/types_check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
reporters,
init,
flush,
stop,
gauge,
increment,
histogram,
Expand All @@ -17,6 +18,8 @@ function useLogger(logger: BufferedMetricsLogger) {
logger.histogram('histogram.key', 11);
logger.distribution('distribution.key', 11);
logger.flush();
logger.stop();
logger.stop({ flush: false });
}

useLogger(new BufferedMetricsLogger());
Expand Down Expand Up @@ -51,3 +54,5 @@ increment('increment.key');
histogram('histogram.key', 11);
distribution('distribution.key', 11);
flush();
stop();
stop({ flush: false });
Loading

0 comments on commit 31ababb

Please sign in to comment.