Skip to content

Commit

Permalink
feat(nbstore): add doc sync state
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 17, 2024
1 parent 971daf9 commit f6cc73f
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 7 deletions.
45 changes: 42 additions & 3 deletions packages/common/nbstore/src/sync/doc/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,55 @@
import type { Observable } from 'rxjs';
import { combineLatest, map } from 'rxjs';

import type { DocStorage, SyncStorage } from '../../storage';
import { DocSyncPeer } from './peer';

export interface DocSyncState {
total: number;
syncing: number;
retrying: boolean;
errorMessage: string | null;
}

export interface DocSyncDocState {
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
}

export class DocSync {
private readonly peers: DocSyncPeer[];
private readonly peers: DocSyncPeer[] = this.remotes.map(
remote => new DocSyncPeer(this.local, this.sync, remote)
);
private abort: AbortController | null = null;

readonly state$: Observable<DocSyncState> = combineLatest(
this.peers.map(peer => peer.peerState$)
).pipe(
map(allPeers => ({
total: allPeers.reduce((acc, peer) => acc + peer.total, 0),
syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0),
retrying: allPeers.some(peer => peer.retrying),
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
}))
);

constructor(
readonly local: DocStorage,
readonly sync: SyncStorage,
readonly remotes: DocStorage[]
) {
this.peers = remotes.map(remote => new DocSyncPeer(local, sync, remote));
) {}

docState$(docId: string): Observable<DocSyncDocState> {
return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe(
map(allPeers => ({
errorMessage:
allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null,
retrying: allPeers.some(peer => peer.retrying),
syncing: allPeers.some(peer => peer.syncing),
}))
);
}

start() {
Expand Down
59 changes: 58 additions & 1 deletion packages/common/nbstore/src/sync/doc/peer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { remove } from 'lodash-es';
import { nanoid } from 'nanoid';
import { Subject } from 'rxjs';
import { Observable, Subject } from 'rxjs';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';

import type { DocStorage, SyncStorage } from '../../storage';
Expand Down Expand Up @@ -46,6 +46,19 @@ interface Status {
errorMessage: string | null;
}

interface PeerState {
total: number;
syncing: number;
retrying: boolean;
errorMessage: string | null;
}

interface PeerDocState {
syncing: boolean;
retrying: boolean;
errorMessage: string | null;
}

interface DocSyncPeerOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}
Expand Down Expand Up @@ -101,6 +114,50 @@ export class DocSyncPeer {
};
private readonly statusUpdatedSubject$ = new Subject<string | true>();

peerState$ = new Observable<PeerState>(subscribe => {
const next = () => {
if (!this.status.syncing) {
// if syncing = false, jobMap is empty
subscribe.next({
total: this.status.docs.size,
syncing: this.status.docs.size,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
} else {
const syncing = this.status.jobMap.size;
subscribe.next({
total: this.status.docs.size,
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
}
};
next();
return this.statusUpdatedSubject$.subscribe(() => {
next();
});
});

docState$(docId: string) {
return new Observable<PeerDocState>(subscribe => {
const next = () => {
subscribe.next({
syncing:
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId),
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
});
};
next();
return this.statusUpdatedSubject$.subscribe(updatedId => {
if (updatedId === true || updatedId === docId) next();
});
});
}

private readonly jobs = createJobErrorCatcher({
connect: async (docId: string, signal?: AbortSignal) => {
const pushedClock =
Expand Down
30 changes: 27 additions & 3 deletions packages/common/nbstore/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
import { combineLatest, map, type Observable, of } from 'rxjs';

import type { BlobStorage, DocStorage, SpaceStorage } from '../storage';
import type { AwarenessStorage } from '../storage/awareness';
import { AwarenessSync } from './awareness';
import { BlobSync } from './blob';
import { DocSync } from './doc';
import { DocSync, type DocSyncState } from './doc';

export interface SyncState {
doc?: DocSyncState;
}

export class Sync {
private readonly doc: DocSync | null;
private readonly blob: BlobSync | null;
readonly doc: DocSync | null;
readonly blob: BlobSync | null;
readonly awareness: AwarenessSync | null;

readonly state$: Observable<SyncState>;

constructor(
readonly local: SpaceStorage,
Expand All @@ -13,6 +24,7 @@ export class Sync {
const doc = local.tryGet('doc');
const blob = local.tryGet('blob');
const sync = local.tryGet('sync');
const awareness = local.tryGet('awareness');

this.doc =
doc && sync
Expand All @@ -32,6 +44,18 @@ export class Sync {
.filter((v): v is BlobStorage => !!v)
)
: null;
this.awareness = awareness
? new AwarenessSync(
awareness,
peers
.map(peer => peer.tryGet('awareness'))
.filter((v): v is AwarenessStorage => !!v)
)
: null;

this.state$ = combineLatest([this.doc?.state$ ?? of(undefined)]).pipe(
map(([doc]) => ({ doc }))
);
}

start() {
Expand Down

0 comments on commit f6cc73f

Please sign in to comment.