UBERF-8950 Expose blob list method in datalake (#7484)
Signed-off-by: Alexander Onnikov <[email protected]>
aonnikov authored Dec 17, 2024
1 parent 2ea25fa commit d811377
Showing 8 changed files with 234 additions and 65 deletions.
97 changes: 68 additions & 29 deletions server/datalake/src/client.ts
@@ -25,9 +25,16 @@ export interface ObjectMetadata {
   lastModified: number
   name: string
   type: string
+  etag: string
   size?: number
 }
 
+/** @public */
+export interface ListObjectOutput {
+  cursor: string | undefined
+  blobs: Omit<ObjectMetadata, 'lastModified'>[]
+}
+
 /** @public */
 export interface StatObjectOutput {
   lastModified: number
@@ -36,6 +43,13 @@ export interface StatObjectOutput {
   size?: number
 }
 
+/** @public */
+export interface UploadObjectParams {
+  lastModified: number
+  type: string
+  size?: number
+}
+
 interface BlobUploadError {
   key: string
   error: string
@@ -68,6 +82,23 @@ export class DatalakeClient {
     return concatLink(this.endpoint, path)
   }
 
+  async listObjects (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    cursor: string | undefined
+  ): Promise<ListObjectOutput> {
+    const limit = 100
+    const path = `/blob/${workspace.name}`
+    const url = new URL(concatLink(this.endpoint, path))
+    url.searchParams.append('limit', String(limit))
+    if (cursor !== undefined) {
+      url.searchParams.append('cursor', cursor)
+    }
+
+    const response = await fetchSafe(ctx, url)
+    return (await response.json()) as ListObjectOutput
+  }
+
   async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
 
@@ -166,9 +197,9 @@ export class DatalakeClient {
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata,
-    size?: number
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
+    let size = params.size
     if (size === undefined) {
       if (Buffer.isBuffer(stream)) {
         size = stream.length
@@ -182,12 +213,12 @@
 
     try {
       if (size === undefined || size < 64 * 1024 * 1024) {
-        await ctx.with('direct-upload', {}, (ctx) =>
-          this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
+        return await ctx.with('direct-upload', {}, (ctx) =>
+          this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
         )
       } else {
-        await ctx.with('signed-url-upload', {}, (ctx) =>
-          this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
+        return await ctx.with('signed-url-upload', {}, (ctx) =>
+          this.uploadWithSignedURL(ctx, workspace, objectName, stream, { ...params, size })
         )
       }
     } catch (err) {
@@ -201,18 +232,18 @@
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const path = `/upload/form-data/${workspace.name}`
     const url = concatLink(this.endpoint, path)
 
     const form = new FormData()
     const options: FormData.AppendOptions = {
       filename: objectName,
-      contentType: metadata.type,
-      knownLength: metadata.size,
+      contentType: params.type,
+      knownLength: params.size,
       header: {
-        'Last-Modified': metadata.lastModified
+        'Last-Modified': params.lastModified
       }
     }
     form.append('file', stream, options)
@@ -229,18 +260,20 @@ export class DatalakeClient {
     if ('error' in uploadResult) {
       throw new DatalakeError('Upload failed: ' + uploadResult.error)
     }
+
+    return uploadResult.metadata
   }
 
   async uploadMultipart (
     ctx: MeasureContext,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const chunkSize = 10 * 1024 * 1024
 
-    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, metadata)
+    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, params)
 
     try {
       const parts: MultipartUploadPart[] = []
@@ -252,7 +285,7 @@ export class DatalakeClient {
         partNumber++
       }
 
-      await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
+      return await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
     } catch (err: any) {
       await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
       throw err
@@ -264,17 +297,17 @@ export class DatalakeClient {
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const url = await this.signObjectSign(ctx, workspace, objectName)
 
     try {
       await fetchSafe(ctx, url, {
         body: stream,
         method: 'PUT',
         headers: {
-          'Content-Type': metadata.type,
-          'Content-Length': metadata.size?.toString() ?? '0'
+          'Content-Type': params.type,
+          'Content-Length': params.size?.toString() ?? '0'
           // 'x-amz-meta-last-modified': metadata.lastModified.toString()
         }
       })
@@ -284,7 +317,7 @@
       throw new DatalakeError('Failed to upload via signed URL')
     }
 
-    await this.signObjectComplete(ctx, workspace, objectName)
+    return await this.signObjectComplete(ctx, workspace, objectName)
   }
 
   async uploadFromS3 (
@@ -322,10 +355,15 @@ export class DatalakeClient {
     }
   }
 
-  private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  private async signObjectComplete (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<ObjectMetadata> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      await fetchSafe(ctx, url, { method: 'PUT' })
+      const res = await fetchSafe(ctx, url, { method: 'PUT' })
+      return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete signed url upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to complete signed URL upload')
@@ -353,16 +391,16 @@ export class DatalakeClient {
     ctx: MeasureContext,
     workspace: WorkspaceId,
     objectName: string,
-    metadata: ObjectMetadata
+    params: UploadObjectParams
   ): Promise<MultipartUpload> {
     const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}`
     const url = concatLink(this.endpoint, path)
 
     try {
       const headers = {
-        'Content-Type': metadata.type,
-        'Content-Length': metadata.size?.toString() ?? '0',
-        'Last-Modified': new Date(metadata.lastModified).toUTCString()
+        'Content-Type': params.type,
+        'Content-Length': params.size?.toString() ?? '0',
+        'Last-Modified': new Date(params.lastModified).toUTCString()
       }
       const response = await fetchSafe(ctx, url, { method: 'POST', headers })
       return (await response.json()) as MultipartUpload
@@ -401,14 +439,15 @@
     objectName: string,
     multipart: MultipartUpload,
     parts: MultipartUploadPart[]
-  ): Promise<void> {
+  ): Promise<ObjectMetadata> {
     const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/complete`
     const url = new URL(concatLink(this.endpoint, path))
     url.searchParams.append('key', multipart.key)
     url.searchParams.append('uploadId', multipart.uploadId)
 
     try {
-      await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      const res = await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete multipart upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to complete multipart upload')
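Note: the following is a minimal usage sketch, not part of this commit. It shows how a caller might page through the new listObjects method; the listAllBlobs helper name is hypothetical, and the loop relies on the server returning an undefined cursor once the listing is exhausted (the same convention the listStream iterator below depends on).

// Hypothetical helper (not in this commit): drain the paginated listing.
async function listAllBlobs (
  client: DatalakeClient,
  ctx: MeasureContext,
  workspace: WorkspaceId
): Promise<ListObjectOutput['blobs']> {
  const blobs: ListObjectOutput['blobs'] = []
  let cursor: string | undefined
  do {
    // Each call fetches up to 100 blobs (the limit hardcoded in listObjects).
    const res = await client.listObjects(ctx, workspace, cursor)
    blobs.push(...res.blobs)
    cursor = res.cursor
  } while (cursor !== undefined)
  return blobs
}

The cursor-based shape keeps the endpoint stateless on the client side; the page size stays server-controlled through the limit query parameter.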
42 changes: 35 additions & 7 deletions server/datalake/src/index.ts
@@ -18,13 +18,14 @@ import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withC
 import {
   type BlobStorageIterator,
   type BucketInfo,
+  type ListBlobResult,
   type StorageAdapter,
   type StorageConfig,
   type StorageConfiguration,
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
 import { type Readable } from 'stream'
-import { type ObjectMetadata, DatalakeClient } from './client'
+import { type UploadObjectParams, DatalakeClient } from './client'
 
 export { DatalakeClient }
 
@@ -88,8 +89,36 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('listStream')
   async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
+    let hasMore = true
+    const buffer: ListBlobResult[] = []
+    let cursor: string | undefined
+
     return {
-      next: async () => [],
+      next: async () => {
+        try {
+          while (hasMore && buffer.length < 50) {
+            const res = await this.client.listObjects(ctx, workspaceId, cursor)
+            hasMore = res.cursor !== undefined
+            cursor = res.cursor
+
+            for (const blob of res.blobs) {
+              buffer.push({
+                _id: blob.name as Ref<Blob>,
+                _class: core.class.Blob,
+                etag: blob.etag,
+                size: blob.size ?? 0,
+                provider: this.opt.name,
+                space: core.space.Configuration,
+                modifiedBy: core.account.ConfigUser,
+                modifiedOn: 0
+              })
+            }
+          }
+        } catch (err: any) {
+          ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
+        }
+        return buffer.splice(0, 50)
+      },
       close: async () => {}
     }
   }
@@ -131,19 +160,18 @@ export class DatalakeService implements StorageAdapter {
     contentType: string,
     size?: number
   ): Promise<UploadedObjectInfo> {
-    const metadata: ObjectMetadata = {
+    const params: UploadObjectParams = {
       lastModified: Date.now(),
-      name: objectName,
       type: contentType,
       size
     }
 
-    await ctx.with('put', {}, (ctx) =>
-      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, metadata, size))
+    const { etag } = await ctx.with('put', {}, (ctx) =>
+      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, params))
     )
 
     return {
-      etag: '',
+      etag,
       versionId: ''
     }
   }
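For context, a sketch (again not part of the commit) of how the reworked iterator is meant to be consumed: next() now returns batches of up to 50 ListBlobResult entries, buffering pages from listObjects internally, and signals completion with an empty batch. The countBlobs helper below is hypothetical.

// Hypothetical consumer (not in this commit): count all blobs in a workspace.
async function countBlobs (
  adapter: DatalakeService,
  ctx: MeasureContext,
  workspaceId: WorkspaceId
): Promise<number> {
  const iterator = await adapter.listStream(ctx, workspaceId)
  let count = 0
  try {
    while (true) {
      const batch = await iterator.next()
      if (batch.length === 0) break // an empty batch marks the end of the listing
      count += batch.length
    }
  } finally {
    await iterator.close()
  }
  return count
}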
(Diff for the remaining 6 changed files not shown.)
