import json
import errno
import logging
+from typing import TYPE_CHECKING
import cephfs
-import orchestrator
-from .subvolspec import SubvolumeSpec
-from .subvolume import SubVolume
-from .exception import VolumeException
+from mgr_util import CephfsClient
+
+from .fs_util import listdir
+
+from .operations.volume import create_volume, \
+ delete_volume, rename_volume, list_volumes, open_volume, get_pool_names
+from .operations.group import open_group, create_group, remove_group, open_group_unique
+from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
+ create_clone
+from .operations.trash import Trash
+
+from .vol_spec import VolSpec
+from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
+from .async_cloner import Cloner
+from .purge_queue import ThreadPoolPurgeQueueMixin
+from .operations.template import SubvolumeOpType
+
+if TYPE_CHECKING:
+ from volumes import Module
log = logging.getLogger(__name__)
-class VolumeClient(object):
- def __init__(self, mgr):
- self.mgr = mgr
+ALLOWED_ACCESS_LEVELS = ('r', 'rw')
- def gen_pool_names(self, volname):
- """
- return metadata and data pool name (from a filesystem/volume name) as a tuple
- """
- return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
- def get_fs(self, fs_name):
- fs_map = self.mgr.get('fs_map')
- for fs in fs_map['filesystems']:
- if fs['mdsmap']['fs_name'] == fs_name:
- return fs
- return None
+def octal_str_to_decimal_int(mode):
+ try:
+ return int(mode, 8)
+ except ValueError:
+ raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
- def get_mds_names(self, fs_name):
- fs = self.get_fs(fs_name)
- if fs is None:
- return []
- return [mds['name'] for mds in fs['mdsmap']['info'].values()]
- def volume_exists(self, volname):
- return self.get_fs(volname) is not None
+def name_to_json(names):
+ """
+ convert the list of names to json
+ """
+ namedict = []
+ for i in range(len(names)):
+ namedict.append({'name': names[i].decode('utf-8')})
+ return json.dumps(namedict, indent=4, sort_keys=True)
+
+
+class VolumeClient(CephfsClient["Module"]):
+ def __init__(self, mgr):
+ super().__init__(mgr)
+ # volume specification
+ self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
+ self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
+ # on startup, queue purge job for available volumes to kickstart
+ # purge for leftover subvolume entries in trash. note that, if the
+ # trash directory does not exist or if there are no purge entries
+ # available for a volume, the volume is removed from the purge
+ # job list.
+ fs_map = self.mgr.get('fs_map')
+ for fs in fs_map['filesystems']:
+ self.cloner.queue_job(fs['mdsmap']['fs_name'])
+ self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
+
+ def shutdown(self):
+ # Overrides CephfsClient.shutdown()
+ log.info("shutting down")
+ # first, note that we're shutting down
+ self.stopping.set()
+ # stop clones
+ self.cloner.shutdown()
+ # stop purge threads
+ self.purge_queue.shutdown()
+ # last, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_connections()
+
+ def cluster_log(self, msg, lvl=None):
+ """
+ log to cluster log with default log level as WARN.
+ """
+ if not lvl:
+ lvl = self.mgr.ClusterLogPrio.WARN
+ self.mgr.cluster_log("cluster", lvl, msg)
def volume_exception_to_retval(self, ve):
"""
"""
return ve.to_tuple()
- def create_pool(self, pool_name, pg_num, pg_num_min=None, pg_autoscale_factor=None):
- # create the given pool
- command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
- if pg_num_min:
- command['pg_num_min'] = pg_num_min
- r, outb, outs = self.mgr.mon_command(command)
- if r != 0:
- return r, outb, outs
-
- # set pg autoscale if needed
- if pg_autoscale_factor:
- command = {'prefix': 'osd pool set', 'pool': pool_name, 'var': 'pg_autoscale_bias',
- 'val': str(pg_autoscale_factor)}
- r, outb, outs = self.mgr.mon_command(command)
- return r, outb, outs
-
- def remove_pool(self, pool_name):
- command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
- 'yes_i_really_really_mean_it': True}
- return self.mgr.mon_command(command)
-
- def create_filesystem(self, fs_name, metadata_pool, data_pool):
- command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
- 'data': data_pool}
- return self.mgr.mon_command(command)
-
- def remove_filesystem(self, fs_name):
- command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
- return self.mgr.mon_command(command)
-
- def create_mds(self, fs_name):
- spec = orchestrator.StatelessServiceSpec()
- spec.name = fs_name
- try:
- completion = self.mgr.add_stateless_service("mds", spec)
- self.mgr._orchestrator_wait([completion])
- orchestrator.raise_if_exception(completion)
- except (ImportError, orchestrator.OrchestratorError):
- return 0, "", "Volume created successfully (no MDS daemons created)"
- except Exception as e:
- # Don't let detailed orchestrator exceptions (python backtraces)
- # bubble out to the user
- log.exception("Failed to create MDS daemons")
- return -errno.EINVAL, "", str(e)
- return 0, "", ""
-
- def set_mds_down(self, fs_name):
- command = {'prefix': 'fs set', 'fs_name': fs_name, 'var': 'cluster_down', 'val': 'true'}
- r, outb, outs = self.mgr.mon_command(command)
- if r != 0:
- return r, outb, outs
- for mds in self.get_mds_names(fs_name):
- command = {'prefix': 'mds fail', 'role_or_gid': mds}
- r, outb, outs = self.mgr.mon_command(command)
- if r != 0:
- return r, outb, outs
- return 0, "", ""
-
### volume operations -- create, rm, ls
- def create_volume(self, volname, size=None):
- """
- create volume (pool, filesystem and mds)
- """
- metadata_pool, data_pool = self.gen_pool_names(volname)
- # create pools
- r, outs, outb = self.create_pool(metadata_pool, 16, pg_num_min=16, pg_autoscale_factor=4.0)
- if r != 0:
- return r, outb, outs
- r, outb, outs = self.create_pool(data_pool, 8)
- if r != 0:
- return r, outb, outs
- # create filesystem
- r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
- if r != 0:
- log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
- return r, outb, outs
- # create mds
- return self.create_mds(volname)
-
- def delete_volume(self, volname):
- """
- delete the given module (tear down mds, remove filesystem)
- """
- # Tear down MDS daemons
- try:
- completion = self.mgr.remove_stateless_service("mds", volname)
- self.mgr._orchestrator_wait([completion])
- orchestrator.raise_if_exception(completion)
- except (ImportError, orchestrator.OrchestratorError):
- log.warning("OrchestratorError, not tearing down MDS daemons")
- except Exception as e:
- # Don't let detailed orchestrator exceptions (python backtraces)
- # bubble out to the user
- log.exception("Failed to tear down MDS daemons")
- return -errno.EINVAL, "", str(e)
-
- # In case orchestrator didn't tear down MDS daemons cleanly, or
- # there was no orchestrator, we force the daemons down.
- if self.volume_exists(volname):
- r, outb, outs = self.set_mds_down(volname)
- if r != 0:
- return r, outb, outs
- r, outb, outs = self.remove_filesystem(volname)
- if r != 0:
- return r, outb, outs
- else:
- log.warning("Filesystem already gone for volume '{0}'".format(volname))
- metadata_pool, data_pool = self.gen_pool_names(volname)
- r, outb, outs = self.remove_pool(metadata_pool)
- if r != 0:
- return r, outb, outs
- return self.remove_pool(data_pool)
-
- def list_volumes(self):
- result = []
- fs_map = self.mgr.get("fs_map")
- for f in fs_map['filesystems']:
- result.append({'name': f['mdsmap']['fs_name']})
- return 0, json.dumps(result, indent=2), ""
-
- def group_exists(self, sv, spec):
- # default group need not be explicitly created (as it gets created
- # at the time of subvolume, snapshot and other create operations).
- return spec.is_default_group() or sv.get_group_path(spec)
-
- @staticmethod
- def octal_str_to_decimal_int(mode):
- try:
- return int(mode, 8)
- except ValueError:
- raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+ def create_fs_volume(self, volname, placement):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ return create_volume(self.mgr, volname, placement)
+
+ def delete_fs_volume(self, volname, confirm):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+ if confirm != "--yes-i-really-mean-it":
+ return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
+ "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
+ "that is what you want, re-issue the command followed by " \
+ "--yes-i-really-mean-it.".format(volname)
+
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config get',
+ 'key': 'mon_allow_pool_delete',
+ 'who': 'mon',
+ 'format': 'json',
+ })
+ mon_allow_pool_delete = json.loads(out)
+ if not mon_allow_pool_delete:
+ return -errno.EPERM, "", "pool deletion is disabled; you must first " \
+ "set the mon_allow_pool_delete config option to true before volumes " \
+ "can be deleted"
+
+ metadata_pool, data_pools = get_pool_names(self.mgr, volname)
+ if not metadata_pool:
+ return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
+ self.purge_queue.cancel_jobs(volname)
+ self.connection_pool.del_connections(volname, wait=True)
+ return delete_volume(self.mgr, volname, metadata_pool, data_pools)
+
+ def list_fs_volumes(self):
+ if self.stopping.is_set():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+ volumes = list_volumes(self.mgr)
+ return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
+
+ def rename_fs_volume(self, volname, newvolname, sure):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+ if not sure:
+ return (
+ -errno.EPERM, "",
+ "WARNING: This will rename the filesystem and possibly its "
+ "pools. It is a potentially disruptive operation, clients' "
+ "cephx credentials need reauthorized to access the file system "
+ "and its pools with the new name. Add --yes-i-really-mean-it "
+ "if you are sure you wish to continue.")
+
+ return rename_volume(self.mgr, volname, newvolname)
### subvolume operations
- def create_subvolume(self, volname, subvolname, groupname, size, mode='755', pool=None):
+ def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ oct_mode = octal_str_to_decimal_int(mode)
+ try:
+ create_subvol(
+ self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
+ except VolumeException as ve:
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trashcan for cleanup on error.
+ self.purge_queue.queue_job(volname)
+ raise ve
+
+ def create_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ size = kwargs['size']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+ isolate_nspace = kwargs['namespace_isolated']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
+ # idempotent creation -- valid. Attributes set is supported.
+ attrs = {
+ 'uid': uid if uid else subvolume.uid,
+ 'gid': gid if gid else subvolume.gid,
+ 'mode': octal_str_to_decimal_int(mode),
+ 'data_pool': pool,
+ 'pool_namespace': subvolume.namespace if isolate_nspace else None,
+ 'quota': size
+ }
+ subvolume.set_attrs(subvolume.path, attrs)
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume/group does not exist or subvolume creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+ retainsnaps = kwargs['retain_snapshots']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps)
+ # kick the purge threads for async removal -- note that this
+ # assumes that the subvolume is moved to trash can.
+ # TODO: make purge queue as singleton so that trash can kicks
+ # the purge threads on dump.
+ self.purge_queue.queue_job(volname)
+ except VolumeException as ve:
+ if ve.errno == -errno.EAGAIN and not force:
+ ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
+ ret = self.volume_exception_to_retval(ve)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+ accesslevel = kwargs['access_level']
+ tenant_id = kwargs['tenant_id']
+ allow_existing_id = kwargs['allow_existing_id']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
+ key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id)
+ ret = 0, key, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def deauthorize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume:
+ subvolume.deauthorize(authid)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def authorized_list(self, **kwargs):
ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
- "volume create` before trying to create subvolumes".format(volname))
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if not self.group_exists(sv, spec):
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
- "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
- sv.create_subvolume(spec, size, pool=pool, mode=self.octal_str_to_decimal_int(mode))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
+ auths = subvolume.authorized_list()
+ ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume(self, volname, subvolname, groupname, force):
+ def evict(self, **kwargs):
ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ authid = kwargs['auth_id']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
+ key = subvolume.evict(volname, authid)
+ ret = 0, "", ""
+ except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
+ if isinstance(e, VolumeException):
+ ret = self.volume_exception_to_retval(e)
+ elif isinstance(e, ClusterTimeout):
+ ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster"
+ elif isinstance(e, ClusterError):
+ ret = e._result_code , "", e._result_str
+ elif isinstance(e, EvictionError):
+ ret = -errno.EINVAL, "", str(e)
+ return ret
+
+ def resize_subvolume(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ newsize = kwargs['new_size']
+ noshrink = kwargs['no_shrink']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
+ nsize, usedbytes = subvolume.resize(newsize, noshrink)
+ ret = 0, json.dumps(
+ [{'bytes_used': usedbytes},{'bytes_quota': nsize},
+ {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
+ indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_pin(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ pin_type = kwargs['pin_type']
+ pin_setting = kwargs['pin_setting']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
+ subvolume.pin(pin_type, pin_setting)
+ ret = 0, json.dumps({}), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_getpath(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
+ subvolpath = subvolume.path
+ ret = 0, subvolpath.decode("utf-8"), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_info(self, **kwargs):
+ ret = None
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
+ mon_addr_lst = []
+ mon_map_mons = self.mgr.get('mon_map')['mons']
+ for mon in mon_map_mons:
+ ip_port = mon['addr'].split("/")[0]
+ mon_addr_lst.append(ip_port)
+
+ subvol_info_dict = subvolume.info()
+ subvol_info_dict["mon_addrs"] = mon_addr_lst
+ ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def set_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume:
+ subvolume.set_user_metadata(keyname, value)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def get_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume:
+ value = subvolume.get_user_metadata(keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
try:
- fs = self.get_fs(volname)
- if fs:
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if self.group_exists(sv, spec):
- sv.remove_subvolume(spec, force)
- sv.purge_subvolume(spec)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
- "subvolume '{1}'".format(groupname, subvolname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
- "'{1}'".format(volname, subvolname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume:
+ subvol_metadata_dict = subvolume.list_user_metadata()
+ ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def subvolume_getpath(self, volname, subvolname, groupname):
- ret = None
+ def remove_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found".format(volname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
+ subvolume.remove_user_metadata(keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolumes(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if not self.group_exists(sv, spec):
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
- path = sv.get_subvolume_path(spec)
- if not path:
- raise VolumeException(
- -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
- ret = 0, path, ""
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ subvolumes = group.list_subvolumes()
+ ret = 0, name_to_json(subvolumes), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
### subvolume snapshot
- def create_subvolume_snapshot(self, volname, subvolname, snapname, groupname):
- ret = 0, "", ""
+ def create_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
- "'{1}'".format(volname, snapname))
-
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if not self.group_exists(sv, spec):
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
- "snapshot '{1}'".format(groupname, snapname))
- if not sv.get_subvolume_path(spec):
- raise VolumeException(
- -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
- "'{1}'".format(subvolname, snapname))
- sv.create_subvolume_snapshot(spec, snapname)
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
+ subvolume.create_snapshot(snapname)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_snapshot(self, volname, subvolname, snapname, groupname, force):
- ret = 0, "", ""
+ def remove_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
+ subvolume.remove_snapshot(snapname)
+ except VolumeException as ve:
+ # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
+ # we should tickle the purge jobs to purge the same
+ if ve.errno == -errno.ESTALE:
+ self.purge_queue.queue_job(volname)
+ elif not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def subvolume_snapshot_info(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
+ snap_info_dict = subvolume.snapshot_info(snapname)
+ ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def set_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec(subvolname, groupname)
- if self.group_exists(sv, spec):
- if sv.get_subvolume_path(spec):
- sv.remove_subvolume_snapshot(spec, snapname, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
- "subvolume snapshot '{1}'".format(subvolname, snapname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
- "remove subvolume snapshot '{1}'".format(groupname, snapname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
- "snapshot '{1}'".format(volname, snapname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.set_snapshot_metadata(snapname, keyname, value)
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- ### group operations
+ def get_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
- def create_subvolume_group(self, volname, groupname, mode='755', pool=None):
- ret = 0, "", ""
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, create it with `ceph fs " \
- "volume create` before trying to create subvolume groups".format(volname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ value = subvolume.get_snapshot_metadata(snapname, keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
- # TODO: validate that subvol size fits in volume size
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec("", groupname)
- sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ snap_metadata_dict = subvolume.list_snapshot_metadata(snapname)
+ ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_group(self, volname, groupname, force):
- ret = 0, "", ""
+ def remove_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.remove_snapshot_metadata(snapname, keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- # TODO: check whether there are no subvolumes in the group
- spec = SubvolumeSpec("", groupname)
- sv.remove_group(spec, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolume " \
- "group '{0}'".format(volname, groupname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
+ snapshots = subvolume.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- ### group snapshot
+ def protect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
- def create_subvolume_group_snapshot(self, volname, groupname, snapname):
- ret = 0, "", ""
try:
- if not self.volume_exists(volname):
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot create snapshot " \
- "'{1}'".format(volname, snapname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
+ log.warning("snapshot protect call is deprecated and will be removed in a future release")
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec("", groupname)
- if not self.group_exists(sv, spec):
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
- "snapshot '{1}'".format(groupname, snapname))
- sv.create_group_snapshot(spec, snapname)
+ def unprotect_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
+ log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret
- def remove_subvolume_group_snapshot(self, volname, groupname, snapname, force):
- ret = 0, "", ""
+ def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
+ t_pool = kwargs['pool_layout']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+ t_groupname = kwargs['target_group_name']
+
+ create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
+ with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
+ try:
+ if t_groupname == s_groupname and t_subvolname == s_subvolname:
+ t_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ else:
+ s_subvolume.attach_snapshot(s_snapname, t_subvolume)
+ self.cloner.queue_job(volname)
+ except VolumeException as ve:
+ try:
+ t_subvolume.remove()
+ self.purge_queue.queue_job(volname)
+ except Exception as e:
+ log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
+ raise ve
+
+ def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
+ s_snapname = kwargs['snap_name']
+ target_subvolname = kwargs['target_sub_name']
+ target_groupname = kwargs['target_group_name']
+ s_groupname = kwargs['group_name']
+
+ if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
+
+ with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
+ try:
+ with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
+ raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
+ target_group, target_subvolname, **kwargs)
+ else:
+ raise
+
+ def clone_subvolume_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ s_subvolname = kwargs['sub_name']
+ s_groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, s_groupname) as s_group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_status(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
+ ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def clone_cancel(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ clonename = kwargs['clone_name']
+ groupname = kwargs['group_name']
+
+ try:
+ self.cloner.cancel_job(volname, (clonename, groupname))
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group operations
+
+ def create_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ pool = kwargs['pool_layout']
+ uid = kwargs['uid']
+ gid = kwargs['gid']
+ mode = kwargs['mode']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ try:
+ with open_group(fs_handle, self.volspec, groupname):
+ # idempotent creation -- valid.
+ pass
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ oct_mode = octal_str_to_decimal_int(mode)
+ create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
+ else:
+ raise
+ except VolumeException as ve:
+ # volume does not exist or subvolume group creation failed
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ remove_group(fs_handle, self.volspec, groupname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def getpath_subvolume_group(self, **kwargs):
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ return 0, group.path.decode('utf-8'), ""
+ except VolumeException as ve:
+ return self.volume_exception_to_retval(ve)
+
+ def list_subvolume_groups(self, **kwargs):
+ volname = kwargs['vol_name']
+ ret = 0, '[]', ""
+ volume_exists = False
+ try:
+ with open_volume(self, volname) as fs_handle:
+ volume_exists = True
+ groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[Trash.GROUP_NAME.encode('utf-8')])
+ ret = 0, name_to_json(groups), ""
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT or not volume_exists:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def pin_subvolume_group(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ pin_type = kwargs['pin_type']
+ pin_setting = kwargs['pin_setting']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ group.pin(pin_type, pin_setting)
+ ret = 0, json.dumps({}), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ ### group snapshot
+
+ def create_subvolume_group_snapshot(self, **kwargs):
+ ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ # snapname = kwargs['snap_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
+ # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
+ # group.create_snapshot(snapname)
+ pass
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_group_snapshot(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+ snapname = kwargs['snap_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ group.remove_snapshot(snapname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_group_snapshots(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ groupname = kwargs['group_name']
+
try:
- if self.volume_exists(volname):
- with SubVolume(self.mgr, fs_name=volname) as sv:
- spec = SubvolumeSpec("", groupname)
- if self.group_exists(sv, spec):
- sv.remove_group_snapshot(spec, snapname, force)
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
- "remove it".format(groupname))
- elif not force:
- raise VolumeException(
- -errno.ENOENT, "Volume '{0}' not found, cannot remove subvolumegroup " \
- "snapshot '{1}'".format(volname, snapname))
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ snapshots = group.list_snapshots()
+ ret = 0, name_to_json(snapshots), ""
except VolumeException as ve:
ret = self.volume_exception_to_retval(ve)
return ret