From 6b0ca77b28315349b39cca1ec8a63f929df07a4c Mon Sep 17 00:00:00 2001 From: Daniel La Rocque Date: Wed, 14 Aug 2024 13:23:46 -0500 Subject: [PATCH] Migrate from the Node to the Web ReadableStream (#8410) --- .changeset/itchy-boxes-try.md | 6 +++ common/api-review/storage.api.md | 2 +- docs-devsite/storage.md | 4 +- packages/storage/src/api.browser.ts | 2 +- packages/storage/src/api.node.ts | 2 +- .../storage/src/implementation/connection.ts | 3 +- .../src/platform/browser/connection.ts | 2 +- packages/storage/src/platform/connection.ts | 2 +- .../storage/src/platform/node/connection.ts | 20 +++++----- packages/storage/src/reference.ts | 33 ++++++++--------- packages/storage/test/node/stream.test.ts | 37 +++++++++++++------ 11 files changed, 66 insertions(+), 47 deletions(-) create mode 100644 .changeset/itchy-boxes-try.md diff --git a/.changeset/itchy-boxes-try.md b/.changeset/itchy-boxes-try.md new file mode 100644 index 00000000000..3a5453fbb1a --- /dev/null +++ b/.changeset/itchy-boxes-try.md @@ -0,0 +1,6 @@ +--- +'firebase': minor +'@firebase/storage': minor +--- + +Migrate from the Node to Web ReadableStream interface diff --git a/common/api-review/storage.api.md b/common/api-review/storage.api.md index 2ed774c8bb0..4964aa40af7 100644 --- a/common/api-review/storage.api.md +++ b/common/api-review/storage.api.md @@ -132,7 +132,7 @@ export function getMetadata(ref: StorageReference): Promise; export function getStorage(app?: FirebaseApp, bucketUrl?: string): FirebaseStorage; // @public -export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream; +export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream; // @internal (undocumented) export function _invalidArgument(message: string): StorageError; diff --git a/docs-devsite/storage.md b/docs-devsite/storage.md index e929246b5b9..76d6f6887d3 100644 --- a/docs-devsite/storage.md +++ b/docs-devsite/storage.md @@ -279,7 +279,7 @@ This API is only available in Node. Signature: ```typescript -export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream; +export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream; ``` #### Parameters @@ -291,7 +291,7 @@ export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: Returns: -NodeJS.ReadableStream +ReadableStream A stream with the object's data as bytes diff --git a/packages/storage/src/api.browser.ts b/packages/storage/src/api.browser.ts index 0ccf31bbdfb..3cf653360f5 100644 --- a/packages/storage/src/api.browser.ts +++ b/packages/storage/src/api.browser.ts @@ -58,6 +58,6 @@ export function getBlob( export function getStream( ref: StorageReference, maxDownloadSizeBytes?: number -): NodeJS.ReadableStream { +): ReadableStream { throw new Error('getStream() is only supported by NodeJS builds'); } diff --git a/packages/storage/src/api.node.ts b/packages/storage/src/api.node.ts index 790147d26fa..beeabc7d93f 100644 --- a/packages/storage/src/api.node.ts +++ b/packages/storage/src/api.node.ts @@ -58,7 +58,7 @@ export function getBlob( export function getStream( ref: StorageReference, maxDownloadSizeBytes?: number -): NodeJS.ReadableStream { +): ReadableStream { ref = getModularInstance(ref); return getStreamInternal(ref as Reference, maxDownloadSizeBytes); } diff --git a/packages/storage/src/implementation/connection.ts b/packages/storage/src/implementation/connection.ts index 19de2ec941d..80e29c9cd2f 100644 --- a/packages/storage/src/implementation/connection.ts +++ b/packages/storage/src/implementation/connection.ts @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - /** Network headers */ export type Headers = Record; @@ -23,7 +22,7 @@ export type ConnectionType = | string | ArrayBuffer | Blob - | NodeJS.ReadableStream; + | ReadableStream; /** * A lightweight wrapper around XMLHttpRequest with a diff --git a/packages/storage/src/platform/browser/connection.ts b/packages/storage/src/platform/browser/connection.ts index 442962dafe8..fdd9b496242 100644 --- a/packages/storage/src/platform/browser/connection.ts +++ b/packages/storage/src/platform/browser/connection.ts @@ -171,7 +171,7 @@ export function newBlobConnection(): Connection { return new XhrBlobConnection(); } -export function newStreamConnection(): Connection { +export function newStreamConnection(): Connection { throw new Error('Streams are only supported on Node'); } diff --git a/packages/storage/src/platform/connection.ts b/packages/storage/src/platform/connection.ts index a647c19cacf..8f02d3c8b50 100644 --- a/packages/storage/src/platform/connection.ts +++ b/packages/storage/src/platform/connection.ts @@ -45,7 +45,7 @@ export function newBlobConnection(): Connection { return nodeNewBlobConnection(); } -export function newStreamConnection(): Connection { +export function newStreamConnection(): Connection> { // This file is only used in Node.js tests using ts-node. return nodeNewStreamConnection(); } diff --git a/packages/storage/src/platform/node/connection.ts b/packages/storage/src/platform/node/connection.ts index f51fa6fdb5b..d7141e6ef68 100644 --- a/packages/storage/src/platform/node/connection.ts +++ b/packages/storage/src/platform/node/connection.ts @@ -50,7 +50,7 @@ abstract class FetchConnection async send( url: string, method: string, - body?: ArrayBufferView | Blob | string, + body?: NodeJS.ArrayBufferView | Blob | string, headers?: Record ): Promise { if (this.sent_) { @@ -62,7 +62,7 @@ abstract class FetchConnection const response = await this.fetch_(url, { method, headers: headers || {}, - body: body as ArrayBufferView | string + body: body as NodeJS.ArrayBufferView | string }); this.headers_ = response.headers; this.statusCode_ = response.status; @@ -146,13 +146,15 @@ export function newBytesConnection(): Connection { return new FetchBytesConnection(); } -export class FetchStreamConnection extends FetchConnection { - private stream_: NodeJS.ReadableStream | null = null; +export class FetchStreamConnection extends FetchConnection< + ReadableStream +> { + private stream_: ReadableStream | null = null; async send( url: string, method: string, - body?: ArrayBufferView | Blob | string, + body?: NodeJS.ArrayBufferView | Blob | string, headers?: Record ): Promise { if (this.sent_) { @@ -164,12 +166,12 @@ export class FetchStreamConnection extends FetchConnection; } catch (e) { this.errorText_ = (e as Error)?.message; // emulate XHR which sets status to 0 when encountering a network error @@ -178,7 +180,7 @@ export class FetchStreamConnection extends FetchConnection { +export function newStreamConnection(): Connection> { return new FetchStreamConnection(); } diff --git a/packages/storage/src/reference.ts b/packages/storage/src/reference.ts index 46249da8146..e00bceb2f7c 100644 --- a/packages/storage/src/reference.ts +++ b/packages/storage/src/reference.ts @@ -19,8 +19,6 @@ * @fileoverview Defines the Firebase StorageReference class. */ -import { PassThrough, Transform, TransformOptions } from 'stream'; - import { FbsBlob } from './implementation/blob'; import { Location } from './implementation/location'; import { getMappings } from './implementation/metadata'; @@ -48,6 +46,7 @@ import { newStreamConnection, newTextConnection } from './platform/connection'; +import { RequestInfo } from './implementation/requestinfo'; /** * Provides methods to interact with a bucket in the Firebase Storage service. @@ -203,42 +202,42 @@ export function getBlobInternal( export function getStreamInternal( ref: Reference, maxDownloadSizeBytes?: number -): NodeJS.ReadableStream { +): ReadableStream { ref._throwIfRoot('getStream'); - const requestInfo = getBytes( + const requestInfo: RequestInfo = getBytes( ref.storage, ref._location, maxDownloadSizeBytes ); - /** A transformer that passes through the first n bytes. */ - const newMaxSizeTransform: (n: number) => TransformOptions = n => { + // Transforms the stream so that only `maxDownloadSizeBytes` bytes are piped to the result + const newMaxSizeTransform = (n: number): Transformer => { let missingBytes = n; return { - transform(chunk, encoding, callback) { + transform(chunk, controller: TransformStreamDefaultController) { // GCS may not honor the Range header for small files if (chunk.length < missingBytes) { - this.push(chunk); + controller.enqueue(chunk); missingBytes -= chunk.length; } else { - this.push(chunk.slice(0, missingBytes)); - this.emit('end'); + controller.enqueue(chunk.slice(0, missingBytes)); + controller.terminate(); } - callback(); } - } as TransformOptions; + }; }; const result = maxDownloadSizeBytes !== undefined - ? new Transform(newMaxSizeTransform(maxDownloadSizeBytes)) - : new PassThrough(); + ? new TransformStream(newMaxSizeTransform(maxDownloadSizeBytes)) + : new TransformStream(); // The default transformer forwards all chunks to its readable side ref.storage .makeRequestWithTokens(requestInfo, newStreamConnection) - .then(stream => (stream as NodeJS.ReadableStream).pipe(result)) - .catch(e => result.destroy(e)); - return result; + .then(readableStream => readableStream.pipeThrough(result)) + .catch(err => result.writable.abort(err)); + + return result.readable; } /** diff --git a/packages/storage/test/node/stream.test.ts b/packages/storage/test/node/stream.test.ts index 65b2fee1498..5fde3099d4e 100644 --- a/packages/storage/test/node/stream.test.ts +++ b/packages/storage/test/node/stream.test.ts @@ -21,12 +21,27 @@ import { FirebaseApp, deleteApp } from '@firebase/app'; import { getStream, ref, uploadBytes } from '../../src/index.node'; import * as types from '../../src/public-types'; -async function readData(reader: NodeJS.ReadableStream): Promise { - return new Promise((resolve, reject) => { - const data: number[] = []; - reader.on('error', e => reject(e)); - reader.on('data', chunk => data.push(...Array.from(chunk as Buffer))); - reader.on('end', () => resolve(data)); +// See: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/getReader +async function readData(readableStream: ReadableStream): Promise { + return new Promise((resolve, reject) => { + const reader: ReadableStreamDefaultReader = readableStream.getReader(); + const result: any[] = []; + reader + .read() + .then(function processBytes({ done, value }): any { + if (done) { + resolve(new Uint8Array(result)); + return; + } + + result.push(...value); + return reader.read().then(processBytes); + }) + .catch(err => { + console.error(err); + reject(err); + return; + }); }); } @@ -46,19 +61,17 @@ describe('Firebase Storage > getStream', () => { it('can get stream', async () => { const reference = ref(storage, 'public/exp-bytes'); await uploadBytes(reference, new Uint8Array([0, 1, 3, 128, 255])); - const stream = await getStream(reference); + const stream = getStream(reference); const data = await readData(stream); - expect(new Uint8Array(data)).to.deep.equal( - new Uint8Array([0, 1, 3, 128, 255]) - ); + expect(data).to.deep.equal(new Uint8Array([0, 1, 3, 128, 255])); }); it('can get first n bytes of stream', async () => { const reference = ref(storage, 'public/exp-bytes'); await uploadBytes(reference, new Uint8Array([0, 1, 3])); - const stream = await getStream(reference, 2); + const stream = getStream(reference, 2); const data = await readData(stream); - expect(new Uint8Array(data)).to.deep.equal(new Uint8Array([0, 1])); + expect(data).to.deep.equal(new Uint8Array([0, 1])); }); it('getStream() throws for missing file', async () => {