Skip to content

Commit

Permalink
feat: better transferring (#1)
Browse files Browse the repository at this point in the history
* fix: fix handling of transferables

Signed-off-by: Okiki <[email protected]>

* feat: support boolean type for UseWorkerFnOpts.transfer

* test: add tests

* chore: resolve issue comments

Signed-off-by: Okiki <[email protected]>

* update

---------

Signed-off-by: Okiki <[email protected]>
Co-authored-by: mys1024 <[email protected]>
  • Loading branch information
okikio and mys1024 authored Mar 17, 2024
1 parent da7c792 commit 6c02271
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 61 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,30 @@ jobs:

- name: Test
run: deno task test:cov

npm-trial-build:
name: npm trial build
runs-on: ubuntu-latest
needs: test

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Prepare files
run: mv npm/* .

- name: Setup pnpm
uses: pnpm/action-setup@v3

- name: Setup Node
uses: actions/setup-node@v4
with:
registry-url: 'https://registry.npmjs.org'
cache: pnpm

- name: Install
run: pnpm install

- name: Build
run: pnpm run build
2 changes: 2 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
registry=https://registry.npmjs.org
@jsr:registry=https://npm.jsr.io
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
"deno",
"denoland",
"minzip",
"okikio",
"pnpx",
"runtimes",
"Transferables",
"tsup",
"typeof"
]
Expand Down
1 change: 1 addition & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"publish": "deno publish"
},
"imports": {
"@okikio/transferables": "jsr:@okikio/transferables@^1.0.2",
"@std/assert": "jsr:@std/[email protected]"
},
"exports": {
Expand Down
5 changes: 5 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions npm/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "worker-fn hides the complexity of communication between the JavaScript main thread and Worker threads, making it easy to call the functions defined in workers.",
"type": "module",
"author": "mys1024",
"packageManager": "pnpm@8.6.1",
"packageManager": "pnpm@8.15.4",
"license": "MIT",
"homepage": "https://github.com/mys1024/worker-fn#readme",
"repository": {
Expand Down Expand Up @@ -41,7 +41,8 @@
}
},
"devDependencies": {
"@okikio/transferables": "npm:@jsr/okikio__transferables@^1.0.2",
"tsup": "^8.0.2",
"typescript": "^5.3.3"
"typescript": "^5.4.2"
}
}
21 changes: 14 additions & 7 deletions npm/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 26 additions & 12 deletions src/rpc/rpc.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { getTransferables, isSupported } from "@okikio/transferables";
import type {
AnyFn,
AwaitedRet,
Expand All @@ -6,16 +7,21 @@ import type {
RpcCallMsg,
RpcReturnMsg,
} from "./types.ts";
import {
isRpcCallMsg,
isRpcReturnMsg,
isTransferable,
toMsgPortNormalized,
} from "./utils.ts";
import { isRpcCallMsg, isRpcReturnMsg, toMsgPortNormalized } from "./utils.ts";

const DEFAULT_NAMESPACE = "rpc";
const RPC_AGENT = Symbol("rpcAgent");

// Support flags for features like streams and channels,
// we want to confirm if streams and channels are transferable
// transferables aren't supported the same way across all browsers and runtimes,
// in some runtimes it's more like partial support
let supportFlags: Awaited<ReturnType<typeof isSupported>> = {
streams: false,
channel: false,
};
isSupported().then((res) => (supportFlags = res));

export class RpcAgent {
#msgPort: MsgPortNormalized;
#callCount = 0;
Expand Down Expand Up @@ -73,12 +79,11 @@ export class RpcAgent {
}
}

callRemoteFn<FN extends AnyFn>(name: string, options: {
callRemoteFn<FN extends AnyFn>(name: string, args: Parameters<FN>, options: {
namespace?: string;
args?: Parameters<FN>;
transfer?: (args: Parameters<FN>) => Transferable[];
transfer?: boolean | ((ctx: { args: Parameters<FN> }) => Transferable[]);
} = {}) {
const { namespace = DEFAULT_NAMESPACE, args = [], transfer } = options;
const { namespace = DEFAULT_NAMESPACE, transfer } = options;

const keyCallMap = this.#getKeyCallMap(namespace, true);

Expand All @@ -94,7 +99,11 @@ export class RpcAgent {
type: "call",
args,
}, {
transfer: transfer?.(args as any),
transfer: transfer
? transfer === true
? getTransferables(args, supportFlags.streams)
: transfer?.({ args })
: undefined,
});

return ret;
Expand Down Expand Up @@ -182,7 +191,12 @@ export class RpcAgent {
ok: true,
ret,
}, {
transfer: transfer && isTransferable(ret) ? [ret] : undefined,
// If the function is marked for transferring data, it uses `getTransferables` to grab none cloneable data,
// transferables exist due to them not being able to be cloned, so to ensure in a complex object we grab all the
// transferables that can't be cloned we traverse the entire object finding all transferables, and listing them to be transferred
transfer: transfer
? getTransferables(ret, supportFlags.streams)
: undefined,
});
} catch (err) {
this.#sendReturnMsg({
Expand Down
32 changes: 0 additions & 32 deletions src/rpc/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,6 @@ import {
RpcReturnMsg,
} from "./types.ts";

/**
* @see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects#supported_objects
*/
const transferableClasses = (function () {
return [
"ArrayBuffer",
"MessagePort",
"ReadableStream",
"WritableStream",
"TransformStream",
"WebTransportReceiveStream",
"WebTransportSendStream",
"AudioData",
"ImageBitmap",
"VideoFrame",
"OffscreenCanvas",
"RTCDataChannel",
].map((name) => getGlobalVar(name)).filter((v) => !!v);
})();

export function getGlobalVar(name: string) {
try {
return Function(`return ${name}`)();
} catch {
return undefined;
}
}

export function isTransferable(val: any): val is Transferable {
return transferableClasses.some((c) => val instanceof c);
}

export function isRpcCallMsg(val: any): val is RpcCallMsg {
return val?.type === "call";
}
Expand Down
5 changes: 2 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ export interface DefineWorkerFnOpts {

export interface UseWorkerFnOpts<FN extends AnyFn> {
/**
* A function that determines objects to be transferred when posting messages to the worker thread.
* A boolean value indicating whether to transfer the arguments, or a function that returns transferable objects should be transferred.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage#transfer
* @param ctx The context of the function call.
* @returns Transferable objects.
*/
transfer?: (ctx: { args: Parameters<FN> }) => Transferable[];
transfer?: boolean | ((ctx: { args: Parameters<FN> }) => Transferable[]);
}
7 changes: 3 additions & 4 deletions src/use.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ export function useWorkerFn<FN extends AnyFn>(
const rpcAgent = RpcAgent.getRpcAgent(worker);
// the proxy function
function fn(...args: Parameters<FN>) {
return rpcAgent.callRemoteFn(name, {
args,
return rpcAgent.callRemoteFn(name, args, {
namespace: "fn",
transfer: transfer ? () => transfer({ args }) : undefined,
transfer,
});
}
return fn;
Expand Down Expand Up @@ -80,7 +79,7 @@ export async function inspectWorker(worker: MsgPort): Promise<{
names: string[];
}> {
const rpcAgent = RpcAgent.getRpcAgent(worker);
const names = await rpcAgent.callRemoteFn<InternalFns["names"]>("names", {
const names = await rpcAgent.callRemoteFn<InternalFns["names"]>("names", [], {
namespace: "fn-internal",
});
return { names };
Expand Down
61 changes: 60 additions & 1 deletion test/basic.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { assertEquals } from "@std/assert";
import { useWorkerFn } from "../src/main.ts";
import type { Add, Fib, Redefine, ThrowErr } from "./basic.test.worker.ts";
import type {
Add,
AddBytesWithoutTransferring,
AddBytesWithTransferring,
Fib,
Redefine,
ThrowErr,
} from "./basic.test.worker.ts";
import type { Add as Add2 } from "./basic.test.worker.another.ts";

Deno.test({
Expand Down Expand Up @@ -139,6 +146,58 @@ Deno.test({
);
}
});

await t.step("transferring enabled", async () => {
const worker = new Worker(
new URL("./basic.test.worker.ts", import.meta.url),
{
type: "module",
},
);
const addBytesWithTransferring = useWorkerFn<AddBytesWithTransferring>(
"addBytesWithTransferring",
worker,
{
transfer: true,
},
);
const bytes1 = new Uint8Array([1, 2, 3]).buffer;
const bytes2 = new Uint8Array([3, 2, 1]).buffer;
assertEquals(
Array.from(
new Uint8Array(await addBytesWithTransferring(bytes1, bytes2, 3)),
),
[4, 4, 4],
);
assertEquals(bytes1.byteLength, 0);
assertEquals(bytes2.byteLength, 0);
});

await t.step("transferring disabled", async () => {
const worker = new Worker(
new URL("./basic.test.worker.ts", import.meta.url),
{
type: "module",
},
);
const addBytesWithoutTransferring = useWorkerFn<
AddBytesWithoutTransferring
>("addBytesWithoutTransferring", worker, {
transfer: false,
});
const bytes1 = new Uint8Array([1, 2, 3]).buffer;
const bytes2 = new Uint8Array([3, 2, 1]).buffer;
assertEquals(
Array.from(
new Uint8Array(
await addBytesWithoutTransferring(bytes1, bytes2, 3),
),
),
[4, 4, 4],
);
assertEquals(bytes1.byteLength, 3);
assertEquals(bytes2.byteLength, 3);
});
});
},
});
Loading

0 comments on commit 6c02271

Please sign in to comment.