-
Notifications
You must be signed in to change notification settings - Fork 2
/
bucket-stream.coffee
43 lines (41 loc) · 1.07 KB
/
bucket-stream.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
{Stream} = require 'stream'
# Bucket stream: collects every written event into an array and passes the
# whole array along as a single 'data' event when the stream is ended.
#
# Holding everything in memory would obviously be silly for a huge number of
# events, and it defeats the purpose of streaming, but it is handy in demos
# because it simplifies piping.
#
# buffer - optional Array used as the initial contents of the bucket
#          (a fresh empty Array is created on every call when omitted).
#
# Returns a Stream instance flagged readable and writable, with hand-written
# write / end / destroy / pause / resume methods.
module.exports = (buffer = []) ->
  stream = new Stream()
  stream.writable = true
  stream.readable = true
  paused = false
  ended = false

  # Store one event in the bucket.
  # Returns true while the stream is open. Once the stream has been ended or
  # destroyed the event is dropped and false is returned, because nothing
  # would ever emit it.
  stream.write = (event) ->
    return false if ended
    #apply to Aggregate here
    buffer.push event
    true

  # Emit everything collected so far as ONE 'data' event and start a new,
  # empty bucket.
  flush = ->
    collected = buffer
    buffer = []
    stream.emit 'data', collected

  # Mark the stream unreadable one tick after 'end', so listeners running
  # during the 'end' emit still see readable == true.
  stream.on 'end', ->
    process.nextTick ->
      stream.readable = false

  # Finish the stream: flush the bucket, then emit 'end' and 'close'.
  # Idempotent; later calls (or a call after destroy) do nothing.
  stream.end = ->
    return if ended
    stream.writable = false
    ended = true
    flush()
    stream.emit 'end'
    stream.emit 'close'

  # Abort the stream WITHOUT emitting the collected data.
  # Emits 'end' and 'close' exactly once; it is a no-op when the stream has
  # already been ended or destroyed, so listeners never see duplicates.
  stream.destroy = ->
    return if ended
    ended = true
    stream.writable = false
    buffer = []
    stream.emit 'end'
    stream.emit 'close'

  # Record the paused state. Nothing reads this flag: data is only ever
  # emitted once, from end().
  stream.pause = ->
    paused = true

  # Clear the paused state and finish the stream if it is still readable.
  # NOTE(review): calling resume() before the writer is done ends the stream
  # early; kept as-is because callers may rely on it - confirm intent.
  stream.resume = ->
    paused = false
    stream.end() if stream.readable

  stream