-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(nbstore): add awareness frontend
- Loading branch information
Showing
13 changed files
with
306 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import { nanoid } from 'nanoid'; | ||
import { | ||
applyAwarenessUpdate, | ||
type Awareness, | ||
encodeAwarenessUpdate, | ||
} from 'y-protocols/awareness.js'; | ||
|
||
import type { AwarenessRecord } from '../storage/awareness'; | ||
import type { AwarenessSync } from '../sync/awareness'; | ||
|
||
type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>; | ||
|
||
export class AwarenessFrontend { | ||
constructor(private readonly sync: AwarenessSync) {} | ||
|
||
connect(awareness: Awareness) { | ||
const uniqueId = nanoid(); | ||
const handleAwarenessUpdate = ( | ||
changes: AwarenessChanges, | ||
origin: string | ||
) => { | ||
if (origin === uniqueId) { | ||
return; | ||
} | ||
const changedClients = Object.values(changes).reduce((res, cur) => | ||
res.concat(cur) | ||
); | ||
|
||
const update = encodeAwarenessUpdate(awareness, changedClients); | ||
|
||
this.sync | ||
.update( | ||
{ | ||
docId: awareness.doc.guid, | ||
bin: update, | ||
}, | ||
uniqueId | ||
) | ||
.catch(error => { | ||
console.error('update awareness error', error); | ||
}); | ||
}; | ||
|
||
awareness.on('update', handleAwarenessUpdate); | ||
const handleSyncUpdate = (update: AwarenessRecord, origin?: string) => { | ||
if (origin === uniqueId) { | ||
// skip self update | ||
return; | ||
} | ||
|
||
applyAwarenessUpdate(awareness, update.bin, origin); | ||
}; | ||
const handleSyncCollect = () => { | ||
return { | ||
docId: awareness.doc.guid, | ||
bin: encodeAwarenessUpdate(awareness, [awareness.clientID]), | ||
}; | ||
}; | ||
const unsubscribe = this.sync.subscribeUpdate( | ||
awareness.doc.guid, | ||
handleSyncUpdate, | ||
handleSyncCollect | ||
); | ||
|
||
return () => { | ||
awareness.off('update', handleAwarenessUpdate); | ||
unsubscribe(); | ||
}; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,43 +1,129 @@ | ||
import { nanoid } from 'nanoid'; | ||
|
||
import { share } from '../../connection'; | ||
import { | ||
type AwarenessRecord, | ||
AwarenessStorage, | ||
} from '../../storage/awareness'; | ||
import { IDBConnection } from './db'; | ||
|
||
type ChannelMessage = | ||
| { | ||
type: 'awareness-update'; | ||
docId: string; | ||
bin: Uint8Array; | ||
origin?: string; | ||
} | ||
| { | ||
type: 'awareness-collect'; | ||
docId: string; | ||
collectId: string; | ||
} | ||
| { | ||
type: 'awareness-collect-fallback'; | ||
docId: string; | ||
bin: Uint8Array; | ||
collectId: string; | ||
}; | ||
|
||
export class IndexedDBAwarenessStorage extends AwarenessStorage { | ||
override readonly storageType = 'awareness'; | ||
override readonly connection = share(new IDBConnection(this.options)); | ||
get channel() { | ||
return this.connection.inner.channel; | ||
} | ||
|
||
private readonly subscriptions = new Map< | ||
string, | ||
Set<(update: AwarenessRecord, origin?: string) => void> | ||
Set<{ | ||
onUpdate: (update: AwarenessRecord, origin?: string) => void; | ||
onCollect: () => AwarenessRecord; | ||
}> | ||
>(); | ||
|
||
private readonly cached = new Map<string, AwarenessRecord>(); | ||
|
||
override update(record: AwarenessRecord, origin?: string): Promise<void> { | ||
const subscribers = this.subscriptions.get(record.docId); | ||
if (subscribers) { | ||
subscribers.forEach(callback => callback(record, origin)); | ||
subscribers.forEach(subscriber => subscriber.onUpdate(record, origin)); | ||
} | ||
this.cached.set(record.docId, record); | ||
this.channel.postMessage({ | ||
type: 'awareness-update', | ||
docId: record.docId, | ||
bin: record.bin, | ||
origin, | ||
} satisfies ChannelMessage); | ||
return Promise.resolve(); | ||
} | ||
|
||
override subscribeUpdate( | ||
id: string, | ||
callback: (update: AwarenessRecord, origin?: string) => void | ||
onUpdate: (update: AwarenessRecord, origin?: string) => void, | ||
onCollect: () => AwarenessRecord | ||
): () => void { | ||
const subscribers = this.subscriptions.get(id) ?? new Set(); | ||
subscribers.add(callback); | ||
subscribers.forEach(subscriber => { | ||
const fallback = subscriber.onCollect(); | ||
onUpdate(fallback); | ||
}); | ||
|
||
const collectUniqueId = nanoid(); | ||
|
||
const onChannelMessage = (message: MessageEvent<ChannelMessage>) => { | ||
if ( | ||
message.data.type === 'awareness-update' && | ||
message.data.docId === id | ||
) { | ||
onUpdate( | ||
{ | ||
docId: message.data.docId, | ||
bin: message.data.bin, | ||
}, | ||
message.data.origin | ||
); | ||
} | ||
if ( | ||
message.data.type === 'awareness-collect' && | ||
message.data.docId === id | ||
) { | ||
const fallback = onCollect(); | ||
if (fallback) { | ||
this.channel.postMessage({ | ||
type: 'awareness-collect-fallback', | ||
docId: message.data.docId, | ||
bin: fallback.bin, | ||
collectId: collectUniqueId, | ||
} satisfies ChannelMessage); | ||
} | ||
} | ||
if ( | ||
message.data.type === 'awareness-collect-fallback' && | ||
message.data.docId === id && | ||
message.data.collectId === collectUniqueId | ||
) { | ||
onUpdate({ | ||
docId: message.data.docId, | ||
bin: message.data.bin, | ||
}); | ||
} | ||
}; | ||
|
||
this.channel.addEventListener('message', onChannelMessage); | ||
this.channel.postMessage({ | ||
type: 'awareness-collect', | ||
docId: id, | ||
collectId: collectUniqueId, | ||
} satisfies ChannelMessage); | ||
|
||
const subscriber = { | ||
onUpdate, | ||
onCollect, | ||
}; | ||
subscribers.add(subscriber); | ||
this.subscriptions.set(id, subscribers); | ||
const cached = this.cached.get(id); | ||
if (cached) { | ||
callback(cached); | ||
} | ||
|
||
return () => { | ||
subscribers.delete(callback); | ||
subscribers.delete(subscriber); | ||
this.channel.removeEventListener('message', onChannelMessage); | ||
}; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.