]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | var util = require('util'); | |
20 | var http = require('http'); | |
21 | var https = require('https'); | |
22 | var EventEmitter = require('events').EventEmitter; | |
23 | var thrift = require('./thrift'); | |
24 | ||
25 | var TBufferedTransport = require('./buffered_transport'); | |
26 | var TBinaryProtocol = require('./binary_protocol'); | |
27 | var InputBufferUnderrunError = require('./input_buffer_underrun_error'); | |
28 | ||
29 | var createClient = require('./create_client'); | |
30 | ||
31 | /** | |
32 | * @class | |
33 | * @name ConnectOptions | |
34 | * @property {function} transport - The Thrift layered transport constructor to use (TBufferedTransport, etc.). | |
35 | * @property {function} protocol - The Thrift serialization protocol constructor to use (TBinaryProtocol, etc.). | |
36 | * @property {string} path - The URL path to POST to (e.g. "/", "/mySvc", "/thrift/quoteSvc", etc.). | |
37 | * @property {object} headers - A standard Node.js header hash, an object hash containing key/value | |
38 | * pairs where the key is the header name string and the value is the header value string. | |
39 | * @property {boolean} https - True causes the connection to use https, otherwise http is used. | |
40 | * @property {object} nodeOptions - Options passed on to node. | |
41 | * @example | |
42 | * //Use a connection that requires ssl/tls, closes the connection after each request, | |
43 | * // uses the buffered transport layer, uses the JSON protocol and directs RPC traffic | |
44 | * // to https://thrift.example.com:9090/hello | |
45 | * var thrift = require('thrift'); | |
46 | * var options = { | |
47 | * transport: thrift.TBufferedTransport, | |
48 | * protocol: thrift.TJSONProtocol, | |
49 | * path: "/hello", | |
50 | * headers: {"Connection": "close"}, | |
51 | * https: true | |
52 | * }; | |
53 | * var con = thrift.createHttpConnection("thrift.example.com", 9090, options); | |
54 | * var client = thrift.createHttpClient(myService, con); | |
55 | * client.myServiceFunction(); | |
56 | */ | |
57 | ||
/**
 * Initializes a Thrift HttpConnection instance (use createHttpConnection() rather than
 * instantiating directly).
 * @constructor
 * @param {ConnectOptions} [options] - The configuration options to use. In addition to
 *   the ConnectOptions properties, host, port, socketPath and responseType are read
 *   from this object. When omitted an empty object is used.
 * @event {error} The "error" event is fired when a Node.js error event occurs during
 *   request or response processing, in which case the node error is passed on. An "error"
 *   event is also fired when the response status code is not 200 (THTTPException), when
 *   a response names an RPC function the client has no recv_ method for
 *   (TApplicationException), and when decoding throws anything other than
 *   InputBufferUnderrunError (the thrown value is emitted, not rethrown).
 * @classdesc HttpConnection objects provide Thrift end point transport
 *   semantics implemented over the Node.js http.request() method.
 * @see {@link createHttpConnection}
 */
