-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
135 lines (116 loc) · 4.58 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
'use strict';
const Assert = require('assert');
const Querystring = require('querystring');
const UrlUtils = require('url');
const Http = require('http');
const Wreck = require('wreck');
const _ = require('lodash');
const httpfy = require('trooba-http-api');
module.exports = function transport(pipe, config) {
if (config && config.context &&
pipe.store && !pipe.store.contextProvider) {
pipe.store.contextProvider = typeof config.context === 'string' ?
require(config.context) :
config.context;
}
addServerAPI(pipe, config);
addClientAPI(pipe, config);
};
function addClientAPI(pipe, config) {
if (pipe.context.$server) {
return;
}
pipe.on('request', function onRequest(request) {
var options = _.merge(request || {}, config);
var genericTimeout = options.timeout;
if (options.connectTimeout) {
options.timeout = options.connectTimeout;
}
if (options.body) {
options.payload = options.body;
}
if (options.path) {
options.pathname = options.path;
}
var url = UrlUtils.format(options);
if (pipe.store && pipe.store.contextProvider) {
// serialize from context into http request
options.headers = options.headers || {};
pipe.store.contextProvider.serialize(pipe.context, options);
}
Wreck.request(options.method, url, options, function onResponse(err, response) {
/* handle err if it exists, in which case res will be undefined */
if (err) {
pipe.throw(err);
return;
}
// buffer the response stream
options.timeout = genericTimeout;
if (options.socketTimeout) {
options.timeout = options.socketTimeout;
}
Wreck.read(response, options, function onResponseRead(err, body) {
if (err) {
pipe.throw(err);
return;
}
response.body = body;
// de-serialize context
if (pipe.store && pipe.store.contextProvider) {
pipe.store.contextProvider.deserialize(response, pipe.context);
}
pipe.respond(response);
});
});
});
httpfy(pipe);
}
function addServerAPI(pipe, config) {
const contextProvider = pipe.store && pipe.store.contextProvider;
pipe.set('server:default', function serverFactory(pipe) {
return {
listen(callback) {
Assert.ok(config.port !== undefined, 'Port must be provided as part of transport config');
const server = Http.createServer((req, res) => {
const urlParts = UrlUtils.parse(req.url);
const context = {
path: urlParts.path,
operation: req.method,
// make sure it will not be propagated beyond current pipe by prefixing with '$'
$rawRequest: req,
$server: true
};
const serverContext = {
request: req,
response: res
};
// de-serialize context from request headers/cookies into context
if (contextProvider) {
contextProvider.deserialize(serverContext, context);
}
const request = Object.assign({
// so far no body in request
// body can be read later by body parser
// from the raw request
}, Querystring.parse(urlParts.query));
// then use the context to initiate the request
pipe.create(context)
.request(request, (err, response) => {
// serialize context into request headers/cookies
if (contextProvider) {
contextProvider.serialize(context, serverContext);
}
if (err) {
res.writeHead(500);
res.end(err.message);
return;
}
res.writeHead(response.status);
res.end(response.body);
});
});
return server.listen(config.port, callback);
}
};
});
}