From 95d2d6f5783c86c8a608e37ea9c942693ca894c6 Mon Sep 17 00:00:00 2001 From: Kurt Thiemann Date: Fri, 16 Jun 2023 17:57:27 +0200 Subject: [PATCH] always wait for active read operation to finish before resetting data processors --- package.json | 2 +- src/Archive/Entry/EntryDataReader.js | 4 +- src/DataProcessor/AbstractDataProcessor.js | 35 +++----- src/DataProcessor/DataProcessor.js | 12 +-- src/DataProcessor/DataProcessorChunkReader.js | 79 +++++++++++++++++++ src/DataProcessor/FallbackDataProcessor.js | 4 +- src/DataProcessor/FflateDataProcessor.js | 8 +- .../NativeStreamDataProcessor.js | 9 ++- src/DataProcessor/PassThroughDataProcessor.js | 8 +- 9 files changed, 112 insertions(+), 49 deletions(-) create mode 100644 src/DataProcessor/DataProcessorChunkReader.js diff --git a/package.json b/package.json index 75828dc..8014f6c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "armarius", - "version": "1.10.4", + "version": "1.11.0", "description": "A JavaScript library to read, write, and merge ZIP archives in web browsers.", "repository": "github:aternosorg/armarius", "type": "module", diff --git a/src/Archive/Entry/EntryDataReader.js b/src/Archive/Entry/EntryDataReader.js index ba22128..c4442b4 100644 --- a/src/Archive/Entry/EntryDataReader.js +++ b/src/Archive/Entry/EntryDataReader.js @@ -16,8 +16,8 @@ export default class EntryDataReader { this.expectedCrc32 = expectedCrc32; } - reset() { - this.dataProcessor.reset(); + async reset() { + await this.dataProcessor.reset(); this.eof = false; } diff --git a/src/DataProcessor/AbstractDataProcessor.js b/src/DataProcessor/AbstractDataProcessor.js index 328dece..78a154f 100644 --- a/src/DataProcessor/AbstractDataProcessor.js +++ b/src/DataProcessor/AbstractDataProcessor.js @@ -1,14 +1,15 @@ import CRC32 from '../Util/CRC32.js'; import DataProcessor from './DataProcessor.js'; +import DataProcessorChunkReader from './DataProcessorChunkReader.js'; /** * @implements DataProcessor */ export default class AbstractDataProcessor extends DataProcessor { /** @type {DataReader} */ reader; - /** @type {boolean} */ eof = false; - /** @type {?CRC32} */ preCrc = null; + /** @type {DataProcessorChunkReader} */ chunkReader; /** @type {?CRC32} */ postCrc = null; + /** @type {boolean} */ createPreCrc = false; /** * @inheritDoc @@ -25,7 +26,8 @@ export default class AbstractDataProcessor extends DataProcessor { constructor(reader, createPreCrc = false, createPostCrc = false) { super(); this.reader = reader; - this.preCrc = createPreCrc ? new CRC32() : null; + this.createPreCrc = createPreCrc; + this.chunkReader = new DataProcessorChunkReader(this.reader, this.createPreCrc); this.postCrc = createPostCrc ? new CRC32() : null; } @@ -33,7 +35,7 @@ export default class AbstractDataProcessor extends DataProcessor { * @inheritDoc */ getPreCrc() { - return this.preCrc; + return this.chunkReader.getCrc(); } /** @@ -43,22 +45,6 @@ export default class AbstractDataProcessor extends DataProcessor { return this.postCrc; } - /** - * @inheritDoc - */ - async getChunkFromReader(length) { - if (this.reader.offset + length > this.reader.byteLength) { - this.eof = true; - length = this.reader.byteLength - this.reader.offset; - } - let chunk = await this.reader.read(length); - if(this.preCrc) { - this.preCrc.add(chunk); - } - - return chunk; - } - async process() { } @@ -82,9 +68,12 @@ export default class AbstractDataProcessor extends DataProcessor { return chunk; } - reset() { - this.eof = false; - this.preCrc?.reset(); + /** + * @inheritDoc + */ + async reset() { + await this.chunkReader.close(); + this.chunkReader = new DataProcessorChunkReader(this.reader, this.createPreCrc); this.postCrc?.reset(); this.reader.seek(0); } diff --git a/src/DataProcessor/DataProcessor.js b/src/DataProcessor/DataProcessor.js index 317c3ef..a97d339 100644 --- a/src/DataProcessor/DataProcessor.js +++ b/src/DataProcessor/DataProcessor.js @@ -22,13 +22,6 @@ export default class DataProcessor { getPostCrc() { } - /** - * @param {number} length - * @return {Promise} - */ - async getChunkFromReader(length) { - } - /** * @param {number} length * @return {Promise} @@ -45,7 +38,10 @@ export default class DataProcessor { async read(length) { } - reset() { + /** + * @return {Promise} + */ + async reset() { } } diff --git a/src/DataProcessor/DataProcessorChunkReader.js b/src/DataProcessor/DataProcessorChunkReader.js new file mode 100644 index 0000000..36e0071 --- /dev/null +++ b/src/DataProcessor/DataProcessorChunkReader.js @@ -0,0 +1,79 @@ +import CRC32 from '../Util/CRC32.js'; + +export default class DataProcessorChunkReader { + /** @type {DataReader} */ reader; + /** @type {boolean} */ eof = false; + /** @type {boolean} */ closed = false; + /** @type {boolean} */ reading = false; + /** @type {Array} */ closePromises = []; + /** @type {?CRC32} */ crc = null; + + /** + * @param {DataReader} reader + * @param {boolean} createCrc + */ + constructor(reader, createCrc = false) { + this.reader = reader; + this.crc = createCrc ? new CRC32() : null; + } + + /** + * @param {number} length + * @return {Promise} + */ + async getChunk(length) { + if (this.closed) { + return new Uint8Array(0); + } + if (this.reading) { + throw new Error('Simultaneous read not supported'); + } + this.reading = true; + if (this.reader.offset + length > this.reader.byteLength) { + this.eof = true; + length = this.reader.byteLength - this.reader.offset; + } + let chunk = await this.reader.read(length); + + if(this.crc) { + this.crc.add(chunk); + } + + this.reading = false; + + if (this.closePromises.length > 0) { + this.closePromises.forEach(resolve => resolve()); + this.closePromises = []; + } + + return chunk; + } + + /** + * @return {boolean} + */ + isEof() { + return this.eof; + } + + /** + * @return {?CRC32} + */ + getCrc() { + return this.crc; + } + + /** + * @return {Promise} + */ + close() { + this.closed = true; + return new Promise(resolve => { + if (this.reading) { + this.closePromises.push(resolve); + } else { + resolve(); + } + }); + } +} diff --git a/src/DataProcessor/FallbackDataProcessor.js b/src/DataProcessor/FallbackDataProcessor.js index 25f9819..b768203 100644 --- a/src/DataProcessor/FallbackDataProcessor.js +++ b/src/DataProcessor/FallbackDataProcessor.js @@ -68,7 +68,7 @@ export default class FallbackDataProcessor extends DataProcessor { /** * @inheritDoc */ - reset() { - this.dataProcessor.reset(); + async reset() { + await this.dataProcessor.reset(); } } diff --git a/src/DataProcessor/FflateDataProcessor.js b/src/DataProcessor/FflateDataProcessor.js index 3a9d931..7a3bc83 100644 --- a/src/DataProcessor/FflateDataProcessor.js +++ b/src/DataProcessor/FflateDataProcessor.js @@ -19,11 +19,11 @@ export default class FflateDataProcessor extends AbstractDataProcessor { * @inheritDoc */ async generate(length) { - if(this.eof) { + if(this.chunkReader.isEof()) { return null; } - this.flate.push(await this.getChunkFromReader(length), this.eof); + this.flate.push(await this.chunkReader.getChunk(length), this.chunkReader.isEof()); return this.concatChunks(); } @@ -72,8 +72,8 @@ export default class FflateDataProcessor extends AbstractDataProcessor { /** * @inheritDoc */ - reset() { - super.reset(); + async reset() { + await super.reset(); this.chunks = []; this.initFflate(); } diff --git a/src/DataProcessor/NativeStreamDataProcessor.js b/src/DataProcessor/NativeStreamDataProcessor.js index f133ea6..83b96f0 100644 --- a/src/DataProcessor/NativeStreamDataProcessor.js +++ b/src/DataProcessor/NativeStreamDataProcessor.js @@ -53,12 +53,13 @@ export default class NativeStreamDataProcessor extends AbstractDataProcessor { * @inheritDoc */ resetStreams() { + let chunkReader = this.chunkReader; this.processor = this.createProcessorStream(); this.stream = new ReadableStream({ pull: async (controller) => { - let data = await this.getChunkFromReader(Constants.DEFAULT_CHUNK_SIZE); + let data = await chunkReader.getChunk(Constants.DEFAULT_CHUNK_SIZE); controller.enqueue(data); - if(this.eof) { + if(this.chunkReader.isEof()) { controller.close(); } } @@ -69,8 +70,8 @@ export default class NativeStreamDataProcessor extends AbstractDataProcessor { /** * @inheritDoc */ - reset() { - super.reset(); + async reset() { + await super.reset(); this.processor = null; this.stream = null; this.streamReader = null; diff --git a/src/DataProcessor/PassThroughDataProcessor.js b/src/DataProcessor/PassThroughDataProcessor.js index 84f6127..6ebfb90 100644 --- a/src/DataProcessor/PassThroughDataProcessor.js +++ b/src/DataProcessor/PassThroughDataProcessor.js @@ -1,24 +1,22 @@ import AbstractDataProcessor from './AbstractDataProcessor.js'; export default class PassThroughDataProcessor extends AbstractDataProcessor { - /** @type {boolean} */ eof = false; - /** * @inheritDoc */ async reset() { - super.reset(); + await super.reset(); } /** * @inheritDoc */ async generate(length) { - if(this.eof) { + if(this.chunkReader.isEof()) { return null; } - return await this.getChunkFromReader(length); + return await this.chunkReader.getChunk(length); } }