diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts index 0585556b231af..0728487bda05a 100644 --- a/packages/common/nbstore/src/sync/doc/index.ts +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -1,16 +1,55 @@ +import type { Observable } from 'rxjs'; +import { combineLatest, map } from 'rxjs'; + import type { DocStorage, SyncStorage } from '../../storage'; import { DocSyncPeer } from './peer'; +export interface DocSyncState { + total: number; + syncing: number; + retrying: boolean; + errorMessage: string | null; +} + +export interface DocSyncDocState { + syncing: boolean; + retrying: boolean; + errorMessage: string | null; +} + export class DocSync { - private readonly peers: DocSyncPeer[]; + private readonly peers: DocSyncPeer[] = this.remotes.map( + remote => new DocSyncPeer(this.local, this.sync, remote) + ); private abort: AbortController | null = null; + readonly state$: Observable = combineLatest( + this.peers.map(peer => peer.peerState$) + ).pipe( + map(allPeers => ({ + total: allPeers.reduce((acc, peer) => acc + peer.total, 0), + syncing: allPeers.reduce((acc, peer) => acc + peer.syncing, 0), + retrying: allPeers.some(peer => peer.retrying), + errorMessage: + allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null, + })) + ); + constructor( readonly local: DocStorage, readonly sync: SyncStorage, readonly remotes: DocStorage[] - ) { - this.peers = remotes.map(remote => new DocSyncPeer(local, sync, remote)); + ) {} + + docState$(docId: string): Observable { + return combineLatest(this.peers.map(peer => peer.docState$(docId))).pipe( + map(allPeers => ({ + errorMessage: + allPeers.find(peer => peer.errorMessage)?.errorMessage ?? null, + retrying: allPeers.some(peer => peer.retrying), + syncing: allPeers.some(peer => peer.syncing), + })) + ); } start() { diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index 688026f28bf1e..b11d65cdb6583 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -1,6 +1,6 @@ import { remove } from 'lodash-es'; import { nanoid } from 'nanoid'; -import { Subject } from 'rxjs'; +import { Observable, Subject } from 'rxjs'; import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; import type { DocStorage, SyncStorage } from '../../storage'; @@ -46,6 +46,19 @@ interface Status { errorMessage: string | null; } +interface PeerState { + total: number; + syncing: number; + retrying: boolean; + errorMessage: string | null; +} + +interface PeerDocState { + syncing: boolean; + retrying: boolean; + errorMessage: string | null; +} + interface DocSyncPeerOptions { mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; } @@ -101,6 +114,50 @@ export class DocSyncPeer { }; private readonly statusUpdatedSubject$ = new Subject(); + peerState$ = new Observable(subscribe => { + const next = () => { + if (!this.status.syncing) { + // if syncing = false, jobMap is empty + subscribe.next({ + total: this.status.docs.size, + syncing: this.status.docs.size, + retrying: this.status.retrying, + errorMessage: this.status.errorMessage, + }); + } else { + const syncing = this.status.jobMap.size; + subscribe.next({ + total: this.status.docs.size, + syncing: syncing, + retrying: this.status.retrying, + errorMessage: this.status.errorMessage, + }); + } + }; + next(); + return this.statusUpdatedSubject$.subscribe(() => { + next(); + }); + }); + + docState$(docId: string) { + return new Observable(subscribe => { + const next = () => { + subscribe.next({ + syncing: + !this.status.connectedDocs.has(docId) || + this.status.jobMap.has(docId), + retrying: this.status.retrying, + errorMessage: this.status.errorMessage, + }); + }; + next(); + return this.statusUpdatedSubject$.subscribe(updatedId => { + if (updatedId === true || updatedId === docId) next(); + }); + }); + } + private readonly jobs = createJobErrorCatcher({ connect: async (docId: string, signal?: AbortSignal) => { const pushedClock = diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index 58bfea2700273..00c76dbb52026 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -1,10 +1,21 @@ +import { combineLatest, map, type Observable, of } from 'rxjs'; + import type { BlobStorage, DocStorage, SpaceStorage } from '../storage'; +import type { AwarenessStorage } from '../storage/awareness'; +import { AwarenessSync } from './awareness'; import { BlobSync } from './blob'; -import { DocSync } from './doc'; +import { DocSync, type DocSyncState } from './doc'; + +export interface SyncState { + doc?: DocSyncState; +} export class Sync { - private readonly doc: DocSync | null; - private readonly blob: BlobSync | null; + readonly doc: DocSync | null; + readonly blob: BlobSync | null; + readonly awareness: AwarenessSync | null; + + readonly state$: Observable; constructor( readonly local: SpaceStorage, @@ -13,6 +24,7 @@ export class Sync { const doc = local.tryGet('doc'); const blob = local.tryGet('blob'); const sync = local.tryGet('sync'); + const awareness = local.tryGet('awareness'); this.doc = doc && sync @@ -32,6 +44,18 @@ export class Sync { .filter((v): v is BlobStorage => !!v) ) : null; + this.awareness = awareness + ? new AwarenessSync( + awareness, + peers + .map(peer => peer.tryGet('awareness')) + .filter((v): v is AwarenessStorage => !!v) + ) + : null; + + this.state$ = combineLatest([this.doc?.state$ ?? of(undefined)]).pipe( + map(([doc]) => ({ doc })) + ); } start() {