import ceph quincy 17.2.1

diff --git a/ceph/src/pybind/mgr/volumes/fs/volume.py b/ceph/src/pybind/mgr/volumes/fs/volume.py
index 8e32a233b4f786c3f9c23b25136d3e8cd3241c82..18c5da222fe15c8d3a0f472d4830fcf4b42739a3 100644
--- a/ceph/src/pybind/mgr/volumes/fs/volume.py
+++ b/ceph/src/pybind/mgr/volumes/fs/volume.py
 import json
-import time
 import errno
 import logging
-from threading import Lock
-try:
-    # py2
-    from threading import _Timer as Timer
-except ImportError:
-    #py3
-    from threading import Timer
+from typing import TYPE_CHECKING
 
 import cephfs
-import orchestrator
 
-from .subvolspec import SubvolumeSpec
-from .subvolume import SubVolume
-from .exception import VolumeException
+from mgr_util import CephfsClient
+
+from .fs_util import listdir
+
+from .operations.volume import create_volume, \
+    delete_volume, rename_volume, list_volumes, open_volume, get_pool_names
+from .operations.group import open_group, create_group, remove_group, open_group_unique
+from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
+    create_clone
+from .operations.trash import Trash
+
+from .vol_spec import VolSpec
+from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .async_cloner import Cloner
 from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
+
+if TYPE_CHECKING:
+    from volumes import Module
 
 log = logging.getLogger(__name__)
 
-class ConnectionPool(object):
-    class Connection(object):
-        def __init__(self, mgr, fs_name):
-            self.fs = None
-            self.mgr = mgr
-            self.fs_name = fs_name
-            self.ops_in_progress = 0
-            self.last_used = time.time()
-            self.fs_id = self.get_fs_id()
-
-        def get_fs_id(self):
-            fs_map = self.mgr.get('fs_map')
-            for fs in fs_map['filesystems']:
-                if fs['mdsmap']['fs_name'] == self.fs_name:
-                    return fs['id']
-            raise VolumeException(
-                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
-
-        def get_fs_handle(self):
-            self.last_used = time.time()
-            self.ops_in_progress += 1
-            return self.fs
-
-        def put_fs_handle(self):
-            assert self.ops_in_progress > 0
-            self.ops_in_progress -= 1
-
-        def del_fs_handle(self):
-            if self.is_connection_valid():
-                self.disconnect()
-            else:
-                self.abort()
-
-        def is_connection_valid(self):
-            fs_id = None
-            try:
-                fs_id = self.get_fs_id()
-            except:
-                # the filesystem does not exist now -- connection is not valid.
-                pass
-            return self.fs_id == fs_id
-
-        def is_connection_idle(self, timeout):
-            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
-
-        def connect(self):
-            assert self.ops_in_progress == 0
-            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
-            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
-            log.debug("Setting user ID and group ID of CephFS mount as root...")
-            self.fs.conf_set("client_mount_uid", "0")
-            self.fs.conf_set("client_mount_gid", "0")
-            log.debug("CephFS initializing...")
-            self.fs.init()
-            log.debug("CephFS mounting...")
-            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
-            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
-
-        def disconnect(self):
-            assert self.ops_in_progress == 0
-            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
-            self.fs.shutdown()
-            self.fs = None
-
-        def abort(self):
-            assert self.ops_in_progress == 0
-            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
-            self.fs.abort_conn()
-            self.fs = None
-
-    class RTimer(Timer):
-        """
-        recurring timer variant of Timer
-        """
-        def run(self):
-            while not self.finished.is_set():
-                self.finished.wait(self.interval)
-                self.function(*self.args, **self.kwargs)
-            self.finished.set()
+ALLOWED_ACCESS_LEVELS = ('r', 'rw')
 
-    # TODO: make this configurable
-    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
-    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
 
+def octal_str_to_decimal_int(mode):
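+    """
+    convert an octal mode string to a decimal int, e.g. "755" -> 493
+    """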
+    try:
+        return int(mode, 8)
+    except ValueError:
+        raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
+
+def name_to_json(names):
+    """
+    convert the list of names to json
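+    e.g. [b"g1", b"g2"] -> '[{"name": "g1"}, {"name": "g2"}]' (pretty-printed)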
+    """
+    namedict = [{'name': name.decode('utf-8')} for name in names]
+    return json.dumps(namedict, indent=4, sort_keys=True)
+
+
+class VolumeClient(CephfsClient["Module"]):
     def __init__(self, mgr):
-        self.mgr = mgr
-        self.connections = {}
-        self.lock = Lock()
-        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
-                                                self.cleanup_connections)
-        self.timer_task.start()
-
-    def cleanup_connections(self):
-        with self.lock:
-            log.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name,conn in self.connections.iteritems()
-                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                log.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
-
-    def get_fs_handle(self, fs_name):
-        with self.lock:
-            conn = None
-            try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
-                    else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = ConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
-            except cephfs.Error as e:
-                # try to provide a better error string if possible
-                if e.args[0] == errno.ENOENT:
-                    raise VolumeException(
-                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
-                raise VolumeException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
-
-    def put_fs_handle(self, fs_name):
-        with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle()
-
-    def _del_fs_handle(self, fs_name):
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle()
-    def del_fs_handle(self, fs_name):
-        with self.lock:
-            self._del_fs_handle(fs_name)
-
-class VolumeClient(object):
-    def __init__(self, mgr):
-        self.mgr = mgr
-        self.connection_pool = ConnectionPool(self.mgr)
-        # TODO: make thread pool size configurable
+        super().__init__(mgr)
+        # volume specification
+        self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
@@ -173,29 +61,28 @@ class VolumeClient(object):
         # job list.
         fs_map = self.mgr.get('fs_map')
         for fs in fs_map['filesystems']:
-            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
-
-    def gen_pool_names(self, volname):
+            self.cloner.queue_job(fs['mdsmap']['fs_name'])
+            self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
+
+    def shutdown(self):
+        # Overrides CephfsClient.shutdown()
+        log.info("shutting down")
+        # first, note that we're shutting down
+        self.stopping.set()
+        # stop clones
+        self.cloner.shutdown()
+        # stop purge threads
+        self.purge_queue.shutdown()
+        # last, delete all libcephfs handles from connection pool
+        self.connection_pool.del_all_connections()
+
+    def cluster_log(self, msg, lvl=None):
         """
-        return metadata and data pool name (from a filesystem/volume name) as a tuple
+        log to the cluster log, defaulting to log level WARN.
         """
-        return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
-
-    def get_fs(self, fs_name):
-        fs_map = self.mgr.get('fs_map')
-        for fs in fs_map['filesystems']:
-            if fs['mdsmap']['fs_name'] == fs_name:
-                return fs
-        return None
-
-    def get_mds_names(self, fs_name):
-        fs = self.get_fs(fs_name)
-        if fs is None:
-            return []
-        return [mds['name'] for mds in fs['mdsmap']['info'].values()]
-
-    def volume_exists(self, volname):
-        return self.get_fs(volname) is not None
+        if not lvl:
+            lvl = self.mgr.ClusterLogPrio.WARN
+        self.mgr.cluster_log("cluster", lvl, msg)
 
     def volume_exception_to_retval(self, ve):
         """
@@ -203,386 +90,776 @@ class VolumeClient(object):
         """
         return ve.to_tuple()
 
-    def create_pool(self, pool_name, pg_num):
-        # create the given pool
-        command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
-        r, outb, outs = self.mgr.mon_command(command)
-        if r != 0:
-            return r, outb, outs
-
-        return r, outb, outs
-
-    def remove_pool(self, pool_name):
-        command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
-                   'yes_i_really_really_mean_it': True}
-        return self.mgr.mon_command(command)
-
-    def create_filesystem(self, fs_name, metadata_pool, data_pool):
-        command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
-                   'data': data_pool}
-        return self.mgr.mon_command(command)
-
-    def remove_filesystem(self, fs_name):
-        command = {'prefix': 'fs fail', 'fs_name': fs_name}
-        r, outb, outs = self.mgr.mon_command(command)
-        if r != 0:
-            return r, outb, outs
-        command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
-        return self.mgr.mon_command(command)
-
-    def create_mds(self, fs_name):
-        spec = orchestrator.StatelessServiceSpec()
-        spec.name = fs_name
-        try:
-            completion = self.mgr.add_stateless_service("mds", spec)
-            self.mgr._orchestrator_wait([completion])
-            orchestrator.raise_if_exception(completion)
-        except (ImportError, orchestrator.OrchestratorError):
-            return 0, "", "Volume created successfully (no MDS daemons created)"
-        except Exception as e:
-            # Don't let detailed orchestrator exceptions (python backtraces)
-            # bubble out to the user
-            log.exception("Failed to create MDS daemons")
-            return -errno.EINVAL, "", str(e)
-        return 0, "", ""
-
     ### volume operations -- create, rm, ls
 
-    def create_volume(self, volname, size=None):
-        """
-        create volume  (pool, filesystem and mds)
-        """
-        metadata_pool, data_pool = self.gen_pool_names(volname)
-        # create pools
-        r, outs, outb = self.create_pool(metadata_pool, 16)
-        if r != 0:
-            return r, outb, outs
-        r, outb, outs = self.create_pool(data_pool, 8)
-        if r != 0:
-            return r, outb, outs
-        # create filesystem
-        r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
-        if r != 0:
-            log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
-            return r, outb, outs
-        # create mds
-        return self.create_mds(volname)
-
-    def delete_volume(self, volname):
-        """
-        delete the given module (tear down mds, remove filesystem)
-        """
-        self.purge_queue.cancel_purge_job(volname)
-        self.connection_pool.del_fs_handle(volname)
-        # Tear down MDS daemons
-        try:
-            completion = self.mgr.remove_stateless_service("mds", volname)
-            self.mgr._orchestrator_wait([completion])
-            orchestrator.raise_if_exception(completion)
-        except (ImportError, orchestrator.OrchestratorError):
-            log.warning("OrchestratorError, not tearing down MDS daemons")
-        except Exception as e:
-            # Don't let detailed orchestrator exceptions (python backtraces)
-            # bubble out to the user
-            log.exception("Failed to tear down MDS daemons")
-            return -errno.EINVAL, "", str(e)
-
-        # In case orchestrator didn't tear down MDS daemons cleanly, or
-        # there was no orchestrator, we force the daemons down.
-        if self.volume_exists(volname):
-            r, outb, outs = self.remove_filesystem(volname)
-            if r != 0:
-                return r, outb, outs
-        else:
-            log.warning("Filesystem already gone for volume '{0}'".format(volname))
-        metadata_pool, data_pool = self.gen_pool_names(volname)
-        r, outb, outs = self.remove_pool(metadata_pool)
-        if r != 0:
-            return r, outb, outs
-        return self.remove_pool(data_pool)
-
-    def list_volumes(self):
-        result = []
-        fs_map = self.mgr.get("fs_map")
-        for f in fs_map['filesystems']:
-            result.append({'name': f['mdsmap']['fs_name']})
-        return 0, json.dumps(result, indent=2), ""
-
-    def group_exists(self, sv, spec):
-        # default group need not be explicitly created (as it gets created
-        # at the time of subvolume, snapshot and other create operations).
-        return spec.is_default_group() or sv.get_group_path(spec)
-
-    @staticmethod
-    def octal_str_to_decimal_int(mode):
-        try:
-            return int(mode, 8)
-        except ValueError:
-            raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
-
-    def connection_pool_wrap(func):
-        """
-        decorator that wraps subvolume calls by fetching filesystem handle
-        from the connection pool when fs_handle argument is empty, otherwise
-        just invoke func with the passed in filesystem handle. Also handles
-        call made to non-existent volumes (only when fs_handle is empty).
-        """
-        def conn_wrapper(self, fs_handle, **kwargs):
-            fs_h = fs_handle
-            fs_name = kwargs['vol_name']
-            # note that force arg is available for remove type commands
-            force = kwargs.get('force', False)
-
-            # fetch the connection from the pool
-            if not fs_handle:
-                try:
-                    fs_h = self.connection_pool.get_fs_handle(fs_name)
-                except VolumeException as ve:
-                    if not force:
-                        return self.volume_exception_to_retval(ve)
-                    return 0, "", ""
+    def create_fs_volume(self, volname, placement):
+        if self.is_stopping():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+        return create_volume(self.mgr, volname, placement)
+
+    def delete_fs_volume(self, volname, confirm):
+        if self.is_stopping():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+        if confirm != "--yes-i-really-mean-it":
+            return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
+                "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
+                "that is what you want, re-issue the command followed by " \
+                "--yes-i-really-mean-it.".format(volname)
+
+        ret, out, err = self.mgr.check_mon_command({
+            'prefix': 'config get',
+            'key': 'mon_allow_pool_delete',
+            'who': 'mon',
+            'format': 'json',
+        })
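+        # with json format, 'config get' prints "true"/"false", which
+        # json.loads parses into a bool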
+        mon_allow_pool_delete = json.loads(out)
+        if not mon_allow_pool_delete:
+            return -errno.EPERM, "", "pool deletion is disabled; you must first " \
+                "set the mon_allow_pool_delete config option to true before volumes " \
+                "can be deleted"
+
+        metadata_pool, data_pools = get_pool_names(self.mgr, volname)
+        if not metadata_pool:
+            return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
+        self.purge_queue.cancel_jobs(volname)
+        self.connection_pool.del_connections(volname, wait=True)
+        return delete_volume(self.mgr, volname, metadata_pool, data_pools)
+
+    def list_fs_volumes(self):
+        if self.is_stopping():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+        volumes = list_volumes(self.mgr)
+        return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
+
+    def rename_fs_volume(self, volname, newvolname, sure):
+        if self.is_stopping():
+            return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+        if not sure:
+            return (
+                -errno.EPERM, "",
+                "WARNING: This will rename the filesystem and possibly its "
+                "pools. It is a potentially disruptive operation, clients' "
+                "cephx credentials need reauthorized to access the file system "
+                "and its pools with the new name. Add --yes-i-really-mean-it "
+                "if you are sure you wish to continue.")
+
+        return rename_volume(self.mgr, volname, newvolname)
 
-            # invoke the actual routine w/ fs handle
-            result = func(self, fs_h, **kwargs)
+    ### subvolume operations
 
-            # hand over the connection back to the pool
-            if fs_h:
-                self.connection_pool.put_fs_handle(fs_name)
-            return result
-        return conn_wrapper
+    def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
+        size       = kwargs['size']
+        pool       = kwargs['pool_layout']
+        uid        = kwargs['uid']
+        gid        = kwargs['gid']
+        mode       = kwargs['mode']
+        isolate_nspace = kwargs['namespace_isolated']
 
-    ### subvolume operations
+        oct_mode = octal_str_to_decimal_int(mode)
+        try:
+            create_subvol(
+                self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
+        except VolumeException as ve:
+            # kick the purge threads for async removal -- note that this
+            # assumes that the subvolume is moved to trashcan for cleanup on error.
+            self.purge_queue.queue_job(volname)
+            raise ve
 
-    @connection_pool_wrap
-    def create_subvolume(self, fs_handle, **kwargs):
+    def create_subvolume(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
         size       = kwargs['size']
         pool       = kwargs['pool_layout']
+        uid        = kwargs['uid']
+        gid        = kwargs['gid']
         mode       = kwargs['mode']
+        isolate_nspace = kwargs['namespace_isolated']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
-                        "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
-                sv.create_subvolume(spec, size, pool=pool, mode=self.octal_str_to_decimal_int(mode))
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    try:
+                        with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
+                            # idempotent creation -- valid. Setting attributes is supported.
+                            attrs = {
+                                'uid': uid if uid else subvolume.uid,
+                                'gid': gid if gid else subvolume.gid,
+                                'mode': octal_str_to_decimal_int(mode),
+                                'data_pool': pool,
+                                'pool_namespace': subvolume.namespace if isolate_nspace else None,
+                                'quota': size
+                            }
+                            subvolume.set_attrs(subvolume.path, attrs)
+                    except VolumeException as ve:
+                        if ve.errno == -errno.ENOENT:
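+                            # subvolume did not exist -- create it afresh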
+                            self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
+                        else:
+                            raise
         except VolumeException as ve:
+            # volume/group does not exist or subvolume creation failed
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume(self, fs_handle, **kwargs):
+    def remove_subvolume(self, **kwargs):
+        ret         = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        groupname   = kwargs['group_name']
+        force       = kwargs['force']
+        retainsnaps = kwargs['retain_snapshots']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps)
+                    # kick the purge threads for async removal -- note that this
+                    # assumes the subvolume has been moved to the trash can.
+                    # TODO: make the purge queue a singleton so that the trash can
+                    # itself kicks the purge threads on dump.
+                    self.purge_queue.queue_job(volname)
+        except VolumeException as ve:
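+            # EAGAIN: removal cannot currently proceed (e.g. an in-progress clone)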
+            if ve.errno == -errno.EAGAIN and not force:
+                ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
+                ret = self.volume_exception_to_retval(ve)
+            elif not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def authorize_subvolume(self, **kwargs):
+        ret = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        authid      = kwargs['auth_id']
+        groupname   = kwargs['group_name']
+        accesslevel = kwargs['access_level']
+        tenant_id   = kwargs['tenant_id']
+        allow_existing_id = kwargs['allow_existing_id']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
+                        key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id)
+                        ret = 0, key, ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def deauthorize_subvolume(self, **kwargs):
+        ret = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        authid      = kwargs['auth_id']
+        groupname   = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume:
+                        subvolume.deauthorize(authid)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def authorized_list(self, **kwargs):
+        ret = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        groupname   = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
+                        auths = subvolume.authorized_list()
+                        ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def evict(self, **kwargs):
+        ret = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        authid      = kwargs['auth_id']
+        groupname   = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
+                        subvolume.evict(volname, authid)
+                        ret = 0, "", ""
+        except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
+            if isinstance(e, VolumeException):
+                ret = self.volume_exception_to_retval(e)
+            elif isinstance(e, ClusterTimeout):
+                ret = -errno.ETIMEDOUT, "", "Timed out trying to talk to ceph cluster"
+            elif isinstance(e, ClusterError):
+                ret = e._result_code, "", e._result_str
+            elif isinstance(e, EvictionError):
+                ret = -errno.EINVAL, "", str(e)
+        return ret
+
+    def resize_subvolume(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
+        newsize    = kwargs['new_size']
+        noshrink   = kwargs['no_shrink']
         groupname  = kwargs['group_name']
-        force      = kwargs['force']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
+                        nsize, usedbytes = subvolume.resize(newsize, noshrink)
+                        ret = 0, json.dumps(
+                            [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+                             {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+                            indent=4, sort_keys=True), ""
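+                        # sample output (hypothetical numbers):
+                        # [{"bytes_used": 1024}, {"bytes_quota": 2048}, {"bytes_pcent": "50.00"}]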
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def subvolume_pin(self, **kwargs):
+        ret         = 0, "", ""
+        volname     = kwargs['vol_name']
+        subvolname  = kwargs['sub_name']
+        pin_type    = kwargs['pin_type']
+        pin_setting = kwargs['pin_setting']
+        groupname   = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
+                        subvolume.pin(pin_type, pin_setting)
+                        ret = 0, json.dumps({}), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def subvolume_getpath(self, **kwargs):
+        ret        = None
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if self.group_exists(sv, spec):
-                    sv.remove_subvolume(spec, force)
-                    self.purge_queue.queue_purge_job(volname)
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
-                        "subvolume '{1}'".format(groupname, subvolname))
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
+                        subvolpath = subvolume.path
+                        ret = 0, subvolpath.decode("utf-8"), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def subvolume_getpath(self, fs_handle, **kwargs):
+    def subvolume_info(self, **kwargs):
         ret        = None
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
+                        mon_addr_lst = []
+                        mon_map_mons = self.mgr.get('mon_map')['mons']
+                        for mon in mon_map_mons:
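+                            # mon addr strings look like "IP:PORT/NONCE"; keep only IP:PORT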
+                            ip_port = mon['addr'].split("/")[0]
+                            mon_addr_lst.append(ip_port)
+
+                        subvol_info_dict = subvolume.info()
+                        subvol_info_dict["mon_addrs"] = mon_addr_lst
+                        ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def set_user_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
+        value      = kwargs['value']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume:
+                        subvolume.set_user_metadata(keyname, value)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def get_user_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-                path = sv.get_subvolume_path(spec)
-                if not path:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
-                ret = 0, path, ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume:
+                        value = subvolume.get_user_metadata(keyname)
+                        ret = 0, value, ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def list_user_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume:
+                        subvol_metadata_dict = subvolume.list_user_metadata()
+                        ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def remove_user_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
+        force      = kwargs['force']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
+                        subvolume.remove_user_metadata(keyname)
+        except VolumeException as ve:
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def list_subvolumes(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    subvolumes = group.list_subvolumes()
+                    ret = 0, name_to_json(subvolumes), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
     ### subvolume snapshot
 
-    @connection_pool_wrap
-    def create_subvolume_snapshot(self, fs_handle, **kwargs):
+    def create_subvolume_snapshot(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
+                        subvolume.create_snapshot(snapname)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def remove_subvolume_snapshot(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+        force      = kwargs['force']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
+                        subvolume.remove_snapshot(snapname)
+        except VolumeException as ve:
+            # ESTALE indicates that the subvolume is stale due to an internal
+            # removal, so kick the purge jobs to clean it up
+            if ve.errno == -errno.ESTALE:
+                self.purge_queue.queue_job(volname)
+            elif not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def subvolume_snapshot_info(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
+                        snap_info_dict = subvolume.snapshot_info(snapname)
+                        ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def set_subvolume_snapshot_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
+        value      = kwargs['value']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
+                        if snapname.encode('utf-8') not in subvolume.list_snapshots():
+                            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+                        subvolume.set_snapshot_metadata(snapname, keyname, value)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def get_subvolume_snapshot_metadata(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         snapname   = kwargs['snap_name']
         groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
-                        "snapshot '{1}'".format(groupname, snapname))
-                if not sv.get_subvolume_path(spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
-                        "'{1}'".format(subvolname, snapname))
-                sv.create_subvolume_snapshot(spec, snapname)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
+                        if snapname.encode('utf-8') not in subvolume.list_snapshots():
+                            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+                        value = subvolume.get_snapshot_metadata(snapname, keyname)
+                        ret = 0, value, ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
+    def list_subvolume_snapshot_metadata(self, **kwargs):
         ret        = 0, "", ""
         volname    = kwargs['vol_name']
         subvolname = kwargs['sub_name']
         snapname   = kwargs['snap_name']
         groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
+                        if snapname.encode('utf-8') not in subvolume.list_snapshots():
+                            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+                        snap_metadata_dict = subvolume.list_snapshot_metadata(snapname)
+                        ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def remove_subvolume_snapshot_metadata(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        snapname   = kwargs['snap_name']
+        groupname  = kwargs['group_name']
+        keyname    = kwargs['key_name']
         force      = kwargs['force']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(subvolname, groupname)
-                if self.group_exists(sv, spec):
-                    if sv.get_subvolume_path(spec):
-                        sv.remove_subvolume_snapshot(spec, snapname, force)
-                    elif not force:
-                        raise VolumeException(
-                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
-                            "subvolume snapshot '{1}'".format(subvolname, snapname))
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
-                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
+                        if snapname.encode('utf-8') not in subvolume.list_snapshots():
+                            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+                        subvolume.remove_snapshot_metadata(snapname, keyname)
+        except VolumeException as ve:
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def list_subvolume_snapshots(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
+                        snapshots = subvolume.list_snapshots()
+                        ret = 0, name_to_json(snapshots), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    ### group operations
+    def protect_subvolume_snapshot(self, **kwargs):
+        ret        = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+                        log.warning("snapshot protect call is deprecated and will be removed in a future release")
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def unprotect_subvolume_snapshot(self, **kwargs):
+        ret        = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
+        volname    = kwargs['vol_name']
+        subvolname = kwargs['sub_name']
+        groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+                        log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+        t_pool              = kwargs['pool_layout']
+        s_subvolname        = kwargs['sub_name']
+        s_groupname         = kwargs['group_name']
+        t_groupname         = kwargs['target_group_name']
+
+        create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+        with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
+            try:
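+                # when source and target refer to the same subvolume, attach the
+                # snapshot to the newly created clone itself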
+                if t_groupname == s_groupname and t_subvolname == s_subvolname:
+                    t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+                else:
+                    s_subvolume.attach_snapshot(s_snapname, t_subvolume)
+                self.cloner.queue_job(volname)
+            except VolumeException as ve:
+                try:
+                    t_subvolume.remove()
+                    self.purge_queue.queue_job(volname)
+                except Exception as e:
+                    log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
+                raise ve
+
+    def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+        s_snapname          = kwargs['snap_name']
+        target_subvolname   = kwargs['target_sub_name']
+        target_groupname    = kwargs['target_group_name']
+        s_groupname         = kwargs['group_name']
+
+        if s_snapname.encode('utf-8') not in s_subvolume.list_snapshots():
+            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
+
+        with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
+            try:
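+                # probe the target name -- the clone can proceed only when opening
+                # it fails with ENOENT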
+                with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
+                    raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
+            except VolumeException as ve:
+                if ve.errno == -errno.ENOENT:
+                    self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+                                                  target_group, target_subvolname, **kwargs)
+                else:
+                    raise
+
+    def clone_subvolume_snapshot(self, **kwargs):
+        ret        = 0, "", ""
+        volname    = kwargs['vol_name']
+        s_subvolname = kwargs['sub_name']
+        s_groupname  = kwargs['group_name']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+                        self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def clone_status(self, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        clonename = kwargs['clone_name']
+        groupname = kwargs['group_name']
 
-    @connection_pool_wrap
-    def create_subvolume_group(self, fs_handle, **kwargs):
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
+                        ret = 0, json.dumps({'status': subvolume.status}, indent=2), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def clone_cancel(self, **kwargs):
         ret       = 0, "", ""
         volname   = kwargs['vol_name']
+        clonename = kwargs['clone_name']
+        groupname = kwargs['group_name']
+
+        try:
+            self.cloner.cancel_job(volname, (clonename, groupname))
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    ### group operations
+
+    def create_subvolume_group(self, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
         pool      = kwargs['pool_layout']
+        uid       = kwargs['uid']
+        gid       = kwargs['gid']
         mode      = kwargs['mode']
 
         try:
-            # TODO: validate that subvol size fits in volume size
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
+            with open_volume(self, volname) as fs_handle:
+                try:
+                    with open_group(fs_handle, self.volspec, groupname):
+                        # idempotent creation -- valid.
+                        pass
+                except VolumeException as ve:
+                    if ve.errno == -errno.ENOENT:
+                        oct_mode = octal_str_to_decimal_int(mode)
+                        create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
+                    else:
+                        raise
         except VolumeException as ve:
+            # volume does not exist or subvolume group creation failed
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_group(self, fs_handle, **kwargs):
+    def remove_subvolume_group(self, **kwargs):
         ret       = 0, "", ""
-        volname   = kwargs['vol_name']
+        volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
         force     = kwargs['force']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                # TODO: check whether there are no subvolumes in the group
-                spec = SubvolumeSpec("", groupname)
-                sv.remove_group(spec, force)
+            with open_volume(self, volname) as fs_handle:
+                remove_group(fs_handle, self.volspec, groupname)
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def getpath_subvolume_group(self, fs_handle, **kwargs):
+    def getpath_subvolume_group(self, **kwargs):
+        volname    = kwargs['vol_name']
         groupname  = kwargs['group_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                path = sv.get_group_path(spec)
-                if path is None:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
-                return 0, path, ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    return 0, group.path.decode('utf-8'), ""
         except VolumeException as ve:
             return self.volume_exception_to_retval(ve)
 
+    def list_subvolume_groups(self, **kwargs):
+        volname = kwargs['vol_name']
+        ret     = 0, '[]', ""
+        volume_exists = False
+        try:
+            with open_volume(self, volname) as fs_handle:
+                volume_exists = True
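+                # skip the internal trash directory when listing groups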
+                groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[Trash.GROUP_NAME.encode('utf-8')])
+                ret = 0, name_to_json(groups), ""
+        except VolumeException as ve:
+            if ve.errno != -errno.ENOENT or not volume_exists:
+                ret = self.volume_exception_to_retval(ve)
+        return ret
+
+    def pin_subvolume_group(self, **kwargs):
+        ret           = 0, "", ""
+        volname       = kwargs['vol_name']
+        groupname     = kwargs['group_name']
+        pin_type      = kwargs['pin_type']
+        pin_setting   = kwargs['pin_setting']
+
+        try:
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    group.pin(pin_type, pin_setting)
+                    ret = 0, json.dumps({}), ""
+        except VolumeException as ve:
+            ret = self.volume_exception_to_retval(ve)
+        return ret
+
     ### group snapshot
 
-    @connection_pool_wrap
-    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
-        ret       = 0, "", ""
+    def create_subvolume_group_snapshot(self, **kwargs):
+        ret       = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
         volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
-        snapname  = kwargs['snap_name']
+        # snapname  = kwargs['snap_name']
+
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                if not self.group_exists(sv, spec):
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
-                        "snapshot '{1}'".format(groupname, snapname))
-                sv.create_group_snapshot(spec, snapname)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    # subvolumes are marked with the vxattr ceph.dir.subvolume, which
+                    # denies snapshots at the subvolume group level
+                    # (see: https://tracker.ceph.com/issues/46074)
+                    # group.create_snapshot(snapname)
+                    pass
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
+    def remove_subvolume_group_snapshot(self, **kwargs):
         ret       = 0, "", ""
         volname   = kwargs['vol_name']
         groupname = kwargs['group_name']
         snapname  = kwargs['snap_name']
         force     = kwargs['force']
-        try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", groupname)
-                if self.group_exists(sv, spec):
-                    sv.remove_group_snapshot(spec, snapname, force)
-                elif not force:
-                    raise VolumeException(
-                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
-                        "remove it".format(groupname))
-        except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
-        return ret
-
-    @connection_pool_wrap
-    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
-        ret = None
-        volname = kwargs['vol_name']
-        exclude = kwargs.get('exclude_entries', [])
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec("", "")
-                path = sv.get_trash_entry(spec, exclude)
-                ret = 0, path, ""
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    group.remove_snapshot(snapname)
         except VolumeException as ve:
-            ret = self.volume_exception_to_retval(ve)
+            if not (ve.errno == -errno.ENOENT and force):
+                ret = self.volume_exception_to_retval(ve)
         return ret
 
-    @connection_pool_wrap
-    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
-        ret = 0, "", ""
-        volname = kwargs['vol_name']
-        purge_dir = kwargs['purge_dir']
-        should_cancel = kwargs.get('should_cancel', lambda: False)
+    def list_subvolume_group_snapshots(self, **kwargs):
+        ret       = 0, "", ""
+        volname   = kwargs['vol_name']
+        groupname = kwargs['group_name']
 
         try:
-            with SubVolume(self.mgr, fs_handle) as sv:
-                spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
-                sv.purge_subvolume(spec, should_cancel)
+            with open_volume(self, volname) as fs_handle:
+                with open_group(fs_handle, self.volspec, groupname) as group:
+                    snapshots = group.list_snapshots()
+                    ret = 0, name_to_json(snapshots), ""
         except VolumeException as ve:
             ret = self.volume_exception_to_retval(ve)
         return ret