Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): add idb implementation #8809

Open
wants to merge 1 commit into
base: 61/adapt-op
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
".": "./src/index.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"idb": "^8.0.0"
},
"peerDependencies": {
"idb": "^8.0.0"
}
}
94 changes: 94 additions & 0 deletions packages/common/nbstore/src/impls/idb/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { share } from '../../connection';
import {
type BlobRecord,
BlobStorage,
type BlobStorageOptions,
type ListedBlobRecord,
} from '../../storage';
import { IDBConnection } from './db';

export interface IndexedDBBlobStorageOptions extends BlobStorageOptions {
dbName: string;
}

export class IndexedDBBlobStorage extends BlobStorage<IndexedDBBlobStorageOptions> {
readonly connection = share(new IDBConnection(this.options.dbName));

get db() {
return this.connection.inner;
}

override async get(key: string) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readonly');
const blob = await trx.objectStore('blobs').get(key);
const data = await trx.objectStore('blobData').get(key);

if (!blob || blob.deletedAt || !data) {
return null;
}

return {
...blob,
data: data.data,
};
}

override async set(blob: BlobRecord) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
await trx.objectStore('blobs').put({
key: blob.key,
mime: blob.mime,
size: blob.data.byteLength,
createdAt: new Date(),
deletedAt: null,
});
await trx.objectStore('blobData').put({
key: blob.key,
data: blob.data,
});
}

override async delete(key: string, permanently: boolean) {
if (permanently) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
await trx.objectStore('blobs').delete(key);
await trx.objectStore('blobData').delete(key);
} else {
const trx = this.db.transaction('blobs', 'readwrite');
const blob = await trx.store.get(key);
if (blob) {
await trx.store.put({
...blob,
deletedAt: new Date(),
});
}
}
}

override async release() {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');

const it = trx.objectStore('blobs').iterate();

for await (const item of it) {
if (item.value.deletedAt) {
await item.delete();
await trx.objectStore('blobData').delete(item.value.key);
}
}
}

override async list() {
const trx = this.db.transaction('blobs', 'readonly');
const it = trx.store.iterate();

const blobs: ListedBlobRecord[] = [];
for await (const item of it) {
if (!item.value.deletedAt) {
blobs.push(item.value);
}
}

return blobs;
}
}
40 changes: 40 additions & 0 deletions packages/common/nbstore/src/impls/idb/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { type IDBPDatabase, openDB } from 'idb';

import { Connection } from '../../connection';
import { type DocStorageSchema, migrator } from './schema';

export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
override get shareId() {
return `idb(${migrator.version}):${this.dbName}`;
}

constructor(private readonly dbName: string) {
super();
}

override async doConnect() {
return openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
});
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.close();
this.setStatus('closed', error);
}
}
127 changes: 127 additions & 0 deletions packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { share } from '../../connection';
import {
type DocClocks,
type DocRecord,
DocStorage,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
import { IDBConnection } from './db';

export interface IndexedDBDocStorageOptions extends DocStorageOptions {
dbName: string;
}

export class IndexedDBDocStorage extends DocStorage<IndexedDBDocStorageOptions> {
readonly connection = share(new IDBConnection(this.options.dbName));

get db() {
return this.connection.inner;
}

get name() {
return 'idb';
}

override async pushDocUpdate(update: DocUpdate) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
const timestamp = new Date();
await trx.objectStore('updates').add({
...update,
createdAt: timestamp,
});

await trx.objectStore('clocks').put({ docId: update.docId, timestamp });

return { docId: update.docId, timestamp };
}

protected override async getDocSnapshot(docId: string) {
const trx = this.db.transaction('snapshots', 'readonly');
const record = await trx.store.get(docId);

if (!record) {
return null;
}

return {
docId,
bin: record.bin,
timestamp: record.updatedAt,
};
}

override async deleteDoc(docId: string) {
const trx = this.db.transaction(
['snapshots', 'updates', 'clocks'],
'readwrite'
);

const idx = trx.objectStore('updates').index('docId');
const iter = idx.iterate(IDBKeyRange.only(docId));

for await (const { value } of iter) {
await trx.objectStore('updates').delete([value.docId, value.createdAt]);
}

await trx.objectStore('snapshots').delete(docId);
await trx.objectStore('clocks').delete(docId);
}

override async getDocTimestamps(after: Date = new Date(0)) {
const trx = this.db.transaction('clocks', 'readonly');

const clocks = await trx.store.getAll();

return clocks.reduce((ret, cur) => {
if (cur.timestamp > after) {
ret[cur.docId] = cur.timestamp;
}
return ret;
}, {} as DocClocks);
}

protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
const trx = this.db.transaction('snapshots', 'readwrite');
const record = await trx.store.get(snapshot.docId);

if (!record || record.updatedAt < snapshot.timestamp) {
await trx.store.put({
docId: snapshot.docId,
bin: snapshot.bin,
createdAt: record?.createdAt ?? snapshot.timestamp,
updatedAt: snapshot.timestamp,
});
}

trx.commit();
return true;
}

protected override async getDocUpdates(docId: string): Promise<DocRecord[]> {
const trx = this.db.transaction('updates', 'readonly');
const updates = await trx.store.index('docId').getAll(docId);

return updates.map(update => ({
docId,
bin: update.bin,
timestamp: update.createdAt,
}));
}

protected override async markUpdatesMerged(
docId: string,
updates: DocRecord[]
): Promise<number> {
const trx = this.db.transaction('updates', 'readwrite');

await Promise.all(
updates.map(update => trx.store.delete([docId, update.timestamp]))
);

trx.commit();
return updates.length;
}
}
3 changes: 3 additions & 0 deletions packages/common/nbstore/src/impls/idb/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './blob';
export * from './doc';
export * from './sync';
Loading
Loading