import json
+import time
import errno
import logging
+from threading import Lock
+try:
+ # py2
+ from threading import _Timer as Timer
+except ImportError:
+ #py3
+ from threading import Timer
import cephfs
import orchestrator
from .subvolspec import SubvolumeSpec
from .subvolume import SubVolume
from .exception import VolumeException
+from .purge_queue import ThreadPoolPurgeQueueMixin
log = logging.getLogger(__name__)
class ConnectionPool(object):
    """
    Pool of libcephfs connections keyed by filesystem (volume) name.

    Callers obtain a handle with get_fs_handle() and must hand it back with
    put_fs_handle(). A recurring timer task started in the constructor
    periodically tears down connections that have had no operation in
    progress for CONNECTION_IDLE_INTERVAL seconds. Access to the connection
    table is serialized with self.lock.
    """
    class Connection(object):
        """
        A single libcephfs handle for one filesystem, together with the
        bookkeeping (in-flight operation count, last use time, filesystem id)
        the pool needs to decide when the handle can be reused or dropped.
        """
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            # remember the id at creation time so that a filesystem that is
            # removed and recreated under the same name can be detected.
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            """
            Look up the id of self.fs_name in the manager's 'fs_map'.

            Raises VolumeException(-ENOENT) when no filesystem has that name.
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            """Mark the connection as in use and return the libcephfs handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self):
            """Record that one operation using the handle has finished."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1

        def del_fs_handle(self):
            """
            Tear the connection down: a regular shutdown when the filesystem
            is still the one we connected to, an abort otherwise.
            """
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            """
            Return True when the filesystem currently registered under
            self.fs_name still has the id recorded at construction time.
            """
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                # (Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not swallowed.)
                pass
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            """
            Return True when no operation is in progress and the handle was
            last fetched at least `timeout` seconds ago.
            """
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            """Create, initialize and mount a libcephfs handle for self.fs_name."""
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            """Shut the handle down; no operation may be in progress."""
            assert self.ops_in_progress == 0
            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
            self.fs.shutdown()
            self.fs = None

        def abort(self):
            """Abort the connection; no operation may be in progress."""
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            while not self.finished.is_set():
                # wait() returns True when cancel() set the event while we
                # were sleeping; do not fire the callback after cancellation.
                if self.finished.wait(self.interval):
                    break
                self.function(*self.args, **self.kwargs)
            self.finished.set()

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        """Timer callback: drop every connection that is currently idle."""
        with self.lock:
            log.info("scanning for idle connections..")
            # items() works on both python2 and python3 (iteritems() is
            # python2 only). the names are collected into a list first since
            # _del_fs_handle() mutates self.connections.
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        """
        Return a libcephfs handle for `fs_name`, connecting if no valid
        pooled connection exists. The caller must balance this with
        put_fs_handle().

        Raises VolumeException when the volume is missing or libcephfs
        reports an error while connecting.
        """
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        # NOTE(review): the teardown asserts that no operation
                        # is in flight on the stale connection -- confirm that
                        # callers cannot reach this path with a handle held.
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        """Return a handle previously fetched for `fs_name` (no-op if unknown)."""
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle()

    def _del_fs_handle(self, fs_name):
        """Remove and tear down the connection for `fs_name`; lock must be held."""
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle()

    def del_fs_handle(self, fs_name):
        """Remove and tear down the connection for `fs_name`, if any."""
        with self.lock:
            self._del_fs_handle(fs_name)
class VolumeClient(object):
def __init__(self, mgr):
    """
    Set up the connection pool and the purge queue for this client, then
    queue a purge job for every filesystem listed in the manager's fs_map.
    """
    self.mgr = mgr
    self.connection_pool = ConnectionPool(self.mgr)
    # TODO: make thread pool size configurable
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
    # kickstart purging of leftover subvolume entries in trash on startup.
    # a volume that has no trash directory, or nothing left to purge, gets
    # dropped from the purge job list.
    filesystems = self.mgr.get('fs_map')['filesystems']
    for entry in filesystems:
        volume_name = entry['mdsmap']['fs_name']
        self.purge_queue.queue_purge_job(volume_name)
def gen_pool_names(self, volname):
"""
"""
return ve.to_tuple()
- def create_pool(self, pool_name, pg_num, pg_num_min=None, pg_autoscale_factor=None):
+ def create_pool(self, pool_name, pg_num):
# create the given pool
command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
- if pg_num_min:
- command['pg_num_min'] = pg_num_min
r, outb, outs = self.mgr.mon_command(command)
if r != 0:
return r, outb, outs
- # set pg autoscale if needed
- if pg_autoscale_factor:
- command = {'prefix': 'osd pool set', 'pool': pool_name, 'var': 'pg_autoscale_bias',
- 'val': str(pg_autoscale_factor)}
- r, outb, outs = self.mgr.mon_command(command)
return r, outb, outs
def remove_pool(self, pool_name):
return self.mgr.mon_command(command)
def remove_filesystem(self, fs_name):
    """
    Issue 'fs fail' for the filesystem and, if that succeeds, 'fs rm'.

    Returns the (retval, stdout, stderr) triple of the first command that
    failed, or of the final 'fs rm' command.
    """
    fail_cmd = {'prefix': 'fs fail', 'fs_name': fs_name}
    rc, out, err = self.mgr.mon_command(fail_cmd)
    if rc != 0:
        return rc, out, err

    rm_cmd = {
        'prefix': 'fs rm',
        'fs_name': fs_name,
        'yes_i_really_mean_it': True,
    }
    return self.mgr.mon_command(rm_cmd)
return -errno.EINVAL, "", str(e)
return 0, "", ""
- def set_mds_down(self, fs_name):
- command = {'prefix': 'fs set', 'fs_name': fs_name, 'var': 'cluster_down', 'val': 'true'}
- r, outb, outs = self.mgr.mon_command(command)
- if r != 0:
- return r, outb, outs
- for mds in self.get_mds_names(fs_name):
- command = {'prefix': 'mds fail', 'role_or_gid': mds}
- r, outb, outs = self.mgr.mon_command(command)
- if r != 0:
- return r, outb, outs
- return 0, "", ""
-
### volume operations -- create, rm, ls
def create_volume(self, volname, size=None):
"""
metadata_pool, data_pool = self.gen_pool_names(volname)
# create pools
- r, outs, outb = self.create_pool(metadata_pool, 16, pg_num_min=16, pg_autoscale_factor=4.0)
+ r, outs, outb = self.create_pool(metadata_pool, 16)
if r != 0:
return r, outb, outs
r, outb, outs = self.create_pool(data_pool, 8)
"""
delete the given module (tear down mds, remove filesystem)
"""
+ self.purge_queue.cancel_purge_job(volname)
+ self.connection_pool.del_fs_handle(volname)
# Tear down MDS daemons
try:
completion = self.mgr.remove_stateless_service("mds", volname)
# In case orchestrator didn't tear down MDS daemons cleanly, or
# there was no orchestrator, we force the daemons down.
if self.volume_exists(volname):
- r, outb, outs = self.set_mds_down(volname)
- if r != 0:
- return r, outb, outs
r, outb, outs = self.remove_filesystem(volname)
if r != 0:
return r, outb, outs
except ValueError:
raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
def connection_pool_wrap(func):
    """
    decorator that wraps subvolume calls by fetching filesystem handle
    from the connection pool when fs_handle argument is empty, otherwise
    just invoke func with the passed in filesystem handle. Also handles
    call made to non-existent volumes (only when fs_handle is empty).

    The wrapped function is called as func(self, fs_handle, **kwargs);
    kwargs must contain 'vol_name'. When the handle cannot be fetched, the
    VolumeException is converted to a return tuple, or to (0, "", "") if
    the optional 'force' kwarg is set.
    """
    from functools import wraps

    @wraps(func)
    def conn_wrapper(self, fs_handle, **kwargs):
        fs_h = fs_handle
        fs_name = kwargs['vol_name']
        # note that force arg is available for remove type commands
        force = kwargs.get('force', False)

        # fetch the connection from the pool
        if not fs_handle:
            try:
                fs_h = self.connection_pool.get_fs_handle(fs_name)
            except VolumeException as ve:
                if not force:
                    return self.volume_exception_to_retval(ve)
                return 0, "", ""

        try:
            # invoke the actual routine w/ fs handle
            return func(self, fs_h, **kwargs)
        finally:
            # hand over the connection back to the pool -- also when func
            # raised, otherwise the connection stays marked as in use forever.
            # NOTE(review): a caller supplied handle is handed back here as
            # well (unchanged behaviour) -- confirm callers expect that.
            if fs_h:
                self.connection_pool.put_fs_handle(fs_name)
    return conn_wrapper
