From 4ae887a8be800acb78dfe5c5090b4fe7405a5aeb Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Thu, 19 Dec 2024 16:28:07 -0500 Subject: [PATCH] Write OTel GenAI semantic convention span attributes. --- js/ai/src/generate.ts | 7 +- js/ai/src/model.ts | 7 ++ js/ai/src/telemetry.ts | 108 +++++++++++++++++++++++++ js/core/src/index.ts | 1 + js/{ai => core}/src/metrics.ts | 60 +++++--------- js/core/src/tracing/instrumentation.ts | 7 ++ js/core/src/tracing/types.ts | 10 +++ 7 files changed, 155 insertions(+), 45 deletions(-) create mode 100644 js/ai/src/telemetry.ts rename js/{ai => core}/src/metrics.ts (58%) diff --git a/js/ai/src/generate.ts b/js/ai/src/generate.ts index cedf0c74d..a6ad66b94 100755 --- a/js/ai/src/generate.ts +++ b/js/ai/src/generate.ts @@ -34,7 +34,6 @@ import { GenerateUtilParamSchema, generateHelper } from './generate/action.js'; import { GenerateResponseChunk } from './generate/chunk.js'; import { GenerateResponse } from './generate/response.js'; import { Message } from './message.js'; -import { writeMetrics } from './metrics.js'; import { GenerateRequest, GenerationCommonConfigSchema, @@ -292,7 +291,6 @@ export async function generate< registry, stripNoop(resolvedOptions.onChunk ?? resolvedOptions.streamingCallback), async () => { - const startTime = performance.now(); const response = await generateHelper( registry, params, @@ -302,13 +300,10 @@ export async function generate< ...resolvedOptions, tools, }); - const generateResponse = new GenerateResponse(response, { + return new GenerateResponse(response, { request: response.request ?? request, parser: resolvedFormat?.handler(request.output?.schema).parseMessage, }); - const endTime = performance.now(); - writeMetrics(generateResponse, endTime - startTime); - return generateResponse; } ); } diff --git a/js/ai/src/model.ts b/js/ai/src/model.ts index cffad1143..3c21eba4b 100644 --- a/js/ai/src/model.ts +++ b/js/ai/src/model.ts @@ -25,9 +25,11 @@ import { } from '@genkit-ai/core'; import { Registry } from '@genkit-ai/core/registry'; import { toJsonSchema } from '@genkit-ai/core/schema'; +import { SpanMetadata, spanMetadataAlsKey } from '@genkit-ai/core/tracing'; import { performance } from 'node:perf_hooks'; import { DocumentDataSchema } from './document.js'; import { augmentWithContext, validateSupport } from './model/middleware.js'; +import { writeSemConvTelemetry } from './telemetry.js'; // // IMPORTANT: Please keep type definitions in sync with @@ -375,6 +377,8 @@ export const GenerateClientTelemetrySchema = z.object({ operationName: z.string().optional(), serverPort: z.number().optional(), serverAddress: z.string().optional(), + encodingFormats: z.array(z.string()).optional(), + responseId: z.string().optional(), }); export type GenerateClientTelemetry = z.infer< typeof GenerateClientTelemetrySchema @@ -515,6 +519,9 @@ export function defineModel< ...response, latencyMs: performance.now() - startTimeMs, }; + const span = + registry.asyncStore.getStore(spanMetadataAlsKey); + writeSemConvTelemetry(timedResponse, span); return timedResponse; }); } diff --git a/js/ai/src/telemetry.ts b/js/ai/src/telemetry.ts new file mode 100644 index 000000000..4f1e0aa1b --- /dev/null +++ b/js/ai/src/telemetry.ts @@ -0,0 +1,108 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { MetricHistogram } from '@genkit-ai/core'; +import { SpanMetadata } from '@genkit-ai/core/tracing'; +import { AttributeValue, ValueType } from '@opentelemetry/api'; +import { GenerateResponseData } from './model.js'; + +const tokenUsage = new MetricHistogram('gen_ai.client.token.usage', { + description: 'Usage of GenAI tokens.', + valueType: ValueType.INT, + unit: 'token', +}); + +const operationDuration = new MetricHistogram( + 'gen_ai.client.operation.duration', + { + description: 'Time taken for GenAI operations', + valueType: ValueType.DOUBLE, + unit: 'token', + } +); + +export function writeSemConvTelemetry( + output: GenerateResponseData, + span?: SpanMetadata +): void { + writeMetrics(output); + if (span) { + writeSpanAttributes(output, span); + } +} + +function writeMetrics(resp: GenerateResponseData): void { + const commonDimensions = { + 'gen_ai.client.framework': 'genkit', + 'gen_ai.operation.name': resp.clientTelemetry?.operationName, + 'gen_ai.system': resp.clientTelemetry?.system, + 'gen_ai.request.model': resp.clientTelemetry?.requestModel, + 'server.port': resp.clientTelemetry?.serverPort, + 'gen_ai.response.model': resp.clientTelemetry?.responseModel, + 'server.address': resp.clientTelemetry?.serverAddress, + }; + tokenUsage.record(resp.usage?.inputTokens || 0, { + ...commonDimensions, + 'gen_ai.token.type': 'input', + }); + tokenUsage.record(resp.usage?.outputTokens || 0, { + ...commonDimensions, + 'gen_ai.token.type': 'output', + }); + if (resp.latencyMs) { + operationDuration.record(resp.latencyMs, commonDimensions); + } +} + +function writeSpanAttributes( + output: GenerateResponseData, + span: SpanMetadata +): void { + const t: Record = {}; + const client = output.clientTelemetry; + const config = output.request?.config; + const usage = output.usage; + setAttribute(t, 'gen_ai.client.framework', 'genkit'); + setAttribute(t, 'gen_ai.operation.name', client?.operationName); + setAttribute(t, 'gen_ai.system', client?.system); + setAttribute(t, 'gen_ai.request.model', client?.requestModel); + setAttribute(t, 'server.port', client?.serverPort); + setAttribute(t, 'gen_ai.request.encoding_formats', client?.encodingFormats); + setAttribute(t, 'gen_ai.request.frequency_penalty', config?.frequencyPenalty); + setAttribute(t, 'gen_ai.request.max_tokens', config?.maxOutputTokens); + setAttribute(t, 'gen_ai.request.presence_penalty', config?.presencePenalty); + setAttribute(t, 'gen_ai.request.stop_sequences', config?.stopSequences); + setAttribute(t, 'gen_ai.request.temperature', config?.temperature); + setAttribute(t, 'gen_ai.request.top_k', config?.topK); + setAttribute(t, 'gen_ai.request.top_p', config?.topP); + setAttribute(t, 'gen_ai.response.finish_reasons', [output.finishReason]); + setAttribute(t, 'gen_ai.response.id', client?.responseId); + setAttribute(t, 'gen_ai.response.model', client?.responseModel); + setAttribute(t, 'gen_ai.usage.input_tokens', usage?.inputTokens); + setAttribute(t, 'gen_ai.usage.output_tokens', usage?.outputTokens); + setAttribute(t, 'server.address', client?.serverAddress); + span.telemetry = t; +} + +function setAttribute( + attrs: Record, + key: string, + attribute?: AttributeValue +) { + if (attribute) { + attrs[key] = attribute!; + } +} diff --git a/js/core/src/index.ts b/js/core/src/index.ts index 7dce71659..7c8e8e181 100644 --- a/js/core/src/index.ts +++ b/js/core/src/index.ts @@ -46,6 +46,7 @@ export { type StreamingFlowConfig, type __RequestWithAuth, } from './flow.js'; +export * from './metrics.js'; export * from './plugin.js'; export * from './reflection.js'; export { defineJsonSchema, defineSchema, type JSONSchema } from './schema.js'; diff --git a/js/ai/src/metrics.ts b/js/core/src/metrics.ts similarity index 58% rename from js/ai/src/metrics.ts rename to js/core/src/metrics.ts index a07585cad..3b1824d4f 100644 --- a/js/ai/src/metrics.ts +++ b/js/core/src/metrics.ts @@ -14,8 +14,7 @@ * limitations under the License. */ -import { Histogram, Meter, metrics, ValueType } from '@opentelemetry/api'; -import { GenerateResponse } from './generate/response.js'; +import { Counter, Histogram, Meter, metrics } from '@opentelemetry/api'; type MetricCreateFn = (meter: Meter) => T; export const METER_NAME = 'genkit'; @@ -28,7 +27,7 @@ export const METER_NAME = 'genkit'; * conditions we defer the instantiation of the metric to when it is first * ticked. */ -class Metric { +export class Metric { readonly createFn: MetricCreateFn; readonly meterName: string; metric?: T; @@ -49,6 +48,25 @@ class Metric { } } +/** + * Wrapper for an OpenTelemetry Counter. + * + * By using this wrapper, we defer initialization of the counter until it is + * need, which ensures that the OpenTelemetry SDK has been initialized before + * the metric has been defined. + */ +export class MetricCounter extends Metric { + constructor(name: string, options: any) { + super((meter) => meter.createCounter(name, options)); + } + + add(val?: number, opts?: any) { + if (val) { + this.get().add(val, opts); + } + } +} + /** * Wrapper for an OpenTelemetry Histogram. * @@ -67,39 +85,3 @@ export class MetricHistogram extends Metric { } } } - -const tokenUsage = new MetricHistogram('gen_ai.client.token.usage', { - description: 'Usage of GenAI tokens.', - valueType: ValueType.INT, - unit: 'token', -}); - -const operationDuration = new MetricHistogram( - 'gen_ai.client.operation.duration', - { - description: 'Time taken for GenAI operations', - valueType: ValueType.DOUBLE, - unit: 'token', - } -); - -export function writeMetrics(resp: GenerateResponse, durationMs: number): void { - const commonDimensions = { - 'gen_ai.client.framework': 'genkit', - 'gen_ai.operation.name': resp.clientTelemetry?.operationName, - 'gen_ai.system': resp.clientTelemetry?.system, - 'gen_ai.request.model': resp.clientTelemetry?.requestModel, - 'server.port': resp.clientTelemetry?.serverPort, - 'gen_ai.response.model': resp.clientTelemetry?.responseModel, - 'server.address': resp.clientTelemetry?.serverAddress, - }; - tokenUsage.record(resp.usage?.inputTokens || 0, { - ...commonDimensions, - 'gen_ai.token.type': 'input', - }); - tokenUsage.record(resp.usage?.outputTokens || 0, { - ...commonDimensions, - 'gen_ai.token.type': 'output', - }); - operationDuration.record(durationMs, commonDimensions); -} diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 34612ee2f..fbe668247 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -136,6 +136,9 @@ export async function runInNewSpan( throw e; } finally { otSpan.setAttributes(metadataToAttributes(opts.metadata)); + if (opts.metadata.telemetry) { + otSpan.setAttributes(opts.metadata.telemetry); + } otSpan.end(); } } @@ -183,6 +186,10 @@ function getErrorMessage(e: any): string { function metadataToAttributes(metadata: SpanMetadata): Record { const out = {} as Record; Object.keys(metadata).forEach((key) => { + // The telemetry metadata gets added directly as span attributes + if (key === 'telemetry') { + return; + } if ( key === 'metadata' && typeof metadata[key] === 'object' && diff --git a/js/core/src/tracing/types.ts b/js/core/src/tracing/types.ts index e8b53d70a..750ad9b59 100644 --- a/js/core/src/tracing/types.ts +++ b/js/core/src/tracing/types.ts @@ -35,6 +35,15 @@ export const TraceMetadataSchema = z.object({ }); export type TraceMetadata = z.infer; +export const AttributeValueSchema = z.union([ + z.string(), + z.number(), + z.boolean(), + z.array(z.union([z.null(), z.undefined(), z.string()])), + z.array(z.union([z.null(), z.undefined(), z.number()])), + z.array(z.union([z.null(), z.undefined(), z.boolean()])), +]); + export const SpanMetadataSchema = z.object({ name: z.string(), state: z.enum(['success', 'error']).optional(), @@ -42,6 +51,7 @@ export const SpanMetadataSchema = z.object({ output: z.any().optional(), isRoot: z.boolean().optional(), metadata: z.record(z.string(), z.string()).optional(), + telemetry: z.record(z.string(), AttributeValueSchema).optional(), path: z.string().optional(), }); export type SpanMetadata = z.infer;