Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): add doc sync state #9131

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/common/nbstore/src/storage/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import EventEmitter2 from 'eventemitter2';

import type { ConnectionStatus } from '../connection';
import type { AwarenessStorage } from './awareness';
import type { BlobStorage } from './blob';
import type { DocStorage } from './doc';
import type { Storage, StorageType } from './storage';
import type { SyncStorage } from './sync';

type Storages = DocStorage | BlobStorage | SyncStorage;
type Storages = DocStorage | BlobStorage | SyncStorage | AwarenessStorage;

export class SpaceStorage {
protected readonly storages: Map<StorageType, Storage> = new Map();
Expand Down
45 changes: 42 additions & 3 deletions packages/common/nbstore/src/sync/doc/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,55 @@
import type { Observable } from 'rxjs';
import { combineLatest, map } from 'rxjs';

import type { DocStorage, SyncStorage } from '../../storage';
import { DocSyncPeer } from './peer';

export interface DocSyncState {
total: number;
syncing: number;
retrying: boolean;
errorMessage: string | null;
}

export interface DocSyncDocState {
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
}

export class DocSync {
private readonly peers: DocSyncPeer[];
private readonly peers: DocSyncPeer[] = this.remotes.map(
remote => new DocSyncPeer(this.local, this.sync, remote)
);
private abort: AbortController | null = null;

readonly state$: Observable<DocSyncState> = combineLatest(
this.peers.map(peer => peer.peerState$)
).pipe(
map(allPeers => ({
total: allPeers.reduce((acc, peer) => acc + peer.total, 0),
syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0),
retrying: allPeers.some(peer => peer.retrying),
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
}))
);

constructor(
readonly local: DocStorage,
readonly sync: SyncStorage,
readonly remotes: DocStorage[]
) {
this.peers = remotes.map(remote => new DocSyncPeer(local, sync, remote));
) {}

docState$(docId: string): Observable<DocSyncDocState> {
return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe(
map(allPeers => ({
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
retrying: allPeers.some(peer => peer.retrying),
syncing: allPeers.some(peer => peer.syncing),
}))
);
}

start() {
Expand Down
59 changes: 58 additions & 1 deletion packages/common/nbstore/src/sync/doc/peer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { remove } from 'lodash-es';
import { nanoid } from 'nanoid';
import { Subject } from 'rxjs';
import { Observable, Subject } from 'rxjs';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';

import type { DocStorage, SyncStorage } from '../../storage';
Expand Down Expand Up @@ -46,6 +46,19 @@ interface Status {
errorMessage: string | null;
}

interface PeerState {
total: number;
syncing: number;
retrying: boolean;
errorMessage: string | null;
}

interface PeerDocState {
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
}

interface DocSyncPeerOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}
Expand Down Expand Up @@ -101,6 +114,50 @@ export class DocSyncPeer {
};
private readonly statusUpdatedSubject$ = new Subject<string | true>();

peerState$ = new Observable<PeerState>(subscribe => {
const next = () => {
if (!this.status.syncing) {
// if syncing = false, jobMap is empty
subscribe.next({
total: this.status.docs.size,
syncing: this.status.docs.size,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
} else {
const syncing = this.status.jobMap.size;
subscribe.next({
total: this.status.docs.size,
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
}
};
next();
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
});

docState$(docId: string) {
return new Observable<PeerDocState>(subscribe => {
const next = () => {
subscribe.next({
syncing:
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId),
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
};
next();
return this.statusUpdatedSubject$.subscribe(updatedId => {
if (updatedId === true || updatedId === docId) next();
});
});
}

private readonly jobs = createJobErrorCatcher({
connect: async (docId: string, signal?: AbortSignal) => {
const pushedClock =
Expand Down
30 changes: 27 additions & 3 deletions packages/common/nbstore/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import { combineLatest, map, type Observable, of } from 'rxjs';

import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
import type { AwarenessStorage } from '../storage/awareness';
import { AwarenessSync } from './awareness';
import { BlobSync } from './blob';
import { DocSync } from './doc';
import { DocSync, type DocSyncState } from './doc';

export interface SyncState {
doc?: DocSyncState;
}

export class Sync {
private readonly doc: DocSync | null;
private readonly blob: BlobSync | null;
readonly doc: DocSync | null;
readonly blob: BlobSync | null;
readonly awareness: AwarenessSync | null;

readonly state$: Observable<SyncState>;

constructor(
readonly local: SpaceStorage,
Expand All @@ -13,6 +24,7 @@ export class Sync {
const doc = local.tryGet('doc');
const blob = local.tryGet('blob');
const sync = local.tryGet('sync');
const awareness = local.tryGet('awareness');

this.doc =
doc && sync
Expand All @@ -32,6 +44,18 @@ export class Sync {
.filter((v): v is BlobStorage => !!v)
)
: null;
this.awareness = awareness
? new AwarenessSync(
awareness,
peers
.map(peer => peer.tryGet('awareness'))
.filter((v): v is AwarenessStorage => !!v)
)
: null;

this.state$ = combineLatest([this.doc?.state$ ?? of(undefined)]).pipe(
map(([doc]) => ({ doc }))
);
}

start() {
Expand Down
Loading