-
Notifications
You must be signed in to change notification settings - Fork 4
/
index.js
66 lines (54 loc) · 1.43 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
var entityStream = require('seneca-entity-save-stream')
, actStream = require('seneca-act-stream')
, csvStream = require('binary-csv')
, through = require('through2')
, xtend = require('xtend')
, Joi = require('joi')
/**
 * Wires a CSV parser to `dest` and returns the parser stream.
 *
 * The parser is created by binary-csv with `json: true`. Its output is piped
 * through an object-mode transform that drops the first `opts.skip` rows and,
 * when `opts.schema` is set, validates every remaining row with Joi (with
 * `convert` and `stripUnknown` enabled) and forwards the validated value.
 * The transform is then piped into `dest`.
 *
 * Events emitted on the returned stream:
 *   - 'rowImported'            each time `dest` emits 'one'; the numeric
 *                              `rowsImported` property is incremented first
 *   - 'rowError' (err)         each time `dest` emits 'oneError'
 *   - 'rowInvalid' (err, row)  when a row fails schema validation; the row is
 *                              not forwarded to `dest`
 *   - 'importCompleted'        when `dest` emits 'finish'
 *
 * @param {Object} dest - destination stream; expected to emit 'one',
 *   'oneError' and 'finish'
 * @param {Object} [opts]
 * @param {number} [opts.skip=0] - number of leading parsed rows to discard
 * @param {Object} [opts.schema] - Joi schema; when omitted rows pass through
 *   unvalidated
 * @returns {Object} the CSV parser stream, with `rowsImported` initialised to 0
 */
function csvToObj(dest, opts) {
  // Work on a merged copy so the caller's options object is never touched.
  var settings = xtend({ skip: 0 }, opts)
  var seen = 0
  var csv = csvStream({
    highWaterMark: 16,
    json: true
  })

  var rows = through.obj({ highWaterMark: 16 }, function (row, enc, callback) {
    var self = this

    if (seen++ < settings.skip) {
      return callback()
    }

    if (!settings.schema) {
      self.push(row)
      return callback()
    }

    Joi.validate(row, settings.schema, {
      convert: true,
      stripUnknown: true
    }, function (err, value) {
      if (err) {
        // Previously the row vanished without a trace; report it instead.
        // A dedicated event name keeps existing 'rowError' listeners
        // unaffected, and a non-'error' event never throws when unlistened.
        csv.emit('rowInvalid', err, row)
      } else {
        self.push(value)
      }
      callback()
    })
  })

  csv.rowsImported = 0

  csv.pipe(rows).pipe(dest)

  dest.on('finish', function () {
    csv.emit('importCompleted')
  })

  dest.on('one', function () {
    csv.rowsImported++
    csv.emit('rowImported')
  })

  dest.on('oneError', function (err) {
    csv.emit('rowError', err)
  })

  return csv
}
/**
 * Creates a CSV importer whose rows are piped into a stream built by
 * seneca-entity-save-stream, configured with `name$` set to `entity`.
 *
 * @param {Object} seneca - seneca instance handed to the destination stream
 * @param {string} entity - value used as the destination's `name$` option
 * @param {Object} [opts] - forwarded unchanged to csvToObj
 * @returns {Object} whatever csvToObj returns for that destination
 */
module.exports.entity = function importEntity(seneca, entity, opts) {
  return csvToObj(entityStream(seneca, { name$: entity }), opts)
}
/**
 * Creates a CSV importer whose rows are piped into a stream built by
 * seneca-act-stream for the given pattern.
 *
 * @param {Object} seneca - seneca instance handed to the destination stream
 * @param {*} pattern - passed straight through to seneca-act-stream
 * @param {Object} [opts] - forwarded unchanged to csvToObj
 * @returns {Object} whatever csvToObj returns for that destination
 */
module.exports.act = function importAct(seneca, pattern, opts) {
  return csvToObj(actStream(seneca, pattern), opts)
}