Skip to content

Commit

Permalink
fix(worker): Fix prometheus and otel regressions (#1345)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Jan 25, 2024
1 parent 6b35787 commit 25a7695
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 82 deletions.
36 changes: 19 additions & 17 deletions packages/core-bridge/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ impl ArrayHandleConversionsExt for Handle<'_, JsArray> {
}
}

pub(crate) type TelemOptsRes = (
TelemetryOptions,
Option<Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>>,
);
/// Boxed, one-shot closure that builds the metrics meter.
///
/// Calling it yields either the meter (as an `Arc<dyn CoreMeter>`) or a human-readable error
/// message describing why the exporter could not be created.
type BoxedMeterMaker = Box<dyn FnOnce() -> Result<Arc<dyn CoreMeter>, String> + Send + Sync>;

/// Result of converting the JS telemetry options: the telemetry options themselves, plus an
/// optional deferred meter constructor (`None` when no meter has to be built).
pub(crate) type TelemOptsRes = (TelemetryOptions, Option<BoxedMeterMaker>);

pub trait ObjectHandleConversionsExt {
fn set_default(&self, cx: &mut FunctionContext, key: &str, value: &str) -> NeonResult<()>;
Expand Down Expand Up @@ -220,20 +219,24 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
.unwrap_err()
})?;

meter_maker = Some(Box::new(move || {
let prom_info = start_prometheus_metric_exporter(options)
.expect("Failed creating prometheus exporter");
prom_info.meter as Arc<dyn CoreMeter>
})
as Box<dyn FnOnce() -> Arc<dyn CoreMeter> + Send>);
meter_maker =
Some(
Box::new(move || match start_prometheus_metric_exporter(options) {
Ok(prom_info) => Ok(prom_info.meter as Arc<dyn CoreMeter>),
Err(e) => Err(format!("Failed to start prometheus exporter: {}", e)),
}) as BoxedMeterMaker,
);
} else if let Some(ref otel) = js_optional_getter!(cx, metrics, "otel", JsObject) {
let mut options = OtelCollectorOptionsBuilder::default();

let url = js_value_getter!(cx, otel, "url", JsString);
match Url::parse(&url) {
Ok(url) => options.url(url),
Err(_) => {
return cx.throw_type_error("Invalid telemetryOptions.metrics.otel.url");
Err(e) => {
return cx.throw_type_error(format!(
"Invalid telemetryOptions.metrics.otel.url: {}",
e
))?;
}
};

Expand Down Expand Up @@ -269,11 +272,10 @@ impl ObjectHandleConversionsExt for Handle<'_, JsObject> {
.unwrap_err()
})?;

