Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming directory using multipart #416

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"dependencies": {
"cross-blob": "^2.0.1",
"elliptic": "^6.5.4",
"form-data": "^3.0.1",
"isomorphic-ws": "^4.0.1",
"js-sha3": "^0.8.0",
"ky": "^0.25.1",
Expand Down
10 changes: 5 additions & 5 deletions src/bee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import * as stamps from './modules/stamps'

import { BeeArgumentError, BeeError } from './utils/error'
import { prepareWebsocketData } from './utils/data'
import { fileArrayBuffer, isFile } from './utils/file'
import { makeFeedReader, makeFeedWriter } from './feed'
import { makeSigner } from './chunk/signer'
import { assertFeedType, DEFAULT_FEED_TYPE, FeedType } from './feed/type'
Expand All @@ -33,10 +32,12 @@ import {
assertPublicKey,
assertReference,
assertUploadOptions,
assertCollection,
isFile,
makeTagUid,
} from './utils/type'
import { setJsonData, getJsonData } from './feed/json'
import { makeCollectionFromFS, makeCollectionFromFileList, assertCollection } from './utils/collection'
import { makeCollectionFromFS, makeCollectionFromFileList } from './utils/collection'
import {
AllTagsOptions,
Collection,
Expand Down Expand Up @@ -224,12 +225,11 @@ export class Bee {
}

if (isFile(data)) {
const fileData = await fileArrayBuffer(data)
const fileName = name ?? data.name
const contentType = data.type
const fileOptions = { contentType, ...options }

return bzz.uploadFile(this.ky, fileData, postageBatchId, fileName, fileOptions)
return bzz.uploadFile(this.ky, data.stream(), postageBatchId, fileName, fileOptions)
} else if (isReadable(data) && options?.tag && !options.size) {
// TODO: Needed until https://github.com/ethersphere/bee/issues/2317 is resolved
const result = await bzz.uploadFile(this.ky, data, postageBatchId, name, options)
Expand Down Expand Up @@ -297,7 +297,7 @@ export class Bee {

if (options) assertCollectionUploadOptions(options)

const data = await makeCollectionFromFileList(fileList)
const data = makeCollectionFromFileList(fileList)

return bzz.uploadCollection(this.ky, data, postageBatchId, options)
}
Expand Down
12 changes: 4 additions & 8 deletions src/modules/bzz.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import {
} from '../types'
import { extractUploadHeaders, readFileHeaders } from '../utils/headers'
import { http } from '../utils/http'
import { prepareData } from '../utils/data'
import { makeTar } from '../utils/tar'
import { assertCollection } from '../utils/collection'
import { prepareCollection, prepareData } from '../utils/data'
import { wrapBytesWithHelpers } from '../utils/bytes'
import { isReadable } from '../utils/stream'
import { makeTagUid } from '../utils/type'
import { assertCollection, makeTagUid } from '../utils/type'

const bzzEndpoint = 'bzz'

Expand Down Expand Up @@ -153,20 +151,18 @@ function extractCollectionUploadHeaders(
*/
export async function uploadCollection(
ky: Ky,
collection: Collection<Uint8Array>,
collection: Collection<Uint8Array | Readable>,
postageBatchId: BatchId,
options?: CollectionUploadOptions,
): Promise<UploadResult> {
assertCollection(collection)
const tarData = makeTar(collection)

const response = await http<{ reference: Reference }>(ky, {
method: 'post',
path: bzzEndpoint,
body: tarData,
body: await prepareCollection(collection),
responseType: 'json',
headers: {
'content-type': 'application/x-tar',
'swarm-collection': 'true',
...extractCollectionUploadHeaders(postageBatchId, options),
},
Expand Down
9 changes: 7 additions & 2 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export type Address = HexString<typeof ADDRESS_HEX_LENGTH>

/**
* Type representing Readable stream that abstracts away implementation especially the difference between
* browser and NodeJS versions as both are supported.
* browser (WHATWG) and NodeJS versions as both are supported.
*/
export type Readable = NativeReadable | CompatibilityReadable | ReadableStream | ReadableStreamPonyfill

Expand Down Expand Up @@ -272,9 +272,14 @@ export interface CollectionEntry<T> {
data: T

/**
*
* Path in the directory structure
*/
path: string

/**
* If data is Readable then length has to be specified as well!
*/
length?: number
}

/**
Expand Down
42 changes: 11 additions & 31 deletions src/utils/collection.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,9 @@
import fs from 'fs'
import path from 'path'
import { Collection } from '../types'
import { BeeArgumentError } from './error'
import { fileArrayBuffer } from './file'
import { isUint8Array } from './type'

export function isCollection(data: unknown): data is Collection<Uint8Array> {
if (!Array.isArray(data)) {
return false
}

return data.every(entry => typeof entry === 'object' && entry.data && entry.path && isUint8Array(entry.data))
}
import { Collection, Readable } from '../types'
import { Readable as NodeReadable } from 'stream'

export function assertCollection(data: unknown): asserts data is Collection<Uint8Array> {
if (!isCollection(data)) {
throw new BeeArgumentError('invalid collection', data)
}
}

/**
* Creates array in the format of Collection with data loaded from directory on filesystem.
* The function loads all the data into memory!
*
* @param dir path to the directory
*/
export async function makeCollectionFromFS(dir: string): Promise<Collection<Uint8Array>> {
export async function makeCollectionFromFS(dir: string): Promise<Collection<NodeReadable>> {
if (typeof dir !== 'string') {
throw new TypeError('dir has to be string!')
}
Expand All @@ -37,11 +15,11 @@ export async function makeCollectionFromFS(dir: string): Promise<Collection<Uint
return buildCollectionRelative(dir, '')
}

async function buildCollectionRelative(dir: string, relativePath: string): Promise<Collection<Uint8Array>> {
async function buildCollectionRelative(dir: string, relativePath: string): Promise<Collection<NodeReadable>> {
// Handles case when the dir is not existing or it is a file ==> throws an error
const dirname = path.join(dir, relativePath)
const entries = await fs.promises.opendir(dirname)
let collection: Collection<Uint8Array> = []
let collection: Collection<NodeReadable> = []

for await (const entry of entries) {
const fullPath = path.join(dir, relativePath, entry.name)
Expand All @@ -50,7 +28,8 @@ async function buildCollectionRelative(dir: string, relativePath: string): Promi
if (entry.isFile()) {
collection.push({
path: entryPath,
data: new Uint8Array(await fs.promises.readFile(fullPath)),
data: fs.createReadStream(fullPath),
length: (await fs.promises.stat(fullPath)).size,
})
} else if (entry.isDirectory()) {
collection = [...(await buildCollectionRelative(dir, entryPath)), ...collection]
Expand Down Expand Up @@ -83,16 +62,17 @@ function makeFilePath(file: WebkitFile) {
throw new TypeError('file is not valid File object')
}

export async function makeCollectionFromFileList(fileList: FileList | File[]): Promise<Collection<Uint8Array>> {
const collection: Collection<Uint8Array> = []
export function makeCollectionFromFileList(fileList: FileList | File[]): Collection<Readable> {
const collection: Collection<Readable> = []

for (let i = 0; i < fileList.length; i++) {
const file = fileList[i] as WebkitFile

if (file) {
collection.push({
path: makeFilePath(file),
data: new Uint8Array(await fileArrayBuffer(file)),
data: file.stream(),
length: file.size,
})
}
}
Expand Down
54 changes: 24 additions & 30 deletions src/utils/data.browser.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { isNodeReadable, isReadableStream } from './stream'
import { Readable } from '../types'
import { bufferReadable, isReadable } from './stream'
import { Collection, Readable } from '../types'
import Blob from 'cross-blob'
import { FormData } from 'formdata-node'
import { BeeError } from './error'

/**
* Validates input and converts to Uint8Array
Expand All @@ -24,34 +26,8 @@ export async function prepareData(
// there are already first experiments on this field (Chromium)
// but till it is fully implemented across browsers-land we have to
// buffer the data before sending the requests.
if (isNodeReadable(data)) {
return new Promise(resolve => {
const buffers: Array<Uint8Array> = []
data.on('data', d => {
buffers.push(d)
})
data.on('end', () => {
resolve(new Blob(buffers, { type: 'application/octet-stream' }))
})
})
}

if (isReadableStream(data)) {
return new Promise(async resolve => {
const reader = data.getReader()
const buffers: Array<Uint8Array> = []

let done, value
do {
;({ done, value } = await reader.read())

if (!done) {
buffers.push(value)
}
} while (!done)

resolve(new Blob(buffers, { type: 'application/octet-stream' }))
})
if (isReadable(data)) {
return bufferReadable(data)
}

throw new TypeError('unknown data type')
Expand All @@ -66,3 +42,21 @@ export async function prepareWebsocketData(data: string | ArrayBuffer | Blob): P

throw new TypeError('unknown websocket data type')
}

/**
 * Converts a collection into a multipart `FormData` body with one field per entry,
 * keyed by the entry's path.
 *
 * `Uint8Array` entries are attached as they are. Readable entries are first passed
 * through `bufferReadable` and the awaited result is attached.
 *
 * @param data Collection whose entries carry either raw bytes or a Readable
 * @returns FormData holding every entry of the collection
 * @throws BeeError if a Readable entry does not specify its length
 * @throws TypeError if an entry's data is neither Uint8Array nor Readable
 */
export async function prepareCollection(data: Collection<Uint8Array | Readable>): Promise<FormData> {
  const form = new FormData()

  for (const el of data) {
    if (el.data instanceof Uint8Array) {
      form.set(el.path, el.data)
      continue
    }

    // Fail loudly instead of silently leaving the entry out of the upload
    if (!isReadable(el.data)) {
      throw new TypeError('unknown data type')
    }

    // Only a missing length is an error; zero is a legitimate length (an empty file),
    // so a falsy check would wrongly reject it.
    if (typeof el.length !== 'number') {
      throw new BeeError(`Collection entry '${el.path}' is a stream, but does not have required length!`)
    }

    form.set(el.path, await bufferReadable(el.data))
  }

  return form
}
Loading