Parallelize chunk uploads with the setting "chunkParallelize": true #876

Open · wants to merge 4 commits into base: master
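For context, a minimal usage sketch of the option this PR introduces, assuming the PR is applied. server, chunkUploads, and chunkSize are existing FilePond options; the endpoint URL and input selector are placeholders.

    import * as FilePond from 'filepond';

    // Sketch: enabling the proposed option (assumes this PR is merged).
    // '/api' is a placeholder endpoint implementing FilePond's chunk protocol.
    const pond = FilePond.create(document.querySelector('input[type="file"]'), {
        server: '/api',
        chunkUploads: true,     // existing option: upload large files in chunks
        chunkSize: 5000000,     // existing option: 5 MB chunks (library default)
        chunkParallelize: true, // new option added by this PR (default: false)
    });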
99 changes: 93 additions & 6 deletions dist/filepond.esm.js
@@ -1864,6 +1864,7 @@ const defaultOptions = {
chunkForce: [false, Type.BOOLEAN], // Force use of chunk uploads even for files smaller than chunk size
chunkSize: [5000000, Type.INT], // Size of chunks (5MB default)
chunkRetryDelays: [[500, 1000, 3000], Type.ARRAY], // Amount of times to retry upload of a chunk when it fails
chunkParallelize: [false, Type.BOOLEAN], // Enable uploads of chunks in parallel

// The server api end points to use for uploading (see docs)
server: [null, Type.SERVER_API],
@@ -2768,7 +2769,9 @@ const processFileChunked = (
) => {
// all chunks
const chunks = [];
const { chunkTransferId, chunkServer, chunkSize, chunkRetryDelays } = options;
const { chunkTransferId, chunkServer, chunkSize, chunkRetryDelays, chunkParallelize } = options;

// default state
const state = {
@@ -2873,6 +2876,7 @@
timeout: null,
};
}
    // in parallel mode, hold back the final chunk; it is sent only after all others complete
    const lastChunk = chunkParallelize ? chunks.pop() : null;

const completeProcessingChunks = () => load(state.serverId);

@@ -2908,17 +2912,91 @@
// send request object
const requestUrl = buildURL(apiUrl, chunkServer.url, state.serverId);

if (!chunkParallelize) {
const headers =
typeof chunkServer.headers === 'function'
? chunkServer.headers(chunk)
: {
...chunkServer.headers,
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': chunk.offset,
'Upload-Length': file.size,
'Upload-Name': file.name,
};

const request = (chunk.request = sendRequest(ondata(chunk.data), requestUrl, {
...chunkServer,
headers,
}));

request.onload = () => {
// done!
chunk.status = ChunkStatus.COMPLETE;

// remove request reference
chunk.request = null;

// start processing more chunks
processChunks();
};

request.onprogress = (lengthComputable, loaded, total) => {
chunk.progress = lengthComputable ? loaded : null;
updateTotalProgress();
};

request.onerror = xhr => {
chunk.status = ChunkStatus.ERROR;
chunk.request = null;
chunk.error = onerror(xhr.response) || xhr.statusText;
if (!retryProcessChunk(chunk)) {
error(
createResponse(
'error',
xhr.status,
onerror(xhr.response) || xhr.statusText,
xhr.getAllResponseHeaders()
)
);
}
};

request.ontimeout = xhr => {
chunk.status = ChunkStatus.ERROR;
chunk.request = null;
if (!retryProcessChunk(chunk)) {
createTimeoutResponse(error)(xhr);
}
};

request.onabort = () => {
chunk.status = ChunkStatus.QUEUED;
chunk.request = null;
abort();
};
} else {
for (const chunk of chunks) {
processChunkRequest(chunk);
}
}
};

const processChunkRequest = (chunk, isLastChunk = false) => {
const headers =
typeof chunkServer.headers === 'function'
? chunkServer.headers(chunk)
: {
...chunkServer.headers,
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': chunk.offset,
'Upload-Length': file.size,
                  'Upload-Index': chunk.index,
                  // chunks.length excludes the held-back last chunk, so +1 restores the true total
                  'Upload-Chunks-Number': chunks.length + 1,
                  'Upload-Name': file.name,
};

// send request object
const requestUrl = buildURL(apiUrl, chunkServer.url, state.serverId);
const request = (chunk.request = sendRequest(ondata(chunk.data), requestUrl, {
...chunkServer,
headers,
@@ -2927,12 +3005,20 @@ const processFileChunked = (
request.onload = () => {
// done!
chunk.status = ChunkStatus.COMPLETE;

// remove request reference
chunk.request = null;

// start processing more chunks
processChunks();
        // once every parallel chunk has completed, send the held-back last chunk
        if (
            chunks.length === chunks.filter(c => c.status === ChunkStatus.COMPLETE).length &&
            !isLastChunk
        ) {
            processChunkRequest(lastChunk, true);
        }
if (isLastChunk) {
completeProcessingChunks();
}
};

request.onprogress = (lengthComputable, loaded, total) => {
@@ -4831,6 +4917,7 @@ const actions = (dispatch, query, state) => ({
chunkForce: options.chunkForce,
chunkSize: options.chunkSize,
chunkRetryDelays: options.chunkRetryDelays,
chunkParallelize: options.chunkParallelize,
}),
{
allowMinimumUploadDuration: query('GET_ALLOW_MINIMUM_UPLOAD_DURATION'),
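How the parallel path above behaves: every chunk except the held-back last one is dispatched immediately, each carrying Upload-Index and Upload-Chunks-Number headers, so chunks may reach the server in any order. When the number of COMPLETE chunks equals chunks.length, the last chunk is sent with isLastChunk = true, and its onload calls completeProcessingChunks(). The sketch below shows one hypothetical way a server could consume the new headers; the Express route, in-memory storage, and completion logic are illustrative assumptions, not part of FilePond's documented server contract.

    const express = require('express');
    const app = express();

    // Hypothetical in-memory store; a production server would persist parts to disk.
    const transfers = new Map();

    // FilePond sends each chunk as a PATCH with a raw octet-stream body.
    app.patch(
        '/api/:transferId',
        express.raw({ type: 'application/offset+octet-stream', limit: '100mb' }),
        (req, res) => {
            const index = Number(req.headers['upload-index']);
            const total = Number(req.headers['upload-chunks-number']);

            const parts = transfers.get(req.params.transferId) || [];
            parts[index] = req.body; // body is a Buffer; parts may arrive out of order
            transfers.set(req.params.transferId, parts);

            // This PR only sends the final chunk after all others complete,
            // so receiving the highest index means the whole file is buffered.
            if (index === total - 1) {
                const file = Buffer.concat(parts);
                // ...write `file` somewhere, e.g. under req.headers['upload-name']...
                transfers.delete(req.params.transferId);
            }
            res.sendStatus(204);
        }
    );

    app.listen(3000);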
109 changes: 103 additions & 6 deletions dist/filepond.js
@@ -3790,6 +3790,7 @@
chunkForce: [false, Type.BOOLEAN], // Force use of chunk uploads even for files smaller than chunk size
chunkSize: [5000000, Type.INT], // Size of chunks (5MB default)
chunkRetryDelays: [[500, 1000, 3000], Type.ARRAY], // Amount of times to retry upload of a chunk when it fails
chunkParallelize: [false, Type.BOOLEAN], // Enable uploads of chunks in parallel

// The server api end points to use for uploading (see docs)
server: [null, Type.SERVER_API],
@@ -4840,10 +4841,13 @@
) {
// all chunks
var chunks = [];
var chunkTransferId = options.chunkTransferId,
chunkServer = options.chunkServer,
chunkSize = options.chunkSize,
chunkRetryDelays = options.chunkRetryDelays;
chunkRetryDelays = options.chunkRetryDelays,
chunkParallelize = options.chunkParallelize;

// default state
var state = {
@@ -4963,6 +4967,7 @@
timeout: null,
};
}
        // in parallel mode, hold back the final chunk; it is sent only after all others complete
        var lastChunk = chunkParallelize ? chunks.pop() : null;

var completeProcessingChunks = function completeProcessingChunks() {
return load(state.serverId);
@@ -5013,16 +5018,96 @@
// send request object
var requestUrl = buildURL(apiUrl, chunkServer.url, state.serverId);

if (!chunkParallelize) {
var headers =
typeof chunkServer.headers === 'function'
? chunkServer.headers(chunk)
: Object.assign({}, chunkServer.headers, {
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': chunk.offset,
'Upload-Length': file.size,
'Upload-Name': file.name,
});

var request = (chunk.request = sendRequest(
ondata(chunk.data),
requestUrl,
Object.assign({}, chunkServer, {
headers: headers,
})
));

request.onload = function() {
// done!
chunk.status = ChunkStatus.COMPLETE;

// remove request reference
chunk.request = null;

// start processing more chunks
processChunks();
};

request.onprogress = function(lengthComputable, loaded, total) {
chunk.progress = lengthComputable ? loaded : null;
updateTotalProgress();
};

request.onerror = function(xhr) {
chunk.status = ChunkStatus.ERROR;
chunk.request = null;
chunk.error = onerror(xhr.response) || xhr.statusText;
if (!retryProcessChunk(chunk)) {
error(
createResponse(
'error',
xhr.status,
onerror(xhr.response) || xhr.statusText,
xhr.getAllResponseHeaders()
)
);
}
};

request.ontimeout = function(xhr) {
chunk.status = ChunkStatus.ERROR;
chunk.request = null;
if (!retryProcessChunk(chunk)) {
createTimeoutResponse(error)(xhr);
}
};

request.onabort = function() {
chunk.status = ChunkStatus.QUEUED;
chunk.request = null;
abort();
};
} else {
for (var _i = 0, _chunks = chunks; _i < _chunks.length; _i++) {
var _chunk = _chunks[_i];
processChunkRequest(_chunk);
}
}
};

var processChunkRequest = function processChunkRequest(chunk) {
var isLastChunk =
arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : false;

var headers =
typeof chunkServer.headers === 'function'
? chunkServer.headers(chunk)
: Object.assign({}, chunkServer.headers, {
'Content-Type': 'application/offset+octet-stream',
'Upload-Offset': chunk.offset,
'Upload-Length': file.size,
                      'Upload-Index': chunk.index,
                      // chunks.length excludes the held-back last chunk, so +1 restores the true total
                      'Upload-Chunks-Number': chunks.length + 1,
                      'Upload-Name': file.name,
});

// send request object
var requestUrl = buildURL(apiUrl, chunkServer.url, state.serverId);
var request = (chunk.request = sendRequest(
ondata(chunk.data),
requestUrl,
@@ -5034,12 +5119,23 @@
request.onload = function() {
// done!
chunk.status = ChunkStatus.COMPLETE;

// remove request reference
chunk.request = null;

// start processing more chunks
processChunks();
            // once every parallel chunk has completed, send the held-back last chunk
            if (
                chunks.length ===
                    chunks.filter(function(c) {
                        return c.status === ChunkStatus.COMPLETE;
                    }).length &&
                !isLastChunk
            ) {
                processChunkRequest(lastChunk, true);
            }
if (isLastChunk) {
completeProcessingChunks();
}
};

request.onprogress = function(lengthComputable, loaded, total) {
@@ -7223,6 +7319,7 @@
chunkForce: options.chunkForce,
chunkSize: options.chunkSize,
chunkRetryDelays: options.chunkRetryDelays,
chunkParallelize: options.chunkParallelize,
}
),

1 change: 1 addition & 0 deletions src/js/app/actions.js
@@ -852,6 +852,7 @@ export const actions = (dispatch, query, state) => ({
chunkForce: options.chunkForce,
chunkSize: options.chunkSize,
chunkRetryDelays: options.chunkRetryDelays,
chunkParallelize: options.chunkParallelize,
}),
{
allowMinimumUploadDuration: query('GET_ALLOW_MINIMUM_UPLOAD_DURATION'),
1 change: 1 addition & 0 deletions src/js/app/options.js
@@ -95,6 +95,7 @@ export const defaultOptions = {
chunkForce: [false, Type.BOOLEAN], // Force use of chunk uploads even for files smaller than chunk size
chunkSize: [5000000, Type.INT], // Size of chunks (5MB default)
chunkRetryDelays: [[500, 1000, 3000], Type.ARRAY], // Amount of times to retry upload of a chunk when it fails
chunkParallelize: [false, Type.BOOLEAN], // Enable uploads of chunks in parallel

// The server api end points to use for uploading (see docs)
server: [null, Type.SERVER_API],
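Because chunkParallelize is registered alongside the other chunk options, it should also be settable at runtime through FilePond's existing setOptions API; a one-line sketch, again assuming this PR is applied:

    FilePond.setOptions({ chunkParallelize: true });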