Skip to content

Commit

Permalink
feat(nbstore): blob sync frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Dec 11, 2024
1 parent 331e674 commit f0d39d5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
23 changes: 23 additions & 0 deletions packages/common/nbstore/src/frontend/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { BlobRecord, BlobStorage } from '../storage';
import type { BlobSyncEngine } from '../sync/blob';

export class BlobFrontend {
constructor(
readonly storage: BlobStorage,
readonly sync?: BlobSyncEngine
) {}

get(blobId: string) {
return this.sync
? this.sync.downloadBlob(blobId)
: this.storage.get(blobId);
}

set(blob: BlobRecord) {
return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob);
}

addPriority(id: string, priority: number) {
return this.sync?.addPriority(id, priority);
}
}
4 changes: 2 additions & 2 deletions packages/common/nbstore/src/storage/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ export interface BlobRecord {
key: string;
data: Uint8Array;
mime: string;
createdAt: Date;
createdAt?: Date;
}

export interface ListedBlobRecord {
key: string;
mime: string;
size: number;
createdAt: Date;
createdAt?: Date;
}

export abstract class BlobStorage<
Expand Down
30 changes: 29 additions & 1 deletion packages/common/nbstore/src/sync/blob/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { difference } from 'lodash-es';

import type { BlobStorage } from '../../storage';
import type { BlobRecord, BlobStorage } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';

export class BlobSyncEngine {
Expand All @@ -11,6 +11,29 @@ export class BlobSyncEngine {
readonly remotes: BlobStorage[]
) {}

async downloadBlob(blobId: string, signal?: AbortSignal) {
const localBlob = await this.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}

for (const storage of this.remotes) {
const data = await storage.get(blobId, signal);
if (data) {
await this.local.set(data, signal);
return data;
}
}
return null;
}

async uploadBlob(blob: BlobRecord, signal?: AbortSignal) {
await this.local.set(blob);
await Promise.allSettled(
this.remotes.map(remote => remote.set(blob, signal))
);
}

private async sync(signal?: AbortSignal) {
throwIfAborted(signal);

Expand Down Expand Up @@ -94,4 +117,9 @@ export class BlobSyncEngine {
this.abort?.abort();
this.abort = null;
}

addPriority(_id: string, _priority: number): () => void {
// TODO: implement
return () => {};
}
}

0 comments on commit f0d39d5

Please sign in to comment.