Better adaptor errors #583

Draft
wants to merge 34 commits into
base: main
4f26afc
engine: spike mapping console logs to an adaptor logger
josephjclark Jan 30, 2024
201d528
runtime: messy tweak to module loading
josephjclark Jan 30, 2024
1780329
engine,runtime: revert linker change and fix tests
josephjclark Jan 30, 2024
058eff2
engine: track test file
josephjclark Jan 30, 2024
649ca43
logger: dont stringify json output AND serialize errors
josephjclark Jan 31, 2024
deee94b
logger: tidy
josephjclark Jan 31, 2024
271f1a8
engine: don't parse json logs coming out of the logger
josephjclark Jan 31, 2024
3877474
engine, worker: better handling of objects coming from the logger
josephjclark Jan 31, 2024
bdf2586
engine: fix tests
josephjclark Jan 31, 2024
d13adfb
logger: tests and types
josephjclark Jan 31, 2024
593ebd5
cli: update test
josephjclark Jan 31, 2024
d8e8ee3
engine: types
josephjclark Jan 31, 2024
a60a83e
worker: update tests
josephjclark Jan 31, 2024
0d9804e
logger: set a special json emitter so that json logs get nicely print…
josephjclark Jan 31, 2024
99defcd
logger: fix types
josephjclark Jan 31, 2024
94c900f
logger: log all json to .log
josephjclark Jan 31, 2024
77bab1a
tests: fixes
josephjclark Jan 31, 2024
1313b09
logger: fix tests
josephjclark Jan 31, 2024
63a251a
logger: serialise print() properly
josephjclark Jan 31, 2024
8024831
logger: types
josephjclark Jan 31, 2024
bf610e7
engine: fix logs to gcp
josephjclark Jan 31, 2024
6cc50fc
test: update log handling
josephjclark Jan 31, 2024
502eaef
engine: fix passing test
josephjclark Jan 31, 2024
4b080c0
runtime: add tests on job logger and errors
josephjclark Jan 31, 2024
6e18805
logger: improve detection of error objects
josephjclark Jan 31, 2024
32aed67
engine: tests on error logging
josephjclark Jan 31, 2024
63baf52
Merge branch 'main' into handle-adaptor-logs
josephjclark Feb 1, 2024
2f80a78
engine: restore adaptor logger
josephjclark Feb 1, 2024
823b471
changesets
josephjclark Feb 1, 2024
b76266e
Tidy ups
josephjclark Feb 1, 2024
425c515
engine: refactor log messages (and be a bit more lenient about struct…
josephjclark Feb 1, 2024
2fb6493
worker: simplify logging
josephjclark Feb 1, 2024
7cd9e5e
tiny tidyups
josephjclark Feb 1, 2024
d38755c
spike: better error reporting for adaptor errors
josephjclark Feb 1, 2024
6 changes: 6 additions & 0 deletions .changeset/dull-bags-punch.md
@@ -0,0 +1,6 @@
---
'@openfn/logger': patch
---

In JSON mode, do not stringify emitted messages.
Better handling of error objects
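As background for the error-handling note: `JSON.stringify` on an `Error` instance yields `{}`, because `message` and `stack` are non-enumerable and `name` lives on the prototype. The TypeScript sketch below shows one way to copy those fields into a plain, JSON-safe object. `toPlainError` and `PlainError` are hypothetical names for illustration and are not taken from `@openfn/logger`.

```typescript
// Hypothetical shape for illustration; not the format used by @openfn/logger.
interface PlainError {
  name: string;
  message: string;
  stack?: string;
}

// Copy the fields that JSON.stringify would otherwise skip.
function toPlainError(err: Error): PlainError {
  return { name: err.name, message: err.message, stack: err.stack };
}

const err = new TypeError('bad input');

// Naive serialization loses everything useful about the error.
const naive = JSON.stringify(err);

// Round-trip through JSON to show the plain object survives serialization.
const plain = JSON.parse(JSON.stringify(toPlainError(err))) as PlainError;
```

Here `naive` is the string `{}`, while `plain` keeps the error's name and message.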
5 changes: 5 additions & 0 deletions .changeset/real-snakes-begin.md
@@ -0,0 +1,5 @@
---
'@openfn/engine-multi': patch
---

Update handling of logs so that JSON messages are stringified
5 changes: 5 additions & 0 deletions .changeset/unlucky-moose-greet.md
@@ -0,0 +1,5 @@
---
'@openfn/ws-worker': patch
---

Update handling of logs to accept stringified messages
6 changes: 3 additions & 3 deletions integration-tests/cli/test/errors.test.ts
@@ -101,7 +101,7 @@ test.serial('circular workflow', async (t) => {
assertLog(t, stdlogs, /Error validating execution plan/i);
assertLog(t, stdlogs, /Workflow failed/i);

- const error = stdlogs.find((l) => l.message[0].severity);
+ const error = stdlogs.find((l) => l.message[0].name === 'ValidationError');
t.regex(error.message[0].message, /circular dependency: b <-> a/i);
});

@@ -116,7 +116,7 @@ test.serial('multiple inputs', async (t) => {
assertLog(t, stdlogs, /Error validating execution plan/i);
assertLog(t, stdlogs, /Workflow failed/i);

- const error = stdlogs.find((l) => l.message[0].severity);
+ const error = stdlogs.find((l) => l.message[0].name === 'ValidationError');
t.regex(error.message[0].message, /multiple dependencies detected for: c/i);
});

@@ -132,6 +132,6 @@ test.serial('invalid start', async (t) => {
assertLog(t, stdlogs, /Workflow failed/i);

// Find the error object which is logged out
- const error = stdlogs.find((l) => l.message[0].severity);
+ const error = stdlogs.find((l) => l.message[0].name === 'ValidationError');
t.regex(error.message[0].message, /could not find start job: nope/i);
});
1 change: 0 additions & 1 deletion integration-tests/cli/test/metadata.test.ts
@@ -21,7 +21,6 @@ test.serial(
`openfn metadata -S "${state}" -a test=${modulePath} --log-json --log info`,
async (t) => {
const { stdout } = await run(t.title);
-
t.regex(stdout, /Generating metadata/);
t.regex(stdout, /Metadata function found. Generating metadata/);
t.notRegex(stdout, /Returning metadata from cache/);
2 changes: 1 addition & 1 deletion integration-tests/worker/test/integration.test.ts
@@ -147,7 +147,7 @@ test("Don't send job logs to stdout", (t) => {
};

lightning.once('run:complete', () => {
- const jsonLogs = engineLogger._history.map((l) => JSON.parse(l));
+ const jsonLogs = engineLogger._history;

// The engine logger shouldn't print out any job logs
const jobLog = jsonLogs.find((l) => l.name === 'JOB');
2 changes: 1 addition & 1 deletion packages/cli/test/util/print-versions.test.ts
@@ -109,7 +109,7 @@ test('json output', async (t) => {
const logger = createMockLogger('', { level: 'info', json: true });
await printVersions(logger, { adaptors: ['http'], logJson: true });

- const last = JSON.parse(logger._last) as JSONLog;
+ const last = logger._last as JSONLog;
t.is(last.level, 'always');

const [{ versions }] = last.message;
10 changes: 10 additions & 0 deletions packages/engine-multi/README.md
@@ -110,3 +110,13 @@ engine.execute(plan, { resolvers });
```

Initial state and credentials are at the moment pre-loaded, with a "fully resolved" state object passed into the runtime. The Runtime has the ability to lazy load but implementing lazy loading across the worker_thread interface has proven tricky.

## Note on Debugging

Debugging in the engine can be really tricky.

First there's the problem that a lot of code runs inside a worker thread in a child process, which is hard to get a breakpoint into (at the time of writing I haven't managed to do it).

But also, any console.log statements inside the inner thread will get consumed by the adaptor logger and won't go to stdout.

As a workaround to this, use console.debug inside the thread to print to stdout. This is not bound to the adaptor logger.
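
The workaround above can be sketched in TypeScript as follows. This is a minimal illustration under assumptions rather than the engine's code: `ConsoleLike`, `redirectConsole`, `captured`, and `printed` are hypothetical names, and the sketch patches a console-like object instead of the global `console`.

```typescript
// A minimal console-like surface for the sketch (hypothetical type).
type LogFn = (...args: unknown[]) => void;
interface ConsoleLike {
  log: LogFn;
  debug: LogFn;
}

// Route `log` to a sink while leaving `debug` pointing at the original.
function redirectConsole(target: ConsoleLike, sink: LogFn): ConsoleLike {
  const debug = target.debug; // save the original debug function
  return { log: sink, debug };
}

const captured: unknown[][] = []; // stands in for the adaptor logger
const printed: unknown[][] = []; // stands in for stdout

const original: ConsoleLike = {
  log: (...args) => {
    printed.push(args);
  },
  debug: (...args) => {
    printed.push(args);
  },
};

const patched = redirectConsole(original, (...args) => {
  captured.push(args);
});

patched.log('swallowed by the logger');
patched.debug('still visible locally');
```

After these two calls, the `log` message sits only in `captured` and the `debug` message only in `printed`, which is the behaviour the note relies on.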
3 changes: 2 additions & 1 deletion packages/engine-multi/package.json
@@ -17,7 +17,8 @@
"@openfn/compiler": "workspace:*",
"@openfn/language-common": "2.0.0-rc3",
"@openfn/logger": "workspace:*",
- "@openfn/runtime": "workspace:*"
+ "@openfn/runtime": "workspace:*",
+ "fast-safe-stringify": "^2.1.1"
},
"devDependencies": {
"@types/node": "^18.15.13",
10 changes: 8 additions & 2 deletions packages/engine-multi/src/api/call-worker.ts
@@ -34,14 +34,20 @@ export default function initWorkers(
logger
);

- const callWorker: CallWorker = (task, args = [], events = [], options = {}) =>
- workers.exec(task, args, {
+ const callWorker: CallWorker = (
+ task,
+ args = [],
+ events = [],
+ options = {}
+ ) => {
+ return workers.exec(task, args, {
...options,
on: ({ type, ...args }: WorkerEvent) => {
// just call the callback
events[type]?.(args);
},
});
+ };

const closeWorkers = async (instant?: boolean) => workers.destroy(instant);

4 changes: 2 additions & 2 deletions packages/engine-multi/src/api/execute.ts
@@ -67,7 +67,7 @@ const execute = async (context: ExecutionContext) => {
type: workerEvents.LOG,
workflowId: state.plan.id!,
threadId: '-', // no thread at this point
- message: {
+ log: {
level: 'debug',
message: [`Memory limit: ${workerOptions.memoryLimitMb}mb`],
name: 'RTE',
@@ -80,7 +80,7 @@ const execute = async (context: ExecutionContext) => {
type: workerEvents.LOG,
workflowId: state.plan.id!,
threadId: '-', // no thread at this point
- message: {
+ log: {
level: 'debug',
message: [`Timeout: ${workerOptions.timeout / 1000}s`],
name: 'RTE',
15 changes: 12 additions & 3 deletions packages/engine-multi/src/api/lifecycle.ts
@@ -120,13 +120,22 @@ export const log = (
) => {
const { threadId } = event;

- if (event.message.name !== 'JOB') {
- context.logger.proxy(event.message);
+ if (event.log.name !== 'JOB') {
+ // Forward the log event to the engine's logger
+ // Note that we may have to parse the serialized log string
+ const proxy = {
+ ...event.log,
+ message:
+ typeof event.log.message == 'string'
+ ? JSON.parse(event.log.message)
+ : event.log.message,
+ };
+ context.logger.proxy(proxy);
}

context.emit(externalEvents.WORKFLOW_LOG, {
threadId,
- ...event.message,
+ ...event.log,
});
};
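
The string-or-array handling in this hunk can be read as a small normalization step. The TypeScript sketch below is an illustration under assumptions, not code from the PR: `LogMessage` and `normalizeMessage` are hypothetical names, and it assumes any string message is valid JSON.

```typescript
// Hypothetical alias mirroring the `string | any[]` message type in the PR.
type LogMessage = string | unknown[];

// Return the message as an array, parsing it first if it was serialized.
function normalizeMessage(message: LogMessage): unknown[] {
  if (typeof message === 'string') {
    const parsed: unknown = JSON.parse(message);
    // Guard against a string that parses to something other than an array.
    return Array.isArray(parsed) ? parsed : [parsed];
  }
  return message;
}

// A message serialized in the worker thread...
const fromString = normalizeMessage(JSON.stringify(['oh hai', { a: 1 }]));
// ...and one that was never serialized both come back as arrays.
const fromArray = normalizeMessage(['oh hai']);
```

Wrapping a non-array result in an array is a design choice of the sketch, so that callers can always treat the message as a list of log arguments.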

1 change: 0 additions & 1 deletion packages/engine-multi/src/api/validate-worker.ts
@@ -11,7 +11,6 @@ export default async (api: EngineAPI, timeout = 5000) => {
// TODO argument drive this
await api.callWorker('handshake', [], {}, { timeout });
} catch (e) {
- console.error(e);
throw new Error('Invalid worker path');
}
};
7 changes: 2 additions & 5 deletions packages/engine-multi/src/events.ts
@@ -1,8 +1,5 @@
- // TODO remove ths file in favour of types
-
- // TODO mayberename event constants
- import { JSONLog } from '@openfn/logger';
import { Versions } from './types';
+ import { SerializedLogEvent } from './worker/events';

// If the worker thread exits a process safely, it'll return this error code
// any other error code is unexpected
@@ -89,7 +86,7 @@ export interface JobErrorPayload extends ExternalEvent {
next: string[]; // downstream jobs
}

- export interface WorkerLogPayload extends ExternalEvent, JSONLog {}
+ export interface WorkerLogPayload extends ExternalEvent, SerializedLogEvent {}

export interface EdgeResolvedPayload extends ExternalEvent {
edgeId: string; // interesting, we don't really have this yet. Is index more appropriate? key? yeah, it's target node basically
8 changes: 7 additions & 1 deletion packages/engine-multi/src/worker/events.ts
@@ -67,8 +67,14 @@ export interface JobErrorEvent extends InternalEvent {
next: string[];
}

+ export type SerializedLogEvent = Omit<JSONLog, 'message'> & {
+ // the message is either an array of strings/object to log,
+ // or a JSON array that was previously serialized
+ message: string | any[];
+ };
+
export interface LogEvent extends InternalEvent {
- message: JSONLog;
+ log: SerializedLogEvent;
}

export interface ErrorEvent {
27 changes: 20 additions & 7 deletions packages/engine-multi/src/worker/thread/helpers.ts
@@ -2,7 +2,7 @@
// This is designed to minimize the amount of code we have to mock

import process from 'node:process';
-
+ import stringify from 'fast-safe-stringify';
import createLogger, { SanitizePolicies } from '@openfn/logger';

import * as workerEvents from '../events';
@@ -11,17 +11,22 @@ import { ExecutionError, ExitError } from '../../errors';

import { publish } from './runtime';
import serializeError from '../../util/serialize-error';
+ import { JSONLog } from '@openfn/logger';

export const createLoggers = (
workflowId: string,
- sanitize?: SanitizePolicies
+ sanitize: SanitizePolicies = 'none',
+ publish?: any
) => {
- const log = (message: string) => {
- // Apparently the json log stringifies the message
- // We don't really want it to do that
+ const log = (message: JSONLog) => {
publish(workerEvents.LOG, {
workflowId,
- message: JSON.parse(message),
+ log: {
+ ...message,
+ // stringify the message now so that we know it's safe
+ // this also makes it more performant to feed up to the worker
+ message: stringify(message.message),
+ },
} as workerEvents.LogEvent);
};

@@ -41,14 +46,22 @@ export const createLoggers = (
json: true,
sanitize,
});
+
const jobLogger = createLogger('JOB', {
logger: emitter,
level: 'debug',
json: true,
sanitize,
});

- return { logger, jobLogger };
+ const adaptorLogger = createLogger('ADA', {
+ logger: emitter,
+ level: 'debug',
+ json: true,
+ sanitize,
+ });
+
+ return { logger, jobLogger, adaptorLogger };
};

// Execute wrapper function
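
One reason a "safe" stringifier helps in the `log` function above is that plain `JSON.stringify` throws a `TypeError` on circular references. The TypeScript sketch below illustrates the idea with a hand-rolled replacer. It is a stand-in for illustration only and makes no claim about how `fast-safe-stringify` is implemented; `safeStringify` is a hypothetical name.

```typescript
// Stringify a value, replacing circular references with a marker string.
// Illustration only; not the fast-safe-stringify implementation.
function safeStringify(value: unknown): string {
  // Chain of objects from the root down to the current parent.
  const ancestors: object[] = [];
  return JSON.stringify(
    value,
    function (this: unknown, _key: string, val: unknown) {
      if (typeof val !== 'object' || val === null) {
        return val;
      }
      // `this` is the direct parent of `val`; drop entries below it.
      while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
        ancestors.pop();
      }
      if (ancestors.includes(val)) {
        return '[Circular]';
      }
      ancestors.push(val);
      return val;
    }
  );
}

// A log argument that refers to itself.
const state: Record<string, unknown> = { name: 'state' };
state.self = state;

const serialized = safeStringify([state]);
```

Tracking only the current chain of ancestors means a value that appears twice without forming a cycle is still serialized in full both times.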
2 changes: 1 addition & 1 deletion packages/engine-multi/src/worker/thread/mock-run.ts
@@ -32,7 +32,7 @@ type MockExecutionPlan = {
// optionally delay
function mockRun(plan: MockExecutionPlan) {
const [job] = plan.jobs;
- const { jobLogger } = createLoggers(plan.id!);
+ const { jobLogger } = createLoggers(plan.id!, 'none', publish);
const workflowId = plan.id;
return new Promise((resolve) => {
const jobId = job.id || '<job>';
18 changes: 16 additions & 2 deletions packages/engine-multi/src/worker/thread/run.ts
@@ -29,9 +29,23 @@ register({
run: (plan: ExecutionPlan, runOptions: RunOptions) => {
const { adaptorPaths, whitelist, sanitize, statePropsToRemove } =
runOptions;
- const { logger, jobLogger } = createLoggers(plan.id!, sanitize);
- // TODO I would like to pull these options out of here
+ const { logger, jobLogger, adaptorLogger } = createLoggers(
+ plan.id!,
+ sanitize,
+ publish
+ );

+ // Save the debug function so that we can use it
+ const debug = console.debug;
+
+ // override console: any console.log statements will now get treated as adaptor logs
+ console = adaptorLogger;
+
+ // Leave console.debug for local debugging
+ // This goes to stdout but not the adaptor logger
+ console.debug = debug;
+
+ // TODO I would like to pull these options out of here
const options = {
// disable the run/step timeout
timeout: 0,
2 changes: 1 addition & 1 deletion packages/engine-multi/test/__repo__/package.json
@@ -3,6 +3,6 @@
"private": true,
"version": "1.0.0",
"dependencies": {
- "helper_1.0.0": "@npm:helper@1.0.0"
+ "@openfn/helper_1.0.0": "@npm:@openfn/helper@1.0.0"
}
}
2 changes: 1 addition & 1 deletion packages/engine-multi/test/api/execute.test.ts
@@ -186,8 +186,8 @@ test.serial('should emit a log event', async (t) => {
await execute(context);

t.is(workflowLog.workflowId, 'y');
- t.is(workflowLog.message[0], 'hi');
t.is(workflowLog.level, 'info');
+ t.deepEqual(workflowLog.message, JSON.stringify(['hi']));
});

test.serial('log events are timestamped in hr time', async (t) => {
6 changes: 3 additions & 3 deletions packages/engine-multi/test/api/lifecycle.test.ts
@@ -170,10 +170,10 @@ test(`log: emits ${e.WORKFLOW_LOG}`, (t) => {
const event = {
workflowId,
threadId: 'a',
- message: {
+ log: {
level: 'info',
name: 'job',
- message: ['oh hai'],
+ message: JSON.stringify(['oh hai']),
time: Date.now() - 100,
},
};
@@ -182,7 +182,7 @@ test(`log: emits ${e.WORKFLOW_LOG}`, (t) => {
t.deepEqual(evt, {
workflowId,
threadId: 'a',
- ...event.message,
+ ...event.log,
});
done();
});
4 changes: 2 additions & 2 deletions packages/engine-multi/test/errors.test.ts
@@ -3,7 +3,7 @@ import path from 'node:path';

import createEngine, { EngineOptions } from '../src/engine';
import { createMockLogger } from '@openfn/logger';
- import { WORKFLOW_COMPLETE, WORKFLOW_ERROR } from '../src/events';
+ import { WORKFLOW_ERROR } from '../src/events';

let engine;

@@ -160,7 +160,7 @@ test.serial('emit a crash error on process.exit()', (t) => {
id: 'z',
jobs: [
{
- adaptor: 'helper@1.0.0',
+ adaptor: '@openfn/helper@1.0.0',
expression: 'export default [exit()]',
},
],