meter_maker = Some(Box::new(move || {
let otlp_exporter =
build_otlp_metric_exporter(options).expect("Failed to build otlp exporter");
Arc::new(otlp_exporter) as Arc<dyn CoreMeter>
}));
meter_maker = Some(Box::new(move || match build_otlp_metric_exporter(options) {
Ok(otlp_exporter) => Ok(Arc::new(otlp_exporter) as Arc<dyn CoreMeter>),
Err(e) => Err(format!("Failed to start otlp exporter: {}", e)),
}) as BoxedMeterMaker);
} else {
cx.throw_type_error(
"Invalid telemetryOptions.metrics, missing `prometheus` or `otel` option",
Expand Down
36 changes: 32 additions & 4 deletions packages/core-bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use temporal_sdk_core::{
replay::{HistoryForReplay, ReplayWorkerInput},
ClientOptions, RetryClient, WorkerConfig,
};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
Mutex,
Expand Down Expand Up @@ -119,6 +120,7 @@ pub fn start_bridge_loop(
telemetry_options: TelemOptsRes,
channel: Arc<Channel>,
receiver: &mut UnboundedReceiver<RuntimeRequest>,
result_sender: oneshot::Sender<Result<(), String>>,
) {
let mut tokio_builder = tokio::runtime::Builder::new_multi_thread();
tokio_builder.enable_all().thread_name("core");
Expand All @@ -129,10 +131,24 @@ pub fn start_bridge_loop(

core_runtime.tokio_handle().block_on(async {
if let Some(meter_maker) = meter_maker {
core_runtime
.telemetry_mut()
.attach_late_init_metrics(meter_maker());
match meter_maker() {
Ok(meter) => {
core_runtime.telemetry_mut().attach_late_init_metrics(meter);
}
Err(err) => {
result_sender
.send(Err(format!("Failed to create meter: {}", err)))
.unwrap_or_else(|_| {
panic!("Failed to report runtime start error: {}", err)
});
return;
}
}
}
result_sender
.send(Ok(()))
.expect("Failed to report runtime start success");

loop {
let request_option = receiver.recv().await;
let request = match request_option {
Expand Down Expand Up @@ -386,7 +402,19 @@ pub fn runtime_new(mut cx: FunctionContext) -> JsResult<BoxedRuntime> {
let channel = Arc::new(cx.channel());
let (sender, mut receiver) = unbounded_channel::<RuntimeRequest>();

std::thread::spawn(move || start_bridge_loop(telemetry_options, channel, &mut receiver));
// FIXME: This is a temporary fix to get sync notifications of errors while initializing the runtime.
// The proper fix would be to avoid spawning a new thread here, so that start_bridge_loop
// can simply yield back a Result. But early attempts to do just that caused panics
// on runtime shutdown, so let's use this hack until we can dig deeper.
let (result_sender, result_receiver) = oneshot::channel::<Result<(), String>>();

std::thread::spawn(move || {
start_bridge_loop(telemetry_options, channel, &mut receiver, result_sender)
});

if let Ok(Err(e)) = result_receiver.blocking_recv() {
Err(cx.throw_error::<_, String>(e).unwrap_err())?;
}

Ok(cx.boxed(Arc::new(RuntimeHandle { sender })))
}
Expand Down
17 changes: 17 additions & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as net from 'net';
import path from 'path';
import StackUtils from 'stack-utils';
import ava, { TestFn } from 'ava';
Expand Down Expand Up @@ -93,6 +94,7 @@ export const bundlerOptions = {
'@grpc/grpc-js',
'async-retry',
'uuid',
'net',
],
};

Expand Down Expand Up @@ -183,3 +185,18 @@ export async function registerDefaultCustomSearchAttributes(connection: Connecti
const timeTaken = Date.now() - startTime;
console.log(`... Registered (took ${timeTaken / 1000} sec)!`);
}

/**
 * Obtain a free TCP port on 127.0.0.1 by binding a throwaway server to port 0.
 *
 * While the throwaway server is still listening (i.e. while the port is occupied), `fn` is
 * called with the port number. The server is closed once `fn` settles.
 *
 * @param fn optional async callback run while the port is still held; defaults to a no-op
 * @returns the port number, resolved after the throwaway server has been closed
 * @throws rejects if the server fails to bind, reports an unexpected address type, or `fn` rejects
 */
export async function getRandomPort(fn = (_port: number) => Promise.resolve()): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const srv = net.createServer();
    // Without an 'error' listener, a failed bind would never settle this promise.
    srv.on('error', reject);
    srv.listen({ port: 0, host: '127.0.0.1' }, () => {
      const addr = srv.address();
      if (typeof addr === 'string' || addr === null) {
        // Throwing from inside this callback would escape the promise; reject explicitly instead.
        srv.close(() => reject(new Error('Unexpected server address type')));
        return;
      }
      fn(addr.port)
        .catch((e) => reject(e))
        // If `fn` rejected, the promise is already settled and this resolve is a no-op.
        .finally(() => srv.close((_) => resolve(addr.port)));
    });
  });
}
90 changes: 89 additions & 1 deletion packages/test/src/test-otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/**
* Manual tests to inspect tracing output
*/
import * as http2 from 'http2';
import { SpanStatusCode } from '@opentelemetry/api';
import { ExportResultCode } from '@opentelemetry/core';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
Expand All @@ -17,11 +18,98 @@ import {
} from '@temporalio/interceptors-opentelemetry/lib/worker';
import { OpenTelemetrySinks, SpanName, SPAN_DELIMITER } from '@temporalio/interceptors-opentelemetry/lib/workflow';
import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker';
import { TestWorkflowEnvironment } from '@temporalio/testing';
import * as activities from './activities';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
import * as workflows from './workflows';

/**
 * Run `fn` against a temporary HTTP/2 server bound to an OS-assigned port on 127.0.0.1.
 *
 * Every incoming request is first handed to `requestListener` (if provided) and then ended.
 * The server is closed once `fn` settles.
 *
 * @param fn async callback that receives the port the server is listening on
 * @param requestListener optional hook invoked for each request before the response is ended
 * @returns a promise that resolves after the server has been closed; it rejects if the server
 *          fails to bind, reports an unexpected address type, or `fn` rejects
 */
async function withHttp2Server(
  fn: (port: number) => Promise<void>,
  requestListener?: (request: http2.Http2ServerRequest, response: http2.Http2ServerResponse) => void
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const srv = http2.createServer();
    // Without an 'error' listener, a failed bind would never settle this promise.
    srv.on('error', reject);
    srv.listen({ port: 0, host: '127.0.0.1' }, () => {
      const addr = srv.address();
      if (typeof addr === 'string' || addr === null) {
        // Throwing from inside this callback would escape the promise; reject explicitly instead.
        srv.close(() => reject(new Error('Unexpected server address type')));
        return;
      }
      srv.on('request', async (req, res) => {
        if (requestListener) await requestListener(req, res);
        res.end();
      });
      fn(addr.port)
        .catch((e) => reject(e))
        // If `fn` rejected, the promise is already settled and this resolve is a no-op.
        .finally(() => srv.close((_) => resolve()));
    });
  });
}

test.serial('Runtime.install() throws meaningful error when passed invalid metrics.otel.url', async (t) => {
  // The install call is expected to fail synchronously with a TypeError that names the offending option.
  const installWithInvalidUrl = () =>
    Runtime.install({
      telemetryOptions: {
        metrics: {
          otel: { url: ':invalid' },
        },
      },
    });

  t.throws(installWithInvalidUrl, {
    instanceOf: TypeError,
    message: /Invalid telemetryOptions.metrics.otel.url/,
  });
});

test.serial('Runtime.install() accepts metrics.otel.url without headers', async (t) => {
  try {
    // Only `url` is supplied; the test passes as long as install() does not throw.
    Runtime.install({
      telemetryOptions: {
        metrics: {
          otel: { url: 'http://127.0.0.1:1234' },
        },
      },
    });
    t.pass();
  } finally {
    // Shut the runtime down whatever the outcome, so it cannot leak into later tests.
    const installed = Runtime._instance;
    await installed?.shutdown();
  }
});

test.serial('Exporting OTEL metrics from Core works', async (t) => {
  // The Promise executor runs synchronously, so `resolveCapturedRequest` already points at the
  // real resolver by the time it is handed to withHttp2Server below.
  let resolveCapturedRequest = (_req: http2.Http2ServerRequest) => undefined as void;
  const capturedRequest = new Promise<http2.Http2ServerRequest>((r) => (resolveCapturedRequest = r));
  await withHttp2Server(async (port: number) => {
    Runtime.install({
      telemetryOptions: {
        metrics: {
          otel: {
            url: `http://127.0.0.1:${port}`,
            headers: {
              'x-test-header': 'test-value',
            },
            metricsExportInterval: 10,
          },
        },
      },
    });

    const localEnv = await TestWorkflowEnvironment.createLocal();
    try {
      const worker = await Worker.create({
        connection: localEnv.nativeConnection,
        workflowsPath: require.resolve('./workflows'),
        taskQueue: 'test-otel',
      });
      const client = new WorkflowClient({
        connection: localEnv.connection,
      });
      await worker.runUntil(async () => {
        await client.execute(workflows.successString, {
          taskQueue: 'test-otel',
          workflowId: uuid4(),
        });
        // Race the captured request against a 2s timeout. The timeout promise must NOT be awaited
        // inside the array literal, otherwise the test always sleeps the full 2s before racing.
        let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
        const timeout = new Promise<undefined>((resolve) => {
          timeoutHandle = setTimeout(() => resolve(undefined), 2000);
        });
        let req: http2.Http2ServerRequest | undefined;
        try {
          req = await Promise.race([capturedRequest, timeout]);
        } finally {
          // Do not leave a pending timer behind once the race is decided.
          clearTimeout(timeoutHandle);
        }
        t.truthy(req);
        t.is(req?.url, '/opentelemetry.proto.collector.metrics.v1.MetricsService/Export');
        t.is(req?.headers['x-test-header'], 'test-value');
      });
    } finally {
      await localEnv.teardown();
    }
  }, resolveCapturedRequest);
});

if (RUN_INTEGRATION_TESTS) {
test.serial('Otel interceptor spans are connected and complete', async (t) => {
const spans = Array<opentelemetry.tracing.ReadableSpan>();
Expand Down Expand Up @@ -178,7 +266,7 @@ if (RUN_INTEGRATION_TESTS) {
});
await worker.runUntil(client.execute(workflows.smorgasbord, { taskQueue: 'test-otel', workflowId: uuid4() }));
// Allow some time to ensure spans are flushed out to collector
await new Promise((resolve) => setTimeout(resolve, 5000));
await new Promise<void>((resolve) => setTimeout(resolve, 5000));
t.pass();
});

Expand Down
80 changes: 43 additions & 37 deletions packages/test/src/test-prometheus.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,57 @@
import * as net from 'net';
import test from 'ava';
import { v4 as uuid4 } from 'uuid';
import fetch from 'node-fetch';
import { WorkflowClient } from '@temporalio/client';
import { NativeConnection, Runtime } from '@temporalio/worker';
import * as activities from './activities';
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
import { Runtime } from '@temporalio/worker';
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { Worker, getRandomPort } from './helpers';
import * as workflows from './workflows';

async function getRandomPort(): Promise<number> {
return new Promise<number>((res) => {
const srv = net.createServer();
srv.listen(0, () => {
const addr = srv.address();
if (typeof addr === 'string' || addr === null) {
throw new Error('Unexpected server address type');
}
srv.close((_) => res(addr.port));
});
test.serial('Runtime.install() throws meaningful error when passed invalid metrics.prometheus.bindAddress', (t) => {
t.throws(() => Runtime.install({ telemetryOptions: { metrics: { prometheus: { bindAddress: ':invalid' } } } }), {
instanceOf: TypeError,
message: 'Invalid telemetryOptions.metrics.prometheus.bindAddress',
});
}
});

test.serial(
  'Runtime.install() throws meaningful error when metrics.prometheus.bindAddress port is already taken',
  async (t) => {
    // getRandomPort invokes the callback while its own server is still bound to `port`,
    // so the install attempt below targets an address that is already in use.
    const expectBindFailure = async (port: number): Promise<void> => {
      const bindAddress = `127.0.0.1:${port}`;
      t.throws(() => Runtime.install({ telemetryOptions: { metrics: { prometheus: { bindAddress } } } }), {
        instanceOf: Error,
        message: /(Address already in use|socket address)/,
      });
    };

    await getRandomPort(expectBindFailure);
  }
);

if (RUN_INTEGRATION_TESTS) {
test.serial('Prometheus metrics work', async (t) => {
const port = await getRandomPort();
Runtime.install({
telemetryOptions: {
metrics: {
prometheus: {
bindAddress: `127.0.0.1:${port}`,
},
test.serial('Exporting Prometheus metrics from Core works', async (t) => {
const port = await getRandomPort();
Runtime.install({
telemetryOptions: {
metrics: {
prometheus: {
bindAddress: `127.0.0.1:${port}`,
},
},
});
const connection = await NativeConnection.connect({
address: '127.0.0.1:7233',
});
},
});
const localEnv = await TestWorkflowEnvironment.createLocal();
try {
const worker = await Worker.create({
connection,
connection: localEnv.nativeConnection,
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'test-prometheus',
});

const client = new WorkflowClient();
const client = new WorkflowClient({
connection: localEnv.connection,
});
await worker.runUntil(async () => {
await client.execute(workflows.cancelFakeProgress, {
await client.execute(workflows.successString, {
taskQueue: 'test-prometheus',
workflowId: uuid4(),
});
Expand All @@ -54,7 +60,7 @@ if (RUN_INTEGRATION_TESTS) {
const text = await resp.text();
t.assert(text.includes('task_slots'));
});

t.pass();
});
}
} finally {
await localEnv.teardown();
}
});
Loading

0 comments on commit 25a7695

Please sign in to comment.