class ConnectionPool(object):
    """Pool of libcephfs connections, keyed by filesystem name.

    Each filesystem gets at most one cached `Connection`. A recurring
    background timer task reaps connections that have been idle (no
    operations in progress) for longer than CONNECTION_IDLE_INTERVAL.
    All pool-level state is guarded by `self.lock`.
    """

    class Connection(object):
        """A single libcephfs mount for one named filesystem.

        Tracks an in-progress operation count (for idle detection) and
        the filesystem id at connect time (to detect the filesystem
        being removed and recreated under the same name).
        """
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            # remember the fs id so a later "ceph fs rm/new" cycle with
            # the same name can be detected via is_connection_valid().
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            """Return the current id of this connection's filesystem.

            :raises VolumeException: (-ENOENT) if no filesystem with
                                     this name exists in the fs map.
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            """Hand out the mounted handle, bumping usage accounting."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self):
            """Return a previously handed-out handle."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1

        def del_fs_handle(self):
            """Tear down the connection.

            A clean shutdown is only meaningful while the filesystem
            still exists with the same id; otherwise abort the
            connection outright.
            """
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            """Return True if the filesystem still exists with the same id."""
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            """Return True if no ops are in flight and the connection has
            been unused for at least `timeout` seconds."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            """Initialize and mount a libcephfs handle for this filesystem."""
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            """Cleanly shut down the mounted handle."""
            assert self.ops_in_progress == 0
            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
            self.fs.shutdown()
            self.fs = None

        def abort(self):
            """Abort the connection (filesystem gone or id changed)."""
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            # rerun the callback every `interval` seconds until cancelled.
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            self.finished.set()

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        """Timer callback: drop connections idle past the threshold."""
        with self.lock:
            log.info("scanning for idle connections..")
            # NOTE: .items() (not the py2-only .iteritems()) so this works
            # on Python 3; collect first since _del_fs_handle mutates the dict.
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        """Return a mounted handle for `fs_name`, connecting on demand.

        A cached connection is reused only if the filesystem id is
        unchanged; otherwise it is torn down and a fresh connection made.

        :raises VolumeException: on connect failure (with -ENOENT mapped
                                 to a friendlier "Volume not found").
        """
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        """Release one use of the handle for `fs_name` (if still pooled)."""
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle()

    def _del_fs_handle(self, fs_name):
        # caller must hold self.lock
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle()

    def del_fs_handle(self, fs_name):
        """Remove and tear down the pooled connection for `fs_name`."""
        with self.lock:
            self._del_fs_handle(fs_name)