From c3b46153ecf1d759d71c0860d35fa410e31898f7 Mon Sep 17 00:00:00 2001 From: Remita Amine Date: Sat, 17 Aug 2024 22:17:55 +0100 Subject: [PATCH] feat: mark run activity span as consumer span --- .../src/instrumentation.ts | 14 ++++++++++++-- .../interceptors-opentelemetry/src/worker/index.ts | 8 +++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/packages/interceptors-opentelemetry/src/instrumentation.ts b/packages/interceptors-opentelemetry/src/instrumentation.ts index 833a16955..ef6ce6bff 100644 --- a/packages/interceptors-opentelemetry/src/instrumentation.ts +++ b/packages/interceptors-opentelemetry/src/instrumentation.ts @@ -58,6 +58,7 @@ async function wrapWithSpan( export interface InstrumentOptions { tracer: otel.Tracer; spanName: string; + spanOptions?: otel.SpanOptions; fn: (span: otel.Span) => Promise; context?: otel.Context; acceptableErrors?: (err: unknown) => boolean; @@ -69,14 +70,23 @@ export interface InstrumentOptions { export async function instrument({ tracer, spanName, + spanOptions = {}, fn, context, acceptableErrors, }: InstrumentOptions): Promise { if (context) { return await otel.context.with(context, async () => { - return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors)); + return await tracer.startActiveSpan( + spanName, + spanOptions, + async (span) => await wrapWithSpan(span, fn, acceptableErrors) + ); }); } - return await tracer.startActiveSpan(spanName, async (span) => await wrapWithSpan(span, fn, acceptableErrors)); + return await tracer.startActiveSpan( + spanName, + spanOptions, + async (span) => await wrapWithSpan(span, fn, acceptableErrors) + ); } diff --git a/packages/interceptors-opentelemetry/src/worker/index.ts b/packages/interceptors-opentelemetry/src/worker/index.ts index fb9c035ef..a820d65a3 100644 --- a/packages/interceptors-opentelemetry/src/worker/index.ts +++ b/packages/interceptors-opentelemetry/src/worker/index.ts @@ -29,7 +29,13 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC async execute(input: ActivityExecuteInput, next: Next): Promise { const context = await extractContextFromHeaders(input.headers); const spanName = `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}${this.ctx.info.activityType}`; - return await instrument({ tracer: this.tracer, spanName, fn: () => next(input), context }); + return await instrument({ + tracer: this.tracer, + spanName, + spanOptions: { attributes: { 'messaging.system': 'temporal' }, kind: otel.SpanKind.CONSUMER }, + fn: () => next(input), + context, + }); } }