Skip to content

Commit

Permalink
always wait for active read operation to finish before resetting data…
Browse files Browse the repository at this point in the history
… processors
  • Loading branch information
KurtThiemann committed Jun 16, 2023
1 parent fa04bd1 commit 95d2d6f
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 49 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "armarius",
"version": "1.10.4",
"version": "1.11.0",
"description": "A JavaScript library to read, write, and merge ZIP archives in web browsers.",
"repository": "github:aternosorg/armarius",
"type": "module",
Expand Down
4 changes: 2 additions & 2 deletions src/Archive/Entry/EntryDataReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ export default class EntryDataReader {
this.expectedCrc32 = expectedCrc32;
}

reset() {
this.dataProcessor.reset();
async reset() {
await this.dataProcessor.reset();
this.eof = false;
}

Expand Down
35 changes: 12 additions & 23 deletions src/DataProcessor/AbstractDataProcessor.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import CRC32 from '../Util/CRC32.js';
import DataProcessor from './DataProcessor.js';
import DataProcessorChunkReader from './DataProcessorChunkReader.js';

/**
* @implements DataProcessor
*/
export default class AbstractDataProcessor extends DataProcessor {
/** @type {DataReader} */ reader;
/** @type {boolean} */ eof = false;
/** @type {?CRC32} */ preCrc = null;
/** @type {DataProcessorChunkReader} */ chunkReader;
/** @type {?CRC32} */ postCrc = null;
/** @type {boolean} */ createPreCrc = false;

/**
* @inheritDoc
Expand All @@ -25,15 +26,16 @@ export default class AbstractDataProcessor extends DataProcessor {
constructor(reader, createPreCrc = false, createPostCrc = false) {
super();
this.reader = reader;
this.preCrc = createPreCrc ? new CRC32() : null;
this.createPreCrc = createPreCrc;
this.chunkReader = new DataProcessorChunkReader(this.reader, this.createPreCrc);
this.postCrc = createPostCrc ? new CRC32() : null;
}

/**
* @inheritDoc
*/
getPreCrc() {
return this.preCrc;
return this.chunkReader.getCrc();
}

/**
Expand All @@ -43,22 +45,6 @@ export default class AbstractDataProcessor extends DataProcessor {
return this.postCrc;
}

/**
* @inheritDoc
*/
async getChunkFromReader(length) {
if (this.reader.offset + length > this.reader.byteLength) {
this.eof = true;
length = this.reader.byteLength - this.reader.offset;
}
let chunk = await this.reader.read(length);
if(this.preCrc) {
this.preCrc.add(chunk);
}

return chunk;
}

async process() {

}
Expand All @@ -82,9 +68,12 @@ export default class AbstractDataProcessor extends DataProcessor {
return chunk;
}

reset() {
this.eof = false;
this.preCrc?.reset();
/**
* @inheritDoc
*/
async reset() {
await this.chunkReader.close();
this.chunkReader = new DataProcessorChunkReader(this.reader, this.createPreCrc);
this.postCrc?.reset();
this.reader.seek(0);
}
Expand Down
12 changes: 4 additions & 8 deletions src/DataProcessor/DataProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ export default class DataProcessor {
getPostCrc() {
}

/**
* @param {number} length
* @return {Promise<Uint8Array>}
*/
async getChunkFromReader(length) {
}

/**
* @param {number} length
* @return {Promise<?Uint8Array>}
Expand All @@ -45,7 +38,10 @@ export default class DataProcessor {
async read(length) {
}

reset() {
/**
* @return {Promise<void>}
*/
async reset() {
}
}

79 changes: 79 additions & 0 deletions src/DataProcessor/DataProcessorChunkReader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import CRC32 from '../Util/CRC32.js';

export default class DataProcessorChunkReader {
/** @type {DataReader} */ reader;
/** @type {boolean} */ eof = false;
/** @type {boolean} */ closed = false;
/** @type {boolean} */ reading = false;
/** @type {Array} */ closePromises = [];
/** @type {?CRC32} */ crc = null;

/**
* @param {DataReader} reader
* @param {boolean} createCrc
*/
constructor(reader, createCrc = false) {
this.reader = reader;
this.crc = createCrc ? new CRC32() : null;
}

/**
* @param {number} length
* @return {Promise<Uint8Array>}
*/
async getChunk(length) {
if (this.closed) {
return new Uint8Array(0);
}
if (this.reading) {
throw new Error('Simultaneous read not supported');
}
this.reading = true;
if (this.reader.offset + length > this.reader.byteLength) {
this.eof = true;
length = this.reader.byteLength - this.reader.offset;
}
let chunk = await this.reader.read(length);

if(this.crc) {
this.crc.add(chunk);
}

this.reading = false;

if (this.closePromises.length > 0) {
this.closePromises.forEach(resolve => resolve());
this.closePromises = [];
}

return chunk;
}

/**
* @return {boolean}
*/
isEof() {
return this.eof;
}

/**
* @return {?CRC32}
*/
getCrc() {
return this.crc;
}

/**
* @return {Promise<void>}
*/
close() {
this.closed = true;
return new Promise(resolve => {
if (this.reading) {
this.closePromises.push(resolve);
} else {
resolve();
}
});
}
}
4 changes: 2 additions & 2 deletions src/DataProcessor/FallbackDataProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export default class FallbackDataProcessor extends DataProcessor {
/**
* @inheritDoc
*/
reset() {
this.dataProcessor.reset();
async reset() {
await this.dataProcessor.reset();
}
}
8 changes: 4 additions & 4 deletions src/DataProcessor/FflateDataProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ export default class FflateDataProcessor extends AbstractDataProcessor {
* @inheritDoc
*/
async generate(length) {
if(this.eof) {
if(this.chunkReader.isEof()) {
return null;
}

this.flate.push(await this.getChunkFromReader(length), this.eof);
this.flate.push(await this.chunkReader.getChunk(length), this.chunkReader.isEof());
return this.concatChunks();
}

Expand Down Expand Up @@ -72,8 +72,8 @@ export default class FflateDataProcessor extends AbstractDataProcessor {
/**
* @inheritDoc
*/
reset() {
super.reset();
async reset() {
await super.reset();
this.chunks = [];
this.initFflate();
}
Expand Down
9 changes: 5 additions & 4 deletions src/DataProcessor/NativeStreamDataProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ export default class NativeStreamDataProcessor extends AbstractDataProcessor {
* @inheritDoc
*/
resetStreams() {
let chunkReader = this.chunkReader;
this.processor = this.createProcessorStream();
this.stream = new ReadableStream({
pull: async (controller) => {
let data = await this.getChunkFromReader(Constants.DEFAULT_CHUNK_SIZE);
let data = await chunkReader.getChunk(Constants.DEFAULT_CHUNK_SIZE);
controller.enqueue(data);
if(this.eof) {
if(this.chunkReader.isEof()) {
controller.close();
}
}
Expand All @@ -69,8 +70,8 @@ export default class NativeStreamDataProcessor extends AbstractDataProcessor {
/**
* @inheritDoc
*/
reset() {
super.reset();
async reset() {
await super.reset();
this.processor = null;
this.stream = null;
this.streamReader = null;
Expand Down
8 changes: 3 additions & 5 deletions src/DataProcessor/PassThroughDataProcessor.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import AbstractDataProcessor from './AbstractDataProcessor.js';

export default class PassThroughDataProcessor extends AbstractDataProcessor {
/** @type {boolean} */ eof = false;

/**
* @inheritDoc
*/
async reset() {
super.reset();
await super.reset();
}

/**
* @inheritDoc
*/
async generate(length) {
if(this.eof) {
if(this.chunkReader.isEof()) {
return null;
}

return await this.getChunkFromReader(length);
return await this.chunkReader.getChunk(length);
}
}

0 comments on commit 95d2d6f

Please sign in to comment.