Skip to content
This repository has been archived by the owner on May 5, 2022. It is now read-only.

propagate tracer for async code #44

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions appmetrics-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ var PropertyReader = require('properties-reader');
var properties = PropertyReader(__dirname + '/appmetrics-zipkin.properties');
var tcpp = require('tcp-ping');

// create namespace
const { createNamespace } = require('./lib/request-context.js');
createNamespace('appmetrics-zipkin-ns');

const {
BatchRecorder
} = require('zipkin');
Expand Down
31 changes: 31 additions & 0 deletions lib/namespace.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict';

const asyncHooks = require('async_hooks');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async_hooks was only added to core in Node 8 and above. This is going to cause issues on Node 6


class Namespace {

constructor() {
this.context = {};
}

run(fn) {
const eid = asyncHooks.executionAsyncId();
this.context[eid] = {};
fn();
}

set(key, val) {
const eid = asyncHooks.executionAsyncId();
this.context[eid][key] = val;
}

get(key) {
const eid = asyncHooks.executionAsyncId();
if (this.context[eid])
return this.context[eid][key];
else
return undefined;
}
}

module.exports = Namespace;
48 changes: 48 additions & 0 deletions lib/request-context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const asyncHooks = require('async_hooks');
const Namespace = require('./namespace');

const namespaces = {};

function createHooks(namespace) {
function init(asyncId, type, triggerId, resource) {
if (namespace.context[triggerId]) {
namespace.context[asyncId] = namespace.context[triggerId];
}
}

function destroy(asyncId) {
delete namespace.context[asyncId];
}

const asyncHook = asyncHooks.createHook({
init,
destroy
});

asyncHook.enable();
}

function createNamespace(name) {
if (namespaces[name]) {
return namespace;
}

const namespace = new Namespace();
namespaces[name] = namespace;

createHooks(namespace);

return namespace;
}

function getNamespace(name) {
return namespaces[name];
}


module.exports = {
createNamespace,
getNamespace
};
7 changes: 7 additions & 0 deletions probes/http-outbound-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var util = require('util');
var url = require('url');
var semver = require('semver');
const zipkin = require('zipkin');
const { getNamespace } = require('../lib/request-context.js');

var serviceName;

Expand Down Expand Up @@ -83,6 +84,12 @@ HttpOutboundProbeZipkin.prototype.attach = function(name, target) {
methodArgs[0] = Object.assign({}, parsedOptions);
}

// replace tracer from namespace
const namespace = getNamespace('appmetrics-zipkin-ns');
if (namespace) {
tracer.setId(namespace.get('tracer-id'));
}

if (!methodArgs[0].headers) methodArgs[0].headers = {};
let { headers } = Request.addZipkinHeaders(methodArgs[0], tracer.createChildId());
Object.assign(methodArgs[0].headers, { headers });
Expand Down
93 changes: 51 additions & 42 deletions probes/http-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var aspect = require('../lib/aspect.js');
var util = require('util');
const zipkin = require('zipkin');

// get namespace
const { getNamespace } = require('../lib/request-context.js');
const namespace = getNamespace('appmetrics-zipkin-ns');

var serviceName;

const {
Expand Down Expand Up @@ -84,50 +88,55 @@ HttpProbeZipkin.prototype.attach = function(name, target) {
if (obj.__zipkinhttpProbe__) return;
obj.__zipkinhttpProbe__ = true;
aspect.aroundCallback(args, probeData, function(obj, args, probeData) {
var httpReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpReq.url);
// console.log(util.inspect(httpReq));
if (traceUrl !== '') {
const method = httpReq.method;
if (hasZipkinHeader(httpReq)) {
const headers = httpReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
namespace.run(() => {
var httpReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpReq.url);
// console.log(util.inspect(httpReq));
if (traceUrl !== '') {
const method = httpReq.method;
if (hasZipkinHeader(httpReq)) {
const headers = httpReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(
headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
// Must assign new options back to args[0]
const { headers } = Request.addZipkinHeaders(args[0], tracer.id);
Object.assign(args[0].headers, headers);
// store tracer in namespace
namespace.set('tracer-id', tracer.id);
// Must assign new options back to args[0]
const { headers } = Request.addZipkinHeaders(args[0], tracer.id);
Object.assign(args[0].headers, headers);
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));

aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));

aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}
});
});
});
}
Expand Down
8 changes: 8 additions & 0 deletions probes/https-outbound-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var util = require('util');
var url = require('url');
var semver = require('semver');
const zipkin = require('zipkin');
const { getNamespace } = require('../lib/request-context.js');

var serviceName;

Expand Down Expand Up @@ -78,6 +79,13 @@ HttpsOutboundProbeZipkin.prototype.attach = function(name, target) {
requestMethod = parsedOptions.method;
}
}

// replace tracer from namespace
const namespace = getNamespace('appmetrics-zipkin-ns');
if (namespace) {
tracer.setId(namespace.get('tracer-id'));
}

// Must assign new options back to methodArgs[0]
methodArgs[0] = Request.addZipkinHeaders(methodArgs[0], tracer.createChildId());
tracer.recordServiceName(serviceName);
Expand Down
93 changes: 51 additions & 42 deletions probes/https-probe-zipkin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var aspect = require('../lib/aspect.js');
var util = require('util');
const zipkin = require('zipkin');

// get namespace
const { getNamespace } = require('../lib/request-context.js');
const namespace = getNamespace('appmetrics-zipkin-ns');

var serviceName;

const {
Expand Down Expand Up @@ -84,50 +88,55 @@ HttpsProbeZipkin.prototype.attach = function(name, target) {
if (obj.__zipkinhttpsProbe__) return;
obj.__zipkinhttpsProbe__ = true;
aspect.aroundCallback(args, probeData, function(obj, args, probeData) {
var httpsReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpsReq.url);
if (traceUrl !== '') {
const method = httpsReq.method;

if (hasZipkinHeader(httpsReq)) {
const headers = httpsReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
namespace.run(() => {
var httpsReq = args[0];
var res = args[1];
// Filter out urls where filter.to is ''
var traceUrl = parse(httpsReq.url);
if (traceUrl !== '') {
const method = httpsReq.method;

if (hasZipkinHeader(httpsReq)) {
const headers = httpsReq.headers;
var spanId = headers[(Header.SpanId).toLowerCase()];
if (spanId !== undefined) {
const traceId = new Some(headers[(Header.TraceId).toLowerCase()]);
const parentSpanId = new Some(headers[(Header.ParentSpanId).toLowerCase()]);
const sampled = new Some(headers[(Header.Sampled).toLowerCase()]);
const flags = (new Some(
headers[(Header.Flags).toLowerCase()])).flatMap(stringToIntOption).getOrElse(0);
var id = new TraceId({
traceId: traceId,
parentId: parentSpanId,
spanId: spanId,
sampled: sampled.map(stringToBoolean),
flags
});
tracer.setId(id);
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
};
} else {
tracer.setId(tracer.createRootId());
probeData.traceId = tracer.id;
// Must assign new options back to args[0]
args[0] = Request.addZipkinHeaders(args[0], tracer.id);
// store tracer in namespace
namespace.set('tracer-id', tracer.id);
// Must assign new options back to args[0]
args[0] = Request.addZipkinHeaders(args[0], tracer.id);
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpsReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));


aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}

tracer.recordServiceName(serviceName);
tracer.recordRpc(method.toUpperCase());
tracer.recordBinary('http.url', httpsReq.headers.host + traceUrl);
tracer.recordAnnotation(new Annotation.ServerRecv());
tracer.recordAnnotation(new Annotation.LocalAddr(0));


aspect.after(res, 'end', probeData, function(obj, methodName, args, probeData, ret) {
tracer.recordBinary('http.status_code', res.statusCode.toString());
tracer.recordAnnotation(new Annotation.ServerSend());
});
}
});
});
});
}
Expand Down