-
-
Notifications
You must be signed in to change notification settings - Fork 77
/
stream.js
72 lines (49 loc) · 1.85 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
'use strict';
var ProbeError = require('./lib/common').ProbeError;
var parsers = require('./lib/parsers_stream');
var PassThrough = require('stream').PassThrough;
var pipeline = require('stream').pipeline;
module.exports = function probeStream(src, keepOpen) {
var proxy = new PassThrough();
// increase max number of listeners to stop memory leak warning
proxy.setMaxListeners(Object.keys(parsers).length + 10);
var result = new Promise(function (resolve, reject) {
src.on('error', reject);
proxy.on('error', reject);
var alive_parsers = [];
var last_error;
function parserEnd(err) {
var idx = alive_parsers.indexOf[this];
/* istanbul ignore if */
if (idx < 0) return;
/* istanbul ignore if */
if (err) last_error = err;
proxy.unpipe(this);
this.removeAllListeners();
alive_parsers.splice(idx, 1);
if (alive_parsers.length) return;
// if all parsers finished without success -> fail.
reject(last_error || new ProbeError('unrecognized file format', 'ECONTENT'));
}
Object.keys(parsers).forEach(function (type) {
var pStream = parsers[type]();
alive_parsers.push(pStream);
pStream.once('data', resolve);
pStream.once('end', parserEnd);
// User does not need to know that something wrong in parser
// Process error the same was unrecognized format (end without data)
pStream.on('error', parserEnd);
proxy.pipe(pStream);
});
});
function cleanup() {
// request stream doesn't have unpipe, https://github.com/request/request/issues/874
if (keepOpen && typeof src.unpipe === 'function') src.unpipe(proxy);
proxy.destroy();
}
result.then(cleanup).catch(cleanup);
if (keepOpen) src.pipe(proxy);
else pipeline(src, proxy, function () {});
return result;
};
// Expose the stream parser table as a property of the exported function.
module.exports.parsers = parsers;