var HttpConnection = exports.HttpConnection = function(options) {
  //Initialize the emitter base object
  EventEmitter.call(this);

  //Set configuration
  var self = this;
  this.options = options || {};
  this.host = this.options.host;
  this.port = this.options.port;
  this.socketPath = this.options.socketPath;
  this.https = this.options.https || false;
  this.transport = this.options.transport || TBufferedTransport;
  this.protocol = this.options.protocol || TBinaryProtocol;

  //Prepare Node.js options
  this.nodeOptions = {
    host: this.host,
    port: this.port,
    socketPath: this.socketPath,
    path: this.options.path || '/',
    method: 'POST',
    headers: this.options.headers || {},
    responseType: this.options.responseType || null
  };
  //Caller supplied nodeOptions override the defaults prepared above
  for (var attrname in this.options.nodeOptions) {
    this.nodeOptions[attrname] = this.options.nodeOptions[attrname];
  }
  /*jshint -W069 */
  if (! this.nodeOptions.headers['Connection']) {
    this.nodeOptions.headers['Connection'] = 'keep-alive';
  }
  /*jshint +W069 */

  //The sequence map is used to map seqIDs back to the
  // calling client in multiplexed scenarios
  this.seqId2Service = {};

  //Decodes every complete message found in the received transport and
  // routes each one to the matching recv_ method of the owning client.
  function decodeCallback(transport_with_data) {
    var proto = new self.protocol(transport_with_data);
    try {
      //The loop is left through an exception: InputBufferUnderrunError
      // when the buffered data is exhausted, anything else on failure.
      while (true) {
        var header = proto.readMessageBegin();
        //A temporary callback is registered under the negated seqid and that
        // id is handed to recv_; presumably recv_ invokes the callback stored
        // under the id it is given - verify against the generated client code.
        var dummy_seqid = header.rseqid * -1;
        var client = self.client;
        //The Multiplexed Protocol stores a hash of seqid to service names
        //  in seqId2Service. If the SeqId is found in the hash we need to
        //  lookup the appropriate client for this call.
        //  The client var is a single client object when not multiplexing,
        //  when using multiplexing it is a service name keyed hash of client
        //  objects.
        //NOTE: The 2 way interdependencies between protocols, transports,
        //  connections and clients in the Node.js implementation are irregular
        //  and make the implementation difficult to extend and maintain. We
        //  should bring this stuff inline with typical thrift I/O stack
        //  operation soon.
        //  --ra
        var service_name = self.seqId2Service[header.rseqid];
        if (service_name) {
          client = self.client[service_name];
          delete self.seqId2Service[header.rseqid];
        }
        /*jshint -W083 */
        client._reqs[dummy_seqid] = function(err, success){
          transport_with_data.commitPosition();
          var clientCallback = client._reqs[header.rseqid];
          delete client._reqs[header.rseqid];
          if (clientCallback) {
            //Deliver the result asynchronously, outside of the decode loop
            process.nextTick(function() {
              clientCallback(err, success);
            });
          }
        };
        /*jshint +W083 */
        if(client['recv_' + header.fname]) {
          client['recv_' + header.fname](proto, header.mtype, dummy_seqid);
        } else {
          delete client._reqs[dummy_seqid];
          self.emit("error",
                    new thrift.TApplicationException(
                       thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
                       "Received a response to an unknown RPC function"));
        }
      }
    }
    catch (e) {
      if (e instanceof InputBufferUnderrunError) {
        transport_with_data.rollbackPosition();
      } else {
        self.emit('error', e);
      }
    }
  }


  //Response handler
  //////////////////////////////////////////////////
  this.responseCallback = function(response) {
    var data = [];

    //Emit on the connection itself: "this" is whatever object invokes the
    // callback (the request when used as the http.request() callback), so
    // using it would not reach the connection's listeners directly.
    if (response.statusCode !== 200) {
      self.emit("error", new THTTPException(response));
    }

    response.on('error', function (e) {
      self.emit("error", e);
    });

    // When running directly under node, chunk will be a buffer,
    // however, when running in a Browser (e.g. Browserify), chunk
    // will be a string or an ArrayBuffer.
    response.on('data', function (chunk) {
      // Wrap anything that is not already a Buffer so the chunks can be
      // concatenated; sizes are then measured in bytes, not characters.
      data.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
    });

    response.on('end', function(){
      var buf = Buffer.concat(data);
      //Get the receiver function for the transport and
      //  call it with the buffer
      self.transport.receiver(decodeCallback)(buf);
    });
  };
};
util.inherits(HttpConnection, EventEmitter);
207 | ||
/**
 * Sends serialized Thrift message data to the server as the body of a single
 * POST request built from the connection's nodeOptions.
 * @param {Buffer} data - A Node.js Buffer containing the data to write
 * @returns {void} No return value.
 * @event {error} the "error" event is raised upon request failure passing the
 *     Node.js error object to the listener.
 */
HttpConnection.prototype.write = function(data) {
  var connection = this;
  var requestOptions = connection.nodeOptions;
  var requestHeaders = requestOptions.headers;

  //The header hash is shared between requests, so the length is refreshed
  // on every write while a caller supplied content type is left alone.
  requestHeaders["Content-length"] = data.length;
  if (!requestHeaders["Content-Type"]) {
    requestHeaders["Content-Type"] = "application/x-thrift";
  }

  //Pick the module matching the configured scheme
  var requester = connection.https ? https : http;
  var req = requester.request(requestOptions, connection.responseCallback);
  req.on('error', function(err) {
    connection.emit("error", err);
  });
  req.write(data);
  req.end();
};
230 | ||
231 | /** | |
232 | * Creates a new HttpConnection object, used by Thrift clients to connect | |
233 | * to Thrift HTTP based servers. | |
234 | * @param {string} host - The host name or IP to connect to. | |
235 | * @param {number} port - The TCP port to connect to. | |
236 | * @param {ConnectOptions} options - The configuration options to use. | |
237 | * @returns {HttpConnection} The connection object. | |
238 | * @see {@link ConnectOptions} | |
239 | */ | |
240 | exports.createHttpConnection = function(host, port, options) { | |
241 | options.host = host; | |
242 | options.port = port || 80; | |
243 | return new HttpConnection(options); | |
244 | }; | |
245 | ||
246 | exports.createHttpUDSConnection = function(path, options) { | |
247 | options.socketPath = path; | |
248 | return new HttpConnection(options); | |
249 | }; | |
250 | ||
/**
 * Client factory for HTTP connections; this is the function exported by
 * ./create_client, re-exported under the name createHttpClient.
 */
exports.createHttpClient = createClient;
252 | ||
253 | ||
/**
 * Exception describing an HTTP response whose status code was not accepted.
 * Initialised through the TApplicationException constructor and typed as
 * PROTOCOL_ERROR.
 * @constructor
 * @param {object} response - The HTTP response; its statusCode is read and the
 *   whole object is kept on the exception as `response`.
 */
function THTTPException(response) {
  thrift.TApplicationException.call(this);
  //Error.captureStackTrace is a V8 extension; skip it on engines that lack
  // it instead of failing while constructing the exception.
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, this.constructor);
  }
  this.name = this.constructor.name;
  this.statusCode = response.statusCode;
  this.response = response;
  this.type = thrift.TApplicationExceptionType.PROTOCOL_ERROR;
  this.message = "Received a response with a bad HTTP status code: " + response.statusCode;
}
263 | util.inherits(THTTPException, thrift.TApplicationException); |