+
### subvolume operations
- def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None):
- ret = 0, "", ""
+ @connection_pool_wrap
+ def create_subvolume(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ mode = kwargs['mode']
+
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
- "volume create` before trying to create subvolumes".format(volname))
- with SubVolume(self.mgr, fs_name=volname) as sv:
+ with SubVolume(self.mgr, fs_handle) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume(self, volname, subvolname, groupname, force):
- ret = 0, "", ""
+ @connection_pool_wrap
+ def remove_subvolume(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
try:
- fs = self.get_fs(volname)
- if fs:
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if self.group_exists(sv, spec):
- sv.remove_subvolume(spec, force)
- sv.purge_subvolume(spec)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
- "subvolume '{1}'".format(groupname, subvolname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
- "'{1}'".format(volname, subvolname))
+ with SubVolume(self.mgr, fs_handle) as sv:
+ spec = SubvolumeSpec(subvolname, groupname)
+ if self.group_exists(sv, spec):
+ sv.remove_subvolume(spec, force)
+ self.purge_queue.queue_purge_job(volname)
+ elif not force:
+ raise VolumeException(
+ -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
+ "subvolume '{1}'".format(groupname, subvolname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def subvolume_getpath(self, volname, subvolname, groupname):
- ret = None
+ @connection_pool_wrap
+ def subvolume_getpath(self, fs_handle, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found".format(volname))
-
- with SubVolume(self.mgr, fs_name=volname) as sv:
+ with SubVolume(self.mgr, fs_handle) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
### subvolume snapshot
- def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
- ret = 0, "", ""
- try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
- "'{1}'".format(volname, snapname))
+ @connection_pool_wrap
+ def create_subvolume_snapshot(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
- with SubVolume(self.mgr, fs_name=volname) as sv:
+ try:
+ with SubVolume(self.mgr, fs_handle) as sv:
spec = SubvolumeSpec(subvolname, groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
- ret = 0, "", ""
+ @connection_pool_wrap
+ def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if self.group_exists(sv, spec):
- if sv.get_subvolume_path(spec):
- sv.remove_subvolume_snapshot(spec, snapname, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
- "subvolume snapshot '{1}'".format(subvolname, snapname))
+ with SubVolume(self.mgr, fs_handle) as sv:
+ spec = SubvolumeSpec(subvolname, groupname)
+ if self.group_exists(sv, spec):
+ if sv.get_subvolume_path(spec):
+ sv.remove_subvolume_snapshot(spec, snapname, force)
elif not force:
raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
- "remove subvolume snapshot '{1}'".format(groupname, snapname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
- "snapshot '{1}'".format(volname, snapname))
+ -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
+ "subvolume snapshot '{1}'".format(subvolname, snapname))
+ elif not force:
+ raise VolumeException(
+ -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
+ "remove subvolume snapshot '{1}'".format(groupname, snapname))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
### group operations
- def create_subvolume_group(self, volname, groupname, mode='755', pool=None):
- ret = 0, "", ""
- try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
- "volume create` before trying to create subvolume groups".format(volname))
+ @connection_pool_wrap
+ def create_subvolume_group(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ pool = kwargs['pool_layout']
+ mode = kwargs['mode']
+ try:
# TODO: validate that subvol size fits in volume size
- with SubVolume(self.mgr, fs_name=volname) as sv:
+ with SubVolume(self.mgr, fs_handle) as sv:
spec = SubvolumeSpec("", groupname)
sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_group(self, volname, groupname, force):
- ret = 0, "", ""
+ @connection_pool_wrap
+ def remove_subvolume_group(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- # TODO: check whether there are no subvolumes in the group
- spec = SubvolumeSpec("", groupname)
- sv.remove_group(spec, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
- "group '{0}'".format(volname, groupname))
+ with SubVolume(self.mgr, fs_handle) as sv:
+ # TODO: check whether there are no subvolumes in the group
+ spec = SubvolumeSpec("", groupname)
+ sv.remove_group(spec, force)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
@connection_pool_wrap
def getpath_subvolume_group(self, fs_handle, **kwargs):
    """
    Return (0, path, "") for the subvolume group named by the 'group_name'
    kwarg, or the error tuple for -ENOENT when the group path is None.
    """
    group = kwargs['group_name']
    try:
        with SubVolume(self.mgr, fs_handle) as subvol:
            group_spec = SubvolumeSpec("", group)
            group_path = subvol.get_group_path(group_spec)
            if group_path is not None:
                return 0, group_path, ""
            raise VolumeException(
                -errno.ENOENT, "Subvolume group '{0}' not found".format(group))
    except VolumeException as exc:
        return self.volume_exception_to_retval(exc)
+
### group snapshot
- def create_subvolume_group_snapshot(self, volname, groupname, snapname):
- ret = 0, "", ""
+ @connection_pool_wrap
+ def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ snapname = kwargs['snap_name']
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
- "'{1}'".format(volname, snapname))
-
- with SubVolume(self.mgr, fs_name=volname) as sv:
+ with SubVolume(self.mgr, fs_handle) as sv:
spec = SubvolumeSpec("", groupname)
if not self.group_exists(sv, spec):
raise VolumeException(
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
@connection_pool_wrap
def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
    """
    Remove snapshot 'snap_name' of subvolume group 'group_name'.

    A missing group is an -ENOENT error unless the 'force' kwarg is truthy,
    in which case it is ignored. Returns a (retval, stdout, stderr) tuple.
    """
    # all four kwargs are mandatory; look them up in the original order
    required = ('vol_name', 'group_name', 'snap_name', 'force')
    _volume, group, snapshot, force = [kwargs[key] for key in required]

    outcome = 0, "", ""
    try:
        with SubVolume(self.mgr, fs_handle) as subvol:
            group_spec = SubvolumeSpec("", group)
            if not self.group_exists(subvol, group_spec):
                if not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
                        "remove it".format(group))
            else:
                subvol.remove_group_snapshot(group_spec, snapshot, force)
    except VolumeException as exc:
        outcome = self.volume_exception_to_retval(exc)
    return outcome
+
@connection_pool_wrap
def get_subvolume_trash_entry(self, fs_handle, **kwargs):
    """
    Ask the subvolume layer for a trash entry, passing along the optional
    'exclude_entries' kwarg (defaults to an empty list).

    Returns (0, entry, "") or the error tuple of a VolumeException.
    """
    _volume = kwargs['vol_name']
    skip = kwargs.get('exclude_entries', [])

    result = None
    try:
        with SubVolume(self.mgr, fs_handle) as subvol:
            trash_spec = SubvolumeSpec("", "")
            result = 0, subvol.get_trash_entry(trash_spec, skip), ""
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result
+
+ @connection_pool_wrap
+ def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
ret = 0, "", ""
+ volname = kwargs['vol_name']
+ purge_dir = kwargs['purge_dir']
+ should_cancel = kwargs.get('should_cancel', lambda: False)
+
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec("", groupname)
- if self.group_exists(sv, spec):
- sv.remove_group_snapshot(spec, snapname, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
- "remove it".format(groupname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
- "snapshot '{1}'".format(volname, snapname))
+ with SubVolume(self.mgr, fs_handle) as sv:
+ spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
+ sv.purge_subvolume(spec, should_cancel)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret