]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/jaegertracing/thrift/lib/nodejs/lib/thrift/connection.js
import quincy beta 17.1.0
[ceph.git] / ceph / src / jaegertracing / thrift / lib / nodejs / lib / thrift / connection.js
diff --git a/ceph/src/jaegertracing/thrift/lib/nodejs/lib/thrift/connection.js b/ceph/src/jaegertracing/thrift/lib/nodejs/lib/thrift/connection.js
deleted file mode 100644 (file)
index 25e34ed..0000000
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-var util = require('util');
-var EventEmitter = require('events').EventEmitter;
-var constants = require('constants');
-var net = require('net');
-var tls = require('tls');
-var thrift = require('./thrift');
-var log = require('./log');
-
-var TBufferedTransport = require('./buffered_transport');
-var TBinaryProtocol = require('./binary_protocol');
-var InputBufferUnderrunError = require('./input_buffer_underrun_error');
-
-var createClient = require('./create_client');
-
-var binary = require('./binary');
-
-var Connection = exports.Connection = function(stream, options) {
-  var self = this;
-  EventEmitter.call(this);
-
-  this.seqId2Service = {};
-  this.connection = stream;
-  this.ssl = (stream.encrypted);
-  this.options = options || {};
-  this.transport = this.options.transport || TBufferedTransport;
-  this.protocol = this.options.protocol || TBinaryProtocol;
-  this.offline_queue = [];
-  this.connected = false;
-  this.initialize_retry_vars();
-
-  this._debug = this.options.debug || false;
-  if (this.options.max_attempts &&
-      !isNaN(this.options.max_attempts) &&
-      this.options.max_attempts > 0) {
-     this.max_attempts = +this.options.max_attempts;
-  }
-  this.retry_max_delay = null;
-  if (this.options.retry_max_delay !== undefined &&
-      !isNaN(this.options.retry_max_delay) &&
-      this.options.retry_max_delay > 0) {
-     this.retry_max_delay = this.options.retry_max_delay;
-  }
-  this.connect_timeout = false;
-  if (this.options.connect_timeout &&
-      !isNaN(this.options.connect_timeout) &&
-      this.options.connect_timeout > 0) {
-     this.connect_timeout = +this.options.connect_timeout;
-  }
-
-  this.connection.addListener(this.ssl ? "secureConnect" : "connect", function() {
-    self.connected = true;
-
-    this.setTimeout(self.options.timeout || 0);
-    this.setNoDelay();
-    this.frameLeft = 0;
-    this.framePos = 0;
-    this.frame = null;
-    self.initialize_retry_vars();
-    self.flush_offline_queue();
-
-    self.emit("connect");
-  });
-
-  this.connection.addListener("error", function(err) {
-    // Only emit the error if no-one else is listening on the connection
-    // or if someone is listening on us, because Node turns unhandled
-    // 'error' events into exceptions.
-    if (self.connection.listeners('error').length === 1 ||
-        self.listeners('error').length > 0) {
-      self.emit("error", err);
-    }
-  });
-
-  // Add a close listener
-  this.connection.addListener("close", function() {
-    self.connection_gone(); // handle close event. try to reconnect
-  });
-
-  this.connection.addListener("timeout", function() {
-    self.emit("timeout");
-  });
-
-  this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
-    var message = new self.protocol(transport_with_data);
-    try {
-      while (true) {
-        var header = message.readMessageBegin();
-        var dummy_seqid = header.rseqid * -1;
-        var client = self.client;
-        //The Multiplexed Protocol stores a hash of seqid to service names
-        //  in seqId2Service. If the SeqId is found in the hash we need to
-        //  lookup the appropriate client for this call.
-        //  The connection.client object is a single client object when not
-        //  multiplexing, when using multiplexing it is a service name keyed
-        //  hash of client objects.
-        //NOTE: The 2 way interdependencies between protocols, transports,
-        //  connections and clients in the Node.js implementation are irregular
-        //  and make the implementation difficult to extend and maintain. We
-        //  should bring this stuff inline with typical thrift I/O stack
-        //  operation soon.
-        //  --ra
-        var service_name = self.seqId2Service[header.rseqid];
-        if (service_name) {
-          client = self.client[service_name];
-        }
-        /*jshint -W083 */
-        client._reqs[dummy_seqid] = function(err, success){
-          transport_with_data.commitPosition();
-
-          var callback = client._reqs[header.rseqid];
-          delete client._reqs[header.rseqid];
-          if (service_name) {
-            delete self.seqId2Service[header.rseqid];
-          }
-          if (callback) {
-            callback(err, success);
-          }
-        };
-        /*jshint +W083 */
-
-        if(client['recv_' + header.fname]) {
-          client['recv_' + header.fname](message, header.mtype, dummy_seqid);
-        } else {
-          delete client._reqs[dummy_seqid];
-          self.emit("error",
-                    new thrift.TApplicationException(thrift.TApplicationExceptionType.WRONG_METHOD_NAME,
-                             "Received a response to an unknown RPC function"));
-        }
-      }
-    }
-    catch (e) {
-      if (e instanceof InputBufferUnderrunError) {
-        transport_with_data.rollbackPosition();
-      }
-      else {
-        self.emit('error', e);
-      }
-    }
-  }));
-};
-util.inherits(Connection, EventEmitter);
-
-Connection.prototype.end = function() {
-  this.connection.end();
-};
-
-Connection.prototype.destroy = function() {
-  this.connection.destroy();
-};
-
-Connection.prototype.initialize_retry_vars = function () {
-  this.retry_timer = null;
-  this.retry_totaltime = 0;
-  this.retry_delay = 150;
-  this.retry_backoff = 1.7;
-  this.attempts = 0;
-};
-
-Connection.prototype.flush_offline_queue = function () {
-  var self = this;
-  var offline_queue = this.offline_queue;
-
-  // Reset offline queue
-  this.offline_queue = [];
-  // Attempt to write queued items
-  offline_queue.forEach(function(data) {
-    self.write(data);
-  });
-};
-
-Connection.prototype.write = function(data) {
-  if (!this.connected) {
-    this.offline_queue.push(data);
-    return;
-  }
-  this.connection.write(data);
-};
-
-Connection.prototype.connection_gone = function () {
-  var self = this;
-  this.connected = false;
-
-  // If a retry is already in progress, just let that happen
-  if (this.retry_timer) {
-    return;
-  }
-  // We cannot reconnect a secure socket.
-  if (!this.max_attempts || this.ssl) {
-    self.emit("close");
-    return;
-  }
-
-  if (this.retry_max_delay !== null && this.retry_delay >= this.retry_max_delay) {
-    this.retry_delay = this.retry_max_delay;
-  } else {
-    this.retry_delay = Math.floor(this.retry_delay * this.retry_backoff);
-  }
-
-  log.debug("Retry connection in " + this.retry_delay + " ms");
-
-  if (this.max_attempts && this.attempts >= this.max_attempts) {
-    this.retry_timer = null;
-    console.error("thrift: Couldn't get thrift connection after " + this.max_attempts + " attempts.");
-    self.emit("close");
-    return;
-  }
-
-  this.attempts += 1;
-  this.emit("reconnecting", {
-    delay: self.retry_delay,
-    attempt: self.attempts
-  });
-
-  this.retry_timer = setTimeout(function () {
-    if (self.connection.destroyed) {
-      self.retry_timer = null;
-      return;
-    }
-
-    log.debug("Retrying connection...");
-
-    self.retry_totaltime += self.retry_delay;
-
-    if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
-       self.retry_timer = null;
-       console.error("thrift: Couldn't get thrift connection after " + self.retry_totaltime + "ms.");
-       self.emit("close");
-       return;
-    }
-
-    if (self.path !== undefined) {
-      self.connection.connect(self.path);
-    } else {
-      self.connection.connect(self.port, self.host);
-    }
-    self.retry_timer = null;
-  }, this.retry_delay);
-};
-
-exports.createConnection = function(host, port, options) {
-  var stream = net.createConnection( {
-    port: port, 
-    host: host,
-    timeout: options.connect_timeout || options.timeout || 0
-  });
-  var connection = new Connection(stream, options);
-  connection.host = host;
-  connection.port = port;
-
-  return connection;
-};
-
-exports.createUDSConnection = function(path, options) {
-  var stream = net.createConnection(path);
-  var connection = new Connection(stream, options);
-  connection.path = path;
-
-  return connection;
-};
-
-exports.createSSLConnection = function(host, port, options) {
-  if (!('secureProtocol' in options) && !('secureOptions' in options)) {
-    options.secureProtocol = "SSLv23_method";
-    options.secureOptions = constants.SSL_OP_NO_SSLv2 | constants.SSL_OP_NO_SSLv3;
-  }
-
-  var stream = tls.connect(port, host, options);
-  var connection = new Connection(stream, options);
-  connection.host = host;
-  connection.port = port;
-
-  return connection;
-};
-
-
-exports.createClient = createClient;
-
-var child_process = require('child_process');
-var StdIOConnection = exports.StdIOConnection = function(command, options) {
-  var command_parts = command.split(' ');
-  command = command_parts[0];
-  var args = command_parts.splice(1,command_parts.length -1);
-  var child = this.child = child_process.spawn(command,args);
-
-  var self = this;
-  EventEmitter.call(this);
-
-  this.connection = child.stdin;
-  this.options = options || {};
-  this.transport = this.options.transport || TBufferedTransport;
-  this.protocol = this.options.protocol || TBinaryProtocol;
-  this.offline_queue = [];
-
-  if (log.getLogLevel() === 'debug') {
-    this.child.stderr.on('data', function (err) {
-      log.debug(err.toString(), 'CHILD ERROR');
-    });
-
-    this.child.on('exit', function (code,signal) {
-      log.debug(code + ':' + signal, 'CHILD EXITED');
-    });
-  }
-
-  this.frameLeft = 0;
-  this.framePos = 0;
-  this.frame = null;
-  this.connected = true;
-
-  self.flush_offline_queue();
-
-  this.connection.addListener("error", function(err) {
-    self.emit("error", err);
-  });
-
-  // Add a close listener
-  this.connection.addListener("close", function() {
-    self.emit("close");
-  });
-
-  child.stdout.addListener("data", self.transport.receiver(function(transport_with_data) {
-    var message = new self.protocol(transport_with_data);
-    try {
-      var header = message.readMessageBegin();
-      var dummy_seqid = header.rseqid * -1;
-      var client = self.client;
-      client._reqs[dummy_seqid] = function(err, success){
-        transport_with_data.commitPosition();
-
-        var callback = client._reqs[header.rseqid];
-        delete client._reqs[header.rseqid];
-        if (callback) {
-          callback(err, success);
-        }
-      };
-      client['recv_' + header.fname](message, header.mtype, dummy_seqid);
-    }
-    catch (e) {
-      if (e instanceof InputBufferUnderrunError) {
-        transport_with_data.rollbackPosition();
-      }
-      else {
-        throw e;
-      }
-    }
-  }));
-};
-
-util.inherits(StdIOConnection, EventEmitter);
-
-StdIOConnection.prototype.end = function() {
-  this.connection.end();
-};
-
-StdIOConnection.prototype.flush_offline_queue = function () {
-  var self = this;
-  var offline_queue = this.offline_queue;
-
-  // Reset offline queue
-  this.offline_queue = [];
-  // Attempt to write queued items
-  offline_queue.forEach(function(data) {
-    self.write(data);
-  });
-};
-
-StdIOConnection.prototype.write = function(data) {
-  if (!this.connected) {
-    this.offline_queue.push(data);
-    return;
-  }
-  this.connection.write(data);
-};
-
-exports.createStdIOConnection = function(command,options){
-  return new StdIOConnection(command,options);
-};
-
-exports.createStdIOClient = createClient;