]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/volumes/fs/volume.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
index 8cc7ef26fa05883c8bb093097228e6266ddab736..8e32a233b4f786c3f9c23b25136d3e8cd3241c82 100644 (file)
@@ -1,6 +1,14 @@
 import json
+import time
 import errno
 import logging
+from threading import Lock
+try:
+    # py2
+    from threading import _Timer as Timer
+except ImportError:
+    #py3
+    from threading import Timer
 
 import cephfs
 import orchestrator
@@ -8,12 +16,164 @@ import orchestrator
 from .subvolspec import SubvolumeSpec
 from .subvolume import SubVolume
 from .exception import VolumeException
+from .purge_queue import ThreadPoolPurgeQueueMixin
 
 log = logging.getLogger(__name__)
 
+class ConnectionPool(object):
+    class Connection(object):
+        def __init__(self, mgr, fs_name):
+            self.fs = None
+            self.mgr = mgr
+            self.fs_name = fs_name
+            self.ops_in_progress = 0
+            self.last_used = time.time()
+            self.fs_id = self.get_fs_id()
+
+        def get_fs_id(self):
+            fs_map = self.mgr.get('fs_map')
+            for fs in fs_map['filesystems']:
+                if fs['mdsmap']['fs_name'] == self.fs_name:
+                    return fs['id']
+            raise VolumeException(
+                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
+
+        def get_fs_handle(self):
+            self.last_used = time.time()
+            self.ops_in_progress += 1
+            return self.fs
+
+        def put_fs_handle(self):
+            assert self.ops_in_progress > 0
+            self.ops_in_progress -= 1
+
+        def del_fs_handle(self):
+            if self.is_connection_valid():
+                self.disconnect()
+            else:
+                self.abort()
+
+        def is_connection_valid(self):
+            fs_id = None
+            try:
+                fs_id = self.get_fs_id()
+            except:
+                # the filesystem does not exist now -- connection is not valid.
+                pass
+            return self.fs_id == fs_id
+
+        def is_connection_idle(self, timeout):
+            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
+
+        def connect(self):
+            assert self.ops_in_progress == 0
+            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
+            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
+            log.debug("Setting user ID and group ID of CephFS mount as root...")
+            self.fs.conf_set("client_mount_uid", "0")
+            self.fs.conf_set("client_mount_gid", "0")
+            log.debug("CephFS initializing...")
+            self.fs.init()
+            log.debug("CephFS mounting...")
+            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
+            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+
+        def disconnect(self):
+            assert self.ops_in_progress == 0
+            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+            self.fs.shutdown()
+            self.fs = None
+
+        def abort(self):
+            assert self.ops_in_progress == 0
+            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
+            self.fs.abort_conn()
+            self.fs = None
+
+    class RTimer(Timer):
+        """
+        recurring timer variant of Timer
+        """
+        def run(self):
+            while not self.finished.is_set():
+                self.finished.wait(self.interval)
+                self.function(*self.args, **self.kwargs)
+            self.finished.set()
+
+    # TODO: make this configurable
+    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
+    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.connections = {}
+        self.lock = Lock()
+        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
+                                                self.cleanup_connections)
+        self.timer_task.start()
+
+    def cleanup_connections(self):
+        with self.lock:
+            log.info("scanning for idle connections..")
+            idle_fs = [fs_name for fs_name,conn in self.connections.iteritems()
+                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
+            for fs_name in idle_fs:
+                log.info("cleaning up connection for '{}'".format(fs_name))
+                self._del_fs_handle(fs_name)
+
+    def get_fs_handle(self, fs_name):
+        with self.lock:
+            conn = None
+            try:
+                conn = self.connections.get(fs_name, None)
+                if conn:
+                    if conn.is_connection_valid():
+                        return conn.get_fs_handle()
+                    else:
+                        # filesystem id changed beneath us (or the filesystem does not exist).
+                        # this is possible if the filesystem got removed (and recreated with
+                        # same name) via "ceph fs rm/new" mon command.
+                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
+                        self._del_fs_handle(fs_name)
+                conn = ConnectionPool.Connection(self.mgr, fs_name)
+                conn.connect()
+            except cephfs.Error as e:
+                # try to provide a better error string if possible
+                if e.args[0] == errno.ENOENT:
+                    raise VolumeException(
+                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
+                raise VolumeException(-e.args[0], e.args[1])
+            self.connections[fs_name] = conn
+            return conn.get_fs_handle()
+
+    def put_fs_handle(self, fs_name):
+        with self.lock:
+            conn = self.connections.get(fs_name, None)
+            if conn:
+                conn.put_fs_handle()
+
+    def _del_fs_handle(self, fs_name):
+        conn = self.connections.pop(fs_name, None)
+        if conn:
+            conn.del_fs_handle()
+    def del_fs_handle(self, fs_name):
+        with self.lock:
+            self._del_fs_handle(fs_name)
+
 class VolumeClient(object):
     def __init__(self, mgr):
         self.mgr = mgr
+        self.connection_pool = ConnectionPool(self.mgr)
+        # TODO: make thread pool size configurable
+        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
+        # on startup, queue purge job for available volumes to kickstart
+        # purge for leftover subvolume entries in trash. note that, if the
+        # trash directory does not exist or if there are no purge entries
+        # available for a volume, the volume is removed from the purge
+        # job list.
+        fs_map = self.mgr.get('fs_map')
+        for fs in fs_map['filesystems']:
+            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
 
     def gen_pool_names(self, volname):
         """
@@ -43,20 +203,13 @@ class VolumeClient(object):
         """
         return ve.to_tuple()
 
-    def create_pool(self, pool_name, pg_num, pg_num_min=None, pg_autoscale_factor=None):
+    def create_pool(self, pool_name, pg_num):
         # create the given pool
         command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
-        if pg_num_min:
-            command['pg_num_min'] = pg_num_min
         r, outb, outs = self.mgr.mon_command(command)
         if r != 0:
             return r, outb, outs
 
-        # set pg autoscale if needed
-        if pg_autoscale_factor:
-            command = {'prefix': 'osd pool set', 'pool': pool_name, 'var': 'pg_autoscale_bias',
-                       'val': str(pg_autoscale_factor)}
-            r, outb, outs = self.mgr.mon_command(command)
         return r, outb, outs
 
     def remove_pool(self, pool_name):
@@ -70,6 +223,10 @@ class VolumeClient(object):
         return self.mgr.mon_command(command)
 
     def remove_filesystem(self, fs_name):
+        command = {'prefix': 'fs fail', 'fs_name': fs_name}
+        r, outb, outs = self.mgr.mon_command(command)
+        if r != 0:
+            return r, outb, outs
         command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
         return self.mgr.mon_command(command)
 
@@ -89,18 +246,6 @@ class VolumeClient(object):
             return -errno.EINVAL, "", str(e)
         return 0, "", ""
 
-    def set_mds_down(self, fs_name):
-        command = {'prefix': 'fs set', 'fs_name': fs_name, 'var': 'cluster_down', 'val': 'true'}
-        r, outb, outs = self.mgr.mon_command(command)
-        if r != 0:
-            return r, outb, outs
-        for mds in self.get_mds_names(fs_name):
-            command = {'prefix': 'mds fail', 'role_or_gid': mds}
-            r, outb, outs = self.mgr.mon_command(command)
-            if r != 0:
-                return r, outb, outs
-        return 0, "", ""
-
     ### volume operations -- create, rm, ls
 
     def create_volume(self, volname, size=None):
@@ -109,7 +254,7 @@ class VolumeClient(object):
         """
         metadata_pool, data_pool = self.gen_pool_names(volname)
         # create pools
-        r, outs, outb = self.create_pool(metadata_pool, 16, pg_num_min=16, pg_autoscale_factor=4.0)
+        r, outs, outb = self.create_pool(metadata_pool, 16)
         if r != 0:
             return r, outb, outs
         r, outb, outs = self.create_pool(data_pool, 8)
@@ -127,6 +272,8 @@ class VolumeClient(object):
         """
         delete the given module (tear down mds, remove filesystem)
         """
+        self.purge_queue.cancel_purge_job(volname)
+        self.connection_pool.del_fs_handle(volname)
         # Tear down MDS daemons
         try:
             completion = self.mgr.remove_stateless_service("mds", volname)
@@ -143,9 +290,6 @@ class VolumeClient(object):
         # In case orchestrator didn't tear down MDS daemons cleanly, or
         # there was no orchestrator, we force the daemons down.
         if self.volume_exists(volname):
-            r, outb, outs = self.set_mds_down(volname)
-            if r != 0:
-                return r, outb, outs
             r, outb, outs = self.remove_filesystem(volname)
             if r != 0:
                 return r, outb, outs
@@ -176,16 +320,51 @@ class VolumeClient(object):
         except ValueError:
             raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
 
+    def connection_pool_wrap(func):
+        """
+        decorator that wraps subvolume calls by fetching filesystem handle
+        from the connection pool when fs_handle argument is empty, otherwise
+        just invoke func with the passed in filesystem handle. Also handles
+        call made to non-existent volumes (only when fs_handle is empty).
+        """
+        def conn_wrapper(self, fs_handle, **kwargs):
+            fs_h = fs_handle
+            fs_name = kwargs['vol_name']
+            # note that force arg is available for remove type commands
+            force = kwargs.get('force', False)
+
+            # fetch the connection from the pool
+            if not fs_handle:
+                try:
+                    fs_h = self.connection_pool.get_fs_handle(fs_name)
+                except VolumeException as ve:
+                    if not force:
+                        return self.volume_exception_to_retval(ve)
+                    return 0, "", ""
+
+            # invoke the actual routine w/ fs handle
+            result = func(self, fs_h, **kwargs)
+
+            # hand over the connection back to the pool
+            if fs_h:
+                self.connection_pool.put_fs_handle(fs_name)
+            return result
+        return conn_wrapper
+
     ### subvolume operations
 
-    def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def create_subvolume(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        size       = kwargs['size']
+        pool       = kwargs['pool_layout']
+        mode       = kwargs['mode']
+
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
-                    "volume create` before trying to create subvolumes".format(volname))
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -196,36 +375,35 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume(self, volname, subvolname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        force      = kwargs['force']
         try:
-            fs = self.get_fs(volname)
-            if fs:
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec(subvolname, groupname)
-                    if self.group_exists(sv, spec):
-                        sv.remove_subvolume(spec, force)
-                        sv.purge_subvolume(spec)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
-                            "subvolume '{1}'".format(groupname, subvolname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
-                    "'{1}'".format(volname, subvolname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(subvolname, groupname)
+                if self.group_exists(sv, spec):
+                    sv.remove_subvolume(spec, force)
+                    self.purge_queue.queue_purge_job(volname)
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
+                        "subvolume '{1}'".format(groupname, subvolname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def subvolume_getpath(self, volname, subvolname, groupname):
-        ret = None
+    @connection_pool_wrap
+    def subvolume_getpath(self, fs_handle, **kwargs):
+        ret        = None
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found".format(volname))
-
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -241,15 +419,16 @@ class VolumeClient(object):
 
     ### subvolume snapshot
 
-    def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
+    @connection_pool_wrap
+    def create_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
 
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec(subvolname, groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -264,76 +443,90 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+        force      = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec(subvolname, groupname)
-                    if self.group_exists(sv, spec):
-                        if sv.get_subvolume_path(spec):
-                            sv.remove_subvolume_snapshot(spec, snapname, force)
-                        elif not force:
-                            raise VolumeException(
-                                -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
-                                "subvolume snapshot '{1}'".format(subvolname, snapname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(subvolname, groupname)
+                if self.group_exists(sv, spec):
+                    if sv.get_subvolume_path(spec):
+                        sv.remove_subvolume_snapshot(spec, snapname, force)
                     elif not force:
                         raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
-                            "remove subvolume snapshot '{1}'".format(groupname, snapname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
-                    "snapshot '{1}'".format(volname, snapname))
+                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
+                            "subvolume snapshot '{1}'".format(subvolname, snapname))
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
+                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### group operations
 
-    def create_subvolume_group(self, volname, groupname, mode='755', pool=None):
-        ret = 0, "", ""
-        try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
-                    "volume create` before trying to create subvolume groups".format(volname))
+    @connection_pool_wrap
+    def create_subvolume_group(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        pool      = kwargs['pool_layout']
+        mode      = kwargs['mode']
 
+        try:
             # TODO: validate that subvol size fits in volume size
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group(self, volname, groupname, force):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def remove_subvolume_group(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        force     = kwargs['force']
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    # TODO: check whether there are no subvolumes in the group
-                    spec = SubvolumeSpec("", groupname)
-                    sv.remove_group(spec, force)
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
-                    "group '{0}'".format(volname, groupname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                # TODO: check whether there are no subvolumes in the group
+                spec = SubvolumeSpec("", groupname)
+                sv.remove_group(spec, force)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
+    @connection_pool_wrap
+    def getpath_subvolume_group(self, fs_handle, **kwargs):
+        groupname  = kwargs['group_name']
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", groupname)
+                path = sv.get_group_path(spec)
+                if path is None:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
+                return 0, path, ""
+        except VolumeException as ve:
+            return self.volume_exception_to_retval(ve)
+
     ### group snapshot
 
-    def create_subvolume_group_snapshot(self, volname, groupname, snapname):
-        ret = 0, "", ""
+    @connection_pool_wrap
+    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname  = kwargs['snap_name']
         try:
-            if not self.volume_exists(volname):
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
-                    "'{1}'".format(volname, snapname))
-
-            with SubVolume(self.mgr, fs_name=volname) as sv:
+            with SubVolume(self.mgr, fs_handle) as sv:
                 spec = SubvolumeSpec("", groupname)
                 if not self.group_exists(sv, spec):
                     raise VolumeException(
@@ -344,22 +537,52 @@ class VolumeClient(object):
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
+    @connection_pool_wrap
+    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
+        snapname  = kwargs['snap_name']
+        force     = kwargs['force']
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", groupname)
+                if self.group_exists(sv, spec):
+                    sv.remove_group_snapshot(spec, snapname, force)
+                elif not force:
+                    raise VolumeException(
+                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
+                        "remove it".format(groupname))
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    @connection_pool_wrap
+    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
+        ret = None
+        volname = kwargs['vol_name']
+        exclude = kwargs.get('exclude_entries', [])
+
+        try:
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec("", "")
+                path = sv.get_trash_entry(spec, exclude)
+                ret = 0, path, ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    @connection_pool_wrap
+    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
         ret = 0, "", ""
+        volname = kwargs['vol_name']
+        purge_dir = kwargs['purge_dir']
+        should_cancel = kwargs.get('should_cancel', lambda: False)
+
         try:
-            if self.volume_exists(volname):
-                with SubVolume(self.mgr, fs_name=volname) as sv:
-                    spec = SubvolumeSpec("", groupname)
-                    if self.group_exists(sv, spec):
-                        sv.remove_group_snapshot(spec, snapname, force)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
-                            "remove it".format(groupname))
-            elif not force:
-                raise VolumeException(
-                    -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
-                    "snapshot '{1}'".format(volname, snapname))
+            with SubVolume(self.mgr, fs_handle) as sv:
+                spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
+                sv.purge_subvolume(spec, should_cancel)
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret