-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
111 lines (100 loc) · 3.5 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*jslint esversion:6 */
'use strict';
const requestResponse = require('trooba-request-response');
const plugin = require('trooba-plugin');
const TroobaDuplexStream = require('trooba-streaming').TroobaDuplexStream;
// Key under which decorate() registers its handler runtime in pipe.runtimes.
const RUNTIME = 'async';
/**
 * Decorates a trooba pipe with a promise-based request/response API.
 *
 * Installs three things on the pipe:
 *  - the request-response protocol (delegated to requestResponse.decorate);
 *  - a handler runtime, registered as pipe.runtimes[RUNTIME], that invokes
 *    handlers as fn(context, next), where next() returns a promise;
 *  - a `request` decorator (overriding the existing one) and a `response`
 *    decorator.
 *
 * @param {Object} pipe - pipe point to decorate; must expose `runtimes`
 *   and `decorate(name, factory, override)`.
 */
function decorate(pipe) {
    // asyncawait extends request-response protocol
    requestResponse.decorate(pipe);

    /**
     * Runtime for handlers selected via the RUNTIME annotation.
     * Invoked with `this` bound to the pipe point the handler is attached to.
     *
     * @param {Function} fn - handler called as fn(context, next) for every
     *   'request'; it may return a promise or finish synchronously.
     */
    pipe.runtimes[RUNTIME] = function asyncawaitRuntime(fn) {
        const point = this;

        point.on('request', function onRequest(request, next) {
            const context = point.context;
            // Remember whether a response was already present before the
            // handler ran; the synchronous branch below responds only when
            // there was none.
            const origResponse = context.response;
            // First use continues the original flow through the pipeline's
            // own `next`; afterwards it is swapped for a retry.
            let proceed = next;

            context.request = request;

            // Builds an Error carrying `status` (plus any extra props) and
            // reports it through the pipe. It does not raise a JS exception,
            // so the handler keeps running after calling it.
            context.throw = function (status, message, props) {
                const err = new Error(message);
                err.status = status;
                if (props) {
                    Object.assign(err, props);
                }
                point.throw(err);
            };

            // bidirectional stream
            // NOTE(review): an unfinished `context.reader =` assignment used
            // to follow here and broke parsing; its intended value is unknown,
            // so only `context.stream` is exposed.
            context.stream = new TroobaDuplexStream(point);

            // The `next` handed to the handler. It serves a dual purpose:
            // the first call continues the original flow, any later call
            // retries the request. Each call returns a promise that resolves
            // on the next 'response' and rejects on the next 'error'.
            const callback = function () {
                // Drop hooks left over from a previous call before
                // registering fresh ones.
                point.removeListener('error');
                point.removeListener('response');

                return new Promise(function (resolve, reject) {
                    point.once('response', function (response) {
                        // Keep what the handler stored if the message is empty.
                        context.response = response || context.response;
                        resolve();
                    });
                    point.once('error', reject);

                    context.response = undefined;
                    proceed();
                    proceed = () => {
                        point.retry(context.request);
                    };
                });
            };

            let ret;
            try {
                ret = fn(context, callback);
            }
            catch (err) {
                // if this happens, it would be sync function flow
                return point.throw(err);
            }

            if (ret instanceof Promise) {
                ret
                    .then(function () {
                        point.continue(context.response);
                    })
                    .catch(function (err) {
                        point.throw(err);
                    });
            }
            else if (!origResponse) {
                // first time response
                point.respond(context.response);
            }
        });
    };

    // Wrap the original `request` so callers get a duplex stream built on
    // top of whatever the original returns.
    pipe.decorate('request', (original) => {
        return function (requestOptions) {
            const requestCtx = original.call(this, requestOptions);
            return {
                stream: new TroobaDuplexStream(requestCtx)
            };
        };
    }, true); // override default one

    // create a way to access response promise
    pipe.decorate('response', () => {
        /**
         * @returns {Promise} resolves with the next 'response' message seen
         *   by the point this method is invoked on. Object responses get a
         *   `stream` property wrapping that same point; other values
         *   (e.g. strings) are passed through untouched.
         */
        return function () {
            const point = this;
            return new Promise((resolve) => {
                // NOTE(review): 'error' is not observed here, so the promise
                // stays pending if the flow fails - confirm whether it
                // should reject instead.
                point.once('response', (response) => {
                    if (response !== null && typeof response === 'object') {
                        response.stream = new TroobaDuplexStream(point);
                    }
                    resolve(response);
                });
            });
        };
    });
}
// Export the decorator wrapped by trooba-plugin. The second argument
// carries the `troobaVersion` requirement ('^3'); presumably a semver range
// for the host trooba - confirm against trooba-plugin.
const pluginHooks = { decorate };
const pluginRequirements = { troobaVersion: '^3' };
module.exports = plugin(pluginHooks, pluginRequirements);