]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | var http = require('http'); | |
20 | var https = require('https'); | |
21 | var url = require("url"); | |
22 | var path = require("path"); | |
23 | var fs = require("fs"); | |
24 | var crypto = require("crypto"); | |
25 | var log = require('./log'); | |
26 | ||
27 | var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor; | |
28 | ||
29 | var TBufferedTransport = require('./buffered_transport'); | |
30 | var TBinaryProtocol = require('./binary_protocol'); | |
31 | var InputBufferUnderrunError = require('./input_buffer_underrun_error'); | |
32 | ||
33 | // WSFrame constructor and prototype | |
34 | ///////////////////////////////////////////////////////////////////// | |
35 | ||
36 | /** Apache Thrift RPC Web Socket Transport | |
37 | * Frame layout conforming to RFC 6455 circa 12/2011 | |
38 | * | |
39 | * Theoretical frame size limit is 4GB*4GB, however the Node Buffer | |
40 | * limit is 1GB as of v0.10. The frame length encoding is also | |
41 | * configured for a max of 4GB presently and needs to be adjusted | |
42 | * if Node/Browsers become capable of > 4GB frames. | |
43 | * | |
44 | * - FIN is 1 if the message is complete | |
45 | * - RSV1/2/3 are always 0 | |
46 | * - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol | |
47 | * - Mask Present bit is 1 sending to-server and 0 sending to-client | |
48 | * - Payload Len: | |
49 | * + If < 126: then represented directly | |
50 | * + If >=126: but within range of an unsigned 16 bit integer | |
51 | * then Payload Len is 126 and the two following bytes store | |
52 | * the length | |
53 | * + Else: Payload Len is 127 and the following 8 bytes store the | |
54 | * length as an unsigned 64 bit integer | |
55 | * - Masking key is a 32 bit key only present when sending to the server | |
56 | * - Payload follows the masking key or length | |
57 | * | |
58 | * 0 1 2 3 | |
59 | * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | |
60 | * +-+-+-+-+-------+-+-------------+-------------------------------+ | |
61 | * |F|R|R|R| opcode|M| Payload len | Extended payload length | | |
62 | * |I|S|S|S| (4) |A| (7) | (16/64) | | |
63 | * |N|V|V|V| |S| | (if payload len==126/127) | | |
64 | * | |1|2|3| |K| | | | |
65 | * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | |
66 | * | Extended payload length continued, if payload len == 127 | | |
67 | * + - - - - - - - - - - - - - - - +-------------------------------+ | |
68 | * | |Masking-key, if MASK set to 1 | | |
69 | * +-------------------------------+-------------------------------+ | |
70 | * | Masking-key (continued) | Payload Data | | |
71 | * +-------------------------------- - - - - - - - - - - - - - - - + | |
72 | * : Payload Data continued ... : | |
73 | * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | |
74 | * | Payload Data continued ... | | |
75 | * +---------------------------------------------------------------+ | |
76 | */ | |
var wsFrame = {
  /** Encodes a single, final (FIN) WebSocket frame
   *
   * @param {Buffer} data - The raw data to encode
   * @param {Buffer} mask - The mask to apply when sending to server, null for no mask
   *                        (only the first 4 bytes are used)
   * @param {Boolean} binEncoding - True for binary encoding, false for text encoding
   * @returns {Buffer} - The WebSocket frame, ready to send
   */
  encode: function(data, mask, binEncoding) {
    var maskFlag = mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT;
    var frame = Buffer.alloc(wsFrame.frameSizeFromData(data, mask));
    //Byte 0 - FIN & OPCODE
    frame[0] = wsFrame.fin.FIN +
      (binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT);
    //Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE
    var payloadOffset = 2;
    if (data.length < 0x7E) {
      frame[1] = data.length + maskFlag;
    } else if (data.length <= 0xFFFF) {
      //RFC 6455 requires the minimal length encoding, so 0xFFFF itself
      // must still use the 16 bit form
      frame[1] = 0x7E + maskFlag;
      frame.writeUInt16BE(data.length, 2);
      payloadOffset = 4;
    } else {
      //Only the low 32 bits of the 64 bit length are used (4GB max),
      // writeUInt32BE throws a RangeError for anything larger
      frame[1] = 0x7F + maskFlag;
      frame.writeUInt32BE(0, 2);
      frame.writeUInt32BE(data.length, 6);
      payloadOffset = 10;
    }
    //MASK
    if (mask) {
      mask.copy(frame, payloadOffset, 0, 4);
      payloadOffset += 4;
    }
    //Payload
    data.copy(frame, payloadOffset);
    if (mask) {
      //Mask with the 4 key bytes actually written into the frame
      wsFrame.applyMask(frame.subarray(payloadOffset),
                        frame.subarray(payloadOffset - 4, payloadOffset));
    }
    return frame;
  },

  /**
   * @class
   * @name WSDecodeResult
   * @property {Buffer} data - The decoded data for the first ATRPC message,
   *                           null if the frame was a control frame
   * @property {Buffer} mask - The frame mask
   * @property {Boolean} binEncoding - True if binary (TBinaryProtocol),
   *                                   False if text (TJSONProtocol)
   * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a
   *                                single WebSocket frame, this Buffer contains
   *                                any bytes remaining to be decoded
   * @property {Boolean} FIN - True if the message is complete
   */

  /** Decodes a WebSocket frame
   *
   * @param {Buffer} frame - The raw inbound frame, if this is a continuation
   *                         frame it must have a mask property with the mask.
   * @returns {WSDecodeResult} - The decoded payload
   * @throws {Error} - If the buffer does not hold the complete frame it
   *                   announces, or the frame declares a length above 4GB
   *
   * @see {@link WSDecodeResult}
   */
  decode: function(frame) {
    var result = {
      data: null,
      mask: null,
      binEncoding: false,
      nextFrame: null,
      FIN: true
    };

    if (frame.length < 2) {
      throw new Error('Incomplete WebSocket frame: header is truncated');
    }

    //Byte 0 - FIN & OPCODE
    var opcode = frame[0] & wsFrame.frameOpCodes.OPCODE_MASK;
    result.FIN = (wsFrame.fin.FIN === (frame[0] & wsFrame.fin.FIN));
    result.binEncoding = (opcode === wsFrame.frameOpCodes.BIN);

    //Byte 1 or 1-3 or 1-9 - SIZE
    var lenByte = frame[1] & 0x7F;
    var masked = (wsFrame.mask.TO_SERVER === (frame[1] & wsFrame.mask.TO_SERVER));
    var dataOffset = 2;
    if (lenByte === 0x7E) {
      dataOffset = 4;
    } else if (lenByte === 0x7F) {
      dataOffset = 10;
    }
    //Make sure the extended length and masking key are present before reading them
    if (frame.length < dataOffset + (masked ? 4 : 0)) {
      throw new Error('Incomplete WebSocket frame: header is truncated');
    }
    var len = lenByte;
    if (lenByte === 0x7E) {
      len = frame.readUInt16BE(2);
    } else if (lenByte === 0x7F) {
      if (frame.readUInt32BE(2) !== 0) {
        throw new Error('WebSocket frames larger than 4GB are not supported');
      }
      len = frame.readUInt32BE(6);
    }

    //MASK
    if (masked) {
      result.mask = Buffer.from(frame.subarray(dataOffset, dataOffset + 4));
      dataOffset += 4;
    }

    //Payload
    var frameEnd = dataOffset + len;
    if (frame.length < frameEnd) {
      //Never size a buffer from an announced length the peer has not
      // actually delivered (uninitialised memory / oversized allocation)
      throw new Error('Incomplete WebSocket frame: expected ' + len +
                      ' payload bytes, received ' + (frame.length - dataOffset));
    }
    //Copy so that unmasking does not modify the caller's buffer
    result.data = Buffer.from(frame.subarray(dataOffset, frameEnd));
    if (result.mask) {
      wsFrame.applyMask(result.data, result.mask);
    }

    //Next Frame
    if (frame.length > frameEnd) {
      result.nextFrame = Buffer.from(frame.subarray(frameEnd));
    }

    //Don't forward control frames (close 0x8, ping 0x9, pong 0xA all have
    // bit 0x08 set in the opcode nibble)
    if (opcode & wsFrame.frameOpCodes.CTRL_FLAG) {
      result.data = null;
    }

    return result;
  },

  /** Masks/Unmasks data
   *
   * @param {Buffer} data - data to mask/unmask in place
   * @param {Buffer} mask - the mask
   */
  applyMask: function(data, mask){
    //TODO: look into xoring words at a time
    var dataLen = data.length;
    var maskLen = mask.length;
    for (var i = 0; i < dataLen; i++) {
      data[i] = data[i] ^ mask[i%maskLen];
    }
  },

  /** Computes frame size on the wire from data to be sent
   *
   * @param {Buffer} data - data.length is the assumed payload size
   * @param {Boolean} mask - true if a mask will be sent (TO_SERVER)
   * @returns {Number} - header size + payload size + masking key size
   */
  frameSizeFromData: function(data, mask) {
    var headerSize = 10;
    if (data.length < 0x7E) {
      headerSize = 2;
    } else if (data.length <= 0xFFFF) {
      headerSize = 4;
    }
    return headerSize + data.length + (mask ? 4 : 0);
  },

  frameOpCodes: {
    CONT:     0x00,
    TEXT:     0x01,
    BIN:      0x02,
    //Kept for backward compatibility only: 0x80 is the FIN bit of byte 0,
    // not a control opcode, use CTRL_FLAG to detect control frames
    CTRL:     0x80,
    //Bit set in the opcode of every control frame
    CTRL_FLAG:   0x08,
    //Low nibble of byte 0 holds the opcode
    OPCODE_MASK: 0x0F
  },

  mask: {
    TO_SERVER: 0x80,
    TO_CLIENT: 0x00
  },

  fin: {
    CONT: 0x00,
    FIN: 0x80
  }
};
234 | ||
235 | ||
236 | // createWebServer constructor and options | |
237 | ///////////////////////////////////////////////////////////////////// | |
238 | ||
239 | /** | |
240 | * @class | |
241 | * @name ServerOptions | |
242 | * @property {array} cors - Array of CORS origin strings to permit requests from. | |
243 | * @property {string} files - Path to serve static files from, if absent or "" | |
244 | * static file service is disabled. | |
245 | * @property {object} headers - An object hash mapping header strings to header value | |
246 | * strings, these headers are transmitted in response to | |
247 | * static file GET operations. | |
248 | * @property {object} services - An object hash mapping service URI strings | |
249 | * to ServiceOptions objects | |
250 | * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html), | |
251 | * if not present or null regular http is used, | |
252 | * at least a key and a cert must be defined to use SSL/TLS | |
253 | * @see {@link ServiceOptions} | |
254 | */ | |
255 | ||
256 | /** | |
257 | * @class | |
258 | * @name ServiceOptions | |
259 | * @property {object} transport - The layered transport to use (defaults | |
260 | * to TBufferedTransport). | |
261 | * @property {object} protocol - The serialization Protocol to use (defaults to | |
262 | * TBinaryProtocol). | |
263 | * @property {object} processor - The Thrift Service class/processor generated | |
264 | * by the IDL Compiler for the service (the "cls" | |
265 | * key can also be used for this attribute). | |
266 | * @property {object} handler - The handler methods for the Thrift Service. | |
267 | */ | |
268 | ||
269 | /** | |
270 | * Create a Thrift server which can serve static files and/or one or | |
271 | * more Thrift Services. | |
272 | * @param {ServerOptions} options - The server configuration. | |
273 | * @returns {object} - The Apache Thrift Web Server. | |
274 | */ | |
275 | exports.createWebServer = function(options) { | |
276 | var baseDir = options.files; | |
277 | var contentTypesByExtension = { | |
278 | '.txt': 'text/plain', | |
279 | '.html': 'text/html', | |
280 | '.css': 'text/css', | |
281 | '.xml': 'application/xml', | |
282 | '.json': 'application/json', | |
283 | '.js': 'application/javascript', | |
284 | '.jpg': 'image/jpeg', | |
285 | '.jpeg': 'image/jpeg', | |
286 | '.gif': 'image/gif', | |
287 | '.png': 'image/png', | |
288 | '.svg': 'image/svg+xml' | |
289 | }; | |
290 | ||
291 | //Setup all of the services | |
292 | var services = options.services; | |
293 | for (var uri in services) { | |
294 | var svcObj = services[uri]; | |
295 | ||
296 | //Setup the processor | |
297 | if (svcObj.processor instanceof MultiplexedProcessor) { | |
298 | //Multiplex processors have pre embedded processor/handler pairs, save as is | |
299 | svcObj.processor = svcObj.processor; | |
300 | } else { | |
301 | //For historical reasons Node.js supports processors passed in directly or via the | |
302 | // IDL Compiler generated class housing the processor. Also, the options property | |
303 | // for a Processor has been called both cls and processor at different times. We | |
304 | // support any of the four possibilities here. | |
305 | var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) : | |
306 | (svcObj.cls.Processor || svcObj.cls); | |
307 | //Processors can be supplied as constructed objects with handlers already embedded, | |
308 | // if a handler is provided we construct a new processor, if not we use the processor | |
309 | // object directly | |
310 | if (svcObj.handler) { | |
311 | svcObj.processor = new processor(svcObj.handler); | |
312 | } else { | |
313 | svcObj.processor = processor; | |
314 | } | |
315 | } | |
316 | svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport; | |
317 | svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol; | |
318 | } | |
319 | ||
320 | //Verify CORS requirements | |
321 | function VerifyCORSAndSetHeaders(request, response) { | |
322 | if (request.headers.origin && options.cors) { | |
323 | if (options.cors["*"] || options.cors[request.headers.origin]) { | |
324 | //Allow, origin allowed | |
325 | response.setHeader("access-control-allow-origin", request.headers.origin); | |
326 | response.setHeader("access-control-allow-methods", "GET, POST, OPTIONS"); | |
327 | response.setHeader("access-control-allow-headers", "content-type, accept"); | |
328 | response.setHeader("access-control-max-age", "60"); | |
329 | return true; | |
330 | } else { | |
331 | //Disallow, origin denied | |
332 | return false; | |
333 | } | |
334 | } | |
335 | //Allow, CORS is not in use | |
336 | return true; | |
337 | } | |
338 | ||
339 | ||
340 | //Handle OPTIONS method (CORS) | |
341 | /////////////////////////////////////////////////// | |
342 | function processOptions(request, response) { | |
343 | if (VerifyCORSAndSetHeaders(request, response)) { | |
344 | response.writeHead("204", "No Content", {"content-length": 0}); | |
345 | } else { | |
346 | response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); | |
347 | } | |
348 | response.end(); | |
349 | } | |
350 | ||
351 | ||
352 | //Handle POST methods (TXHRTransport) | |
353 | /////////////////////////////////////////////////// | |
354 | function processPost(request, response) { | |
355 | //Lookup service | |
356 | var uri = url.parse(request.url).pathname; | |
357 | var svc = services[uri]; | |
358 | if (!svc) { | |
359 | response.writeHead("403", "No Apache Thrift Service at " + uri, {}); | |
360 | response.end(); | |
361 | return; | |
362 | } | |
363 | ||
364 | //Verify CORS requirements | |
365 | if (!VerifyCORSAndSetHeaders(request, response)) { | |
366 | response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); | |
367 | response.end(); | |
368 | return; | |
369 | } | |
370 | ||
371 | //Process XHR payload | |
372 | request.on('data', svc.transport.receiver(function(transportWithData) { | |
373 | var input = new svc.protocol(transportWithData); | |
374 | var output = new svc.protocol(new svc.transport(undefined, function(buf) { | |
375 | try { | |
376 | response.writeHead(200); | |
377 | response.end(buf); | |
378 | } catch (err) { | |
379 | response.writeHead(500); | |
380 | response.end(); | |
381 | } | |
382 | })); | |
383 | ||
384 | try { | |
385 | svc.processor.process(input, output); | |
386 | transportWithData.commitPosition(); | |
387 | } catch (err) { | |
388 | if (err instanceof InputBufferUnderrunError) { | |
389 | transportWithData.rollbackPosition(); | |
390 | } else { | |
391 | response.writeHead(500); | |
392 | response.end(); | |
393 | } | |
394 | } | |
395 | })); | |
396 | } | |
397 | ||
398 | ||
399 | //Handle GET methods (Static Page Server) | |
400 | /////////////////////////////////////////////////// | |
401 | function processGet(request, response) { | |
402 | //Undefined or empty base directory means do not serve static files | |
403 | if (!baseDir || "" === baseDir) { | |
404 | response.writeHead(404); | |
405 | response.end(); | |
406 | return; | |
407 | } | |
408 | ||
409 | //Verify CORS requirements | |
410 | if (!VerifyCORSAndSetHeaders(request, response)) { | |
411 | response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {}); | |
412 | response.end(); | |
413 | return; | |
414 | } | |
415 | ||
416 | //Locate the file requested and send it | |
417 | var uri = url.parse(request.url).pathname; | |
418 | var filename = path.resolve(path.join(baseDir, uri)); | |
419 | ||
420 | //Ensure the basedir path is not able to be escaped | |
421 | if (filename.indexOf(baseDir) != 0) { | |
422 | response.writeHead(400, "Invalid request path", {}); | |
423 | response.end(); | |
424 | return; | |
425 | } | |
426 | ||
427 | fs.exists(filename, function(exists) { | |
428 | if(!exists) { | |
429 | response.writeHead(404); | |
430 | response.end(); | |
431 | return; | |
432 | } | |
433 | ||
434 | if (fs.statSync(filename).isDirectory()) { | |
435 | filename += '/index.html'; | |
436 | } | |
437 | ||
438 | fs.readFile(filename, "binary", function(err, file) { | |
439 | if (err) { | |
440 | response.writeHead(500); | |
441 | response.end(err + "\n"); | |
442 | return; | |
443 | } | |
444 | var headers = {}; | |
445 | var contentType = contentTypesByExtension[path.extname(filename)]; | |
446 | if (contentType) { | |
447 | headers["Content-Type"] = contentType; | |
448 | } | |
449 | for (var k in options.headers) { | |
450 | headers[k] = options.headers[k]; | |
451 | } | |
452 | response.writeHead(200, headers); | |
453 | response.write(file, "binary"); | |
454 | response.end(); | |
455 | }); | |
456 | }); | |
457 | } | |
458 | ||
459 | ||
460 | //Handle WebSocket calls (TWebSocketTransport) | |
461 | /////////////////////////////////////////////////// | |
462 | function processWS(data, socket, svc, binEncoding) { | |
463 | svc.transport.receiver(function(transportWithData) { | |
464 | var input = new svc.protocol(transportWithData); | |
465 | var output = new svc.protocol(new svc.transport(undefined, function(buf) { | |
466 | try { | |
467 | var frame = wsFrame.encode(buf, null, binEncoding); | |
468 | socket.write(frame); | |
469 | } catch (err) { | |
470 | //TODO: Add better error processing | |
471 | } | |
472 | })); | |
473 | ||
474 | try { | |
475 | svc.processor.process(input, output); | |
476 | transportWithData.commitPosition(); | |
477 | } | |
478 | catch (err) { | |
479 | if (err instanceof InputBufferUnderrunError) { | |
480 | transportWithData.rollbackPosition(); | |
481 | } | |
482 | else { | |
483 | //TODO: Add better error processing | |
484 | } | |
485 | } | |
486 | })(data); | |
487 | } | |
488 | ||
489 | //Create the server (HTTP or HTTPS) | |
490 | var server = null; | |
491 | if (options.tls) { | |
492 | server = https.createServer(options.tls); | |
493 | } else { | |
494 | server = http.createServer(); | |
495 | } | |
496 | ||
497 | //Wire up listeners for upgrade(to WebSocket) & request methods for: | |
498 | // - GET static files, | |
499 | // - POST XHR Thrift services | |
500 | // - OPTIONS CORS requests | |
501 | server.on('request', function(request, response) { | |
502 | if (request.method === 'POST') { | |
503 | processPost(request, response); | |
504 | } else if (request.method === 'GET') { | |
505 | processGet(request, response); | |
506 | } else if (request.method === 'OPTIONS') { | |
507 | processOptions(request, response); | |
508 | } else { | |
509 | response.writeHead(500); | |
510 | response.end(); | |
511 | } | |
512 | }).on('upgrade', function(request, socket, head) { | |
513 | //Lookup service | |
514 | var svc; | |
515 | try { | |
516 | svc = services[Object.keys(services)[0]]; | |
517 | } catch(e) { | |
518 | socket.write("HTTP/1.1 403 No Apache Thrift Service available\r\n\r\n"); | |
519 | return; | |
520 | } | |
521 | //Perform upgrade | |
522 | var hash = crypto.createHash("sha1"); | |
523 | hash.update(request.headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); | |
524 | socket.write("HTTP/1.1 101 Switching Protocols\r\n" + | |
525 | "Upgrade: websocket\r\n" + | |
526 | "Connection: Upgrade\r\n" + | |
527 | "Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" + | |
528 | "Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" + | |
529 | "Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" + | |
530 | "\r\n"); | |
531 | //Handle WebSocket traffic | |
532 | var data = null; | |
533 | socket.on('data', function(frame) { | |
534 | try { | |
535 | while (frame) { | |
536 | var result = wsFrame.decode(frame); | |
537 | //Prepend any existing decoded data | |
538 | if (data) { | |
539 | if (result.data) { | |
540 | var newData = new Buffer(data.length + result.data.length); | |
541 | data.copy(newData); | |
542 | result.data.copy(newData, data.length); | |
543 | result.data = newData; | |
544 | } else { | |
545 | result.data = data; | |
546 | } | |
547 | data = null; | |
548 | } | |
549 | //If this completes a message process it | |
550 | if (result.FIN) { | |
551 | processWS(result.data, socket, svc, result.binEncoding); | |
552 | } else { | |
553 | data = result.data; | |
554 | } | |
555 | //Prepare next frame for decoding (if any) | |
556 | frame = result.nextFrame; | |
557 | } | |
558 | } catch(e) { | |
559 | log.error('TWebSocketTransport Exception: ' + e); | |
560 | socket.destroy(); | |
561 | } | |
562 | }); | |
563 | }); | |
564 | ||
565 | //Return the server | |
566 | return server; | |
567 | }; |