Skip to content

Commit

Permalink
feat(nbstore): add nbstore worker
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 19, 2024
1 parent 3058878 commit 69a880b
Show file tree
Hide file tree
Showing 51 changed files with 1,144 additions and 501 deletions.
10 changes: 10 additions & 0 deletions packages/common/infra/src/op/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ export class OpConsumer<Ops extends OpSchema> extends AutoMessageHandler {
this.registeredOpHandlers.set(op, handler);
}

registerAll(
handlers: OpNames<Ops> extends string
? { [K in OpNames<Ops>]: OpHandler<Ops, K> }
: never
) {
for (const [op, handler] of Object.entries(handlers)) {
this.register(op as any, handler as any);
}
}

before<Op extends OpNames<Ops>>(
op: Op,
handler: (...input: OpInput<Ops, Op>) => void
Expand Down
2 changes: 1 addition & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./op": "./src/op/index.ts",
"./worker": "./src/worker/index.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts",
"./cloud": "./src/impls/cloud/index.ts",
Expand Down
18 changes: 9 additions & 9 deletions packages/common/nbstore/src/__tests__/frontend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { AwarenessFrontend } from '../frontend/awareness';
import { DocFrontend } from '../frontend/doc';
import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness';
import { IndexedDBDocStorage } from '../impls/idb';
import { AwarenessSync } from '../sync/awareness';
import { AwarenessSyncImpl } from '../sync/awareness';
import { expectYjsEqual } from './utils';

test('doc', async () => {
Expand All @@ -23,9 +23,9 @@ test('doc', async () => {
type: 'workspace',
});

docStorage.connect();
docStorage.connection.connect();

await docStorage.waitForConnected();
await docStorage.connection.waitForConnected();

const frontend1 = new DocFrontend(docStorage, null);
frontend1.start();
Expand Down Expand Up @@ -68,11 +68,11 @@ test('awareness', async () => {
type: 'workspace',
});

storage1.connect();
storage2.connect();
storage1.connection.connect();
storage2.connection.connect();

await storage1.waitForConnected();
await storage2.waitForConnected();
await storage1.connection.waitForConnected();
await storage2.connection.waitForConnected();

// peer a
const docA = new YDoc({ guid: 'test-doc' });
Expand All @@ -90,13 +90,13 @@ test('awareness', async () => {
const awarenessC = new Awareness(docC);

{
const sync = new AwarenessSync(storage1, [storage2]);
const sync = new AwarenessSyncImpl(storage1, [storage2]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessA);
frontend.connect(awarenessB);
}
{
const sync = new AwarenessSync(storage2, [storage1]);
const sync = new AwarenessSyncImpl(storage2, [storage1]);
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessC);
}
Expand Down
35 changes: 29 additions & 6 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,20 @@ export type ConnectionStatus =
| 'error'
| 'closed';

export abstract class Connection<T = any> {
export interface Connection<T = any> {
readonly status: ConnectionStatus;
readonly inner: T;
connect(): void;
disconnect(): void;
waitForConnected(signal?: AbortSignal): Promise<void>;
onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void;
}

export abstract class AutoReconnectConnection<T = any>
implements Connection<T>
{
private readonly event = new EventEmitter2();
private _inner: T | null = null;
private _status: ConnectionStatus = 'idle';
Expand Down Expand Up @@ -160,12 +173,22 @@ export abstract class Connection<T = any> {
};
}

export class DummyConnection extends Connection<undefined> {
doConnect() {
return Promise.resolve(undefined);
}
export class DummyConnection implements Connection<undefined> {
readonly status: ConnectionStatus = 'connected';
readonly inner: undefined;

doDisconnect() {
connect(): void {
return;
}
disconnect(): void {
return;
}
waitForConnected(_signal?: AbortSignal): Promise<void> {
return Promise.resolve();
}
onStatusChanged(
_cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
return () => {};
}
}
6 changes: 3 additions & 3 deletions packages/common/nbstore/src/connection/shared-connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Connection } from './connection';
import type { AutoReconnectConnection } from './connection';

const CONNECTIONS: Map<string, Connection<any>> = new Map();
export function share<T extends Connection<any>>(conn: T): T {
const CONNECTIONS: Map<string, AutoReconnectConnection<any>> = new Map();
export function share<T extends AutoReconnectConnection<any>>(conn: T): T {
if (!conn.shareId) {
throw new Error(
`Connection ${conn.constructor.name} is not shareable.\nIf you want to make it shareable, please override [shareId].`
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/frontend/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ export class AwarenessFrontend {
applyAwarenessUpdate(awareness, update.bin, origin);
};
const handleSyncCollect = () => {
return {
return Promise.resolve({
docId: awareness.doc.guid,
bin: encodeAwarenessUpdate(awareness, [awareness.clientID]),
};
});
};
const unsubscribe = this.sync.subscribeUpdate(
awareness.doc.guid,
Expand Down
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/frontend/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class BlobFrontend {
return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob);
}

addPriority(id: string, priority: number) {
return this.sync?.addPriority(id, priority);
addPriority(_id: string, _priority: number) {
// not support yet
}
}
3 changes: 1 addition & 2 deletions packages/common/nbstore/src/frontend/doc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface DocFrontendOptions {
}

export class DocFrontend {
private readonly uniqueId = `frontend:${this.storage.peer}:${nanoid()}`;
private readonly uniqueId = `frontend:${nanoid()}`;

private readonly prioritySettings = new Map<string, number>();

Expand Down Expand Up @@ -88,7 +88,6 @@ export class DocFrontend {
}),
]);

// eslint-disable-next-line no-constant-condition
while (true) {
throwIfAborted(signal);
const docId = await this.status.jobDocQueue.asyncPop(signal);
Expand Down
50 changes: 30 additions & 20 deletions packages/common/nbstore/src/impls/broadcast-channel/awareness.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { nanoid } from 'nanoid';

import {
type AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
import { type AwarenessRecord, AwarenessStorageBase } from '../../storage';
import { BroadcastChannelConnection } from './channel';

type ChannelMessage =
Expand All @@ -19,13 +16,13 @@ type ChannelMessage =
collectId: string;
}
| {
type: 'awareness-collect-fallback';
type: 'awareness-collect-feedback';
docId: string;
bin: Uint8Array;
collectId: string;
};

export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
export class BroadcastChannelAwarenessStorage extends AwarenessStorageBase {
override readonly storageType = 'awareness';
override readonly connection = new BroadcastChannelConnection(this.options);
get channel() {
Expand All @@ -36,7 +33,7 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
string,
Set<{
onUpdate: (update: AwarenessRecord, origin?: string) => void;
onCollect: () => AwarenessRecord;
onCollect: () => Promise<AwarenessRecord | null>;
}>
>();

Expand All @@ -57,12 +54,20 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
const subscribers = this.subscriptions.get(id) ?? new Set();
subscribers.forEach(subscriber => {
const fallback = subscriber.onCollect();
onUpdate(fallback);
subscriber
.onCollect()
.then(awareness => {
if (awareness) {
onUpdate(awareness);
}
})
.catch(error => {
console.error('error in on collect awareness', error);
});
});

const collectUniqueId = nanoid();
Expand All @@ -84,18 +89,23 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
message.data.type === 'awareness-collect' &&
message.data.docId === id
) {
const fallback = onCollect();
if (fallback) {
this.channel.postMessage({
type: 'awareness-collect-fallback',
docId: message.data.docId,
bin: fallback.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
onCollect()
.then(awareness => {
if (awareness) {
this.channel.postMessage({
type: 'awareness-collect-feedback',
docId: message.data.docId,
bin: awareness.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
})
.catch(error => {
console.error('error in on collect awareness', error);
});
}
if (
message.data.type === 'awareness-collect-fallback' &&
message.data.type === 'awareness-collect-feedback' &&
message.data.docId === id &&
message.data.collectId === collectUniqueId
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Connection } from '../../connection';
import { AutoReconnectConnection } from '../../connection';
import type { StorageOptions } from '../../storage';

export class BroadcastChannelConnection extends Connection<BroadcastChannel> {
export class BroadcastChannelConnection extends AutoReconnectConnection<BroadcastChannel> {
readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`;

constructor(private readonly opts: StorageOptions) {
Expand Down
24 changes: 13 additions & 11 deletions packages/common/nbstore/src/impls/cloud/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
import {
type AwarenessRecord,
AwarenessStorage,
AwarenessStorageBase,
type AwarenessStorageOptions,
} from '../../storage/awareness';
import {
Expand All @@ -16,7 +16,7 @@ interface CloudAwarenessStorageOptions extends AwarenessStorageOptions {
socketOptions: SocketOptions;
}

export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorageOptions> {
export class CloudAwarenessStorage extends AwarenessStorageBase<CloudAwarenessStorageOptions> {
connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
);
Expand All @@ -38,7 +38,7 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
// TODO: handle disconnect
// leave awareness
Expand Down Expand Up @@ -92,14 +92,16 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
docId === id
) {
(async () => {
const record = onCollect();
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
const record = await onCollect();
if (record) {
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
}
})().catch(err => console.error('awareness upload failed', err));
}
};
Expand Down
23 changes: 19 additions & 4 deletions packages/common/nbstore/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,31 @@ import {
} from '@affine/graphql';

import { DummyConnection } from '../../connection';
import { type BlobRecord, BlobStorage } from '../../storage';
import {
type BlobRecord,
BlobStorageBase,
type BlobStorageOptions,
} from '../../storage';

interface CloudBlobStorageOptions extends BlobStorageOptions {
apiBaseUrl: string;
}

export class CloudBlobStorage extends BlobStorage {
private readonly gql = gqlFetcherFactory(this.options.peer + '/graphql');
export class CloudBlobStorage extends BlobStorageBase<CloudBlobStorageOptions> {
private readonly gql = gqlFetcherFactory(
this.options.apiBaseUrl + '/graphql'
);
override connection = new DummyConnection();

override async get(key: string) {
const res = await fetch(
this.options.peer + '/api/workspaces/' + this.spaceId + '/blobs/' + key,
this.options.apiBaseUrl +
'/api/workspaces/' +
this.spaceId +
'/blobs/' +
key,
{
cache: 'default',
headers: {
'x-affine-version': BUILD_CONFIG.appVersion,
},
Expand Down
Loading

0 comments on commit 69a880b

Please sign in to comment.