X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fpybind%2Fmgr%2Fvolumes%2Ffs%2Fvolume.py;h=18c5da222fe15c8d3a0f472d4830fcf4b42739a3;hb=33c7a0ef2143973309014ab28861a6fa401a5aa5;hp=9182318477817e8dde3a189ae9a821dba266ec55;hpb=cd265ab1e2bb0c89e7a1001629426438754333f4;p=ceph.git diff --git a/ceph/src/pybind/mgr/volumes/fs/volume.py b/ceph/src/pybind/mgr/volumes/fs/volume.py index 918231847..18c5da222 100644 --- a/ceph/src/pybind/mgr/volumes/fs/volume.py +++ b/ceph/src/pybind/mgr/volumes/fs/volume.py @@ -1,17 +1,20 @@ import json import errno import logging -from threading import Event +from typing import TYPE_CHECKING import cephfs +from mgr_util import CephfsClient + from .fs_util import listdir -from .operations.volume import ConnectionPool, open_volume, create_volume, \ - delete_volume, list_volumes, get_pool_names +from .operations.volume import create_volume, \ + delete_volume, rename_volume, list_volumes, open_volume, get_pool_names from .operations.group import open_group, create_group, remove_group, open_group_unique from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \ create_clone +from .operations.trash import Trash from .vol_spec import VolSpec from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError @@ -19,6 +22,9 @@ from .async_cloner import Cloner from .purge_queue import ThreadPoolPurgeQueueMixin from .operations.template import SubvolumeOpType +if TYPE_CHECKING: + from volumes import Module + log = logging.getLogger(__name__) ALLOWED_ACCESS_LEVELS = ('r', 'rw') @@ -30,6 +36,7 @@ def octal_str_to_decimal_int(mode): except ValueError: raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode)) + def name_to_json(names): """ convert the list of names to json @@ -39,14 +46,13 @@ def name_to_json(names): namedict.append({'name': names[i].decode('utf-8')}) return json.dumps(namedict, indent=4, sort_keys=True) -class VolumeClient(object): + +class VolumeClient(CephfsClient["Module"]): def __init__(self, mgr): - self.mgr = mgr - self.stopping = Event() + super().__init__(mgr) # volume specification self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir')) - self.connection_pool = ConnectionPool(self.mgr) - self.cloner = Cloner(self, self.mgr.max_concurrent_clones) + self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay) self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4) # on startup, queue purge job for available volumes to kickstart # purge for leftover subvolume entries in trash. note that, if the @@ -58,24 +64,24 @@ class VolumeClient(object): self.cloner.queue_job(fs['mdsmap']['fs_name']) self.purge_queue.queue_job(fs['mdsmap']['fs_name']) - def is_stopping(self): - return self.stopping.is_set() - def shutdown(self): + # Overrides CephfsClient.shutdown() log.info("shutting down") # first, note that we're shutting down self.stopping.set() - # second, ask purge threads to quit - self.purge_queue.cancel_all_jobs() - # third, delete all libcephfs handles from connection pool - self.connection_pool.del_all_handles() + # stop clones + self.cloner.shutdown() + # stop purge threads + self.purge_queue.shutdown() + # last, delete all libcephfs handles from connection pool + self.connection_pool.del_all_connections() def cluster_log(self, msg, lvl=None): """ log to cluster log with default log level as WARN. """ if not lvl: - lvl = self.mgr.CLUSTER_LOG_PRIO_WARN + lvl = self.mgr.ClusterLogPrio.WARN self.mgr.cluster_log("cluster", lvl, msg) def volume_exception_to_retval(self, ve): @@ -117,7 +123,7 @@ class VolumeClient(object): if not metadata_pool: return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname) self.purge_queue.cancel_jobs(volname) - self.connection_pool.del_fs_handle(volname, wait=True) + self.connection_pool.del_connections(volname, wait=True) return delete_volume(self.mgr, volname, metadata_pool, data_pools) def list_fs_volumes(self): @@ -126,6 +132,21 @@ class VolumeClient(object): volumes = list_volumes(self.mgr) return 0, json.dumps(volumes, indent=4, sort_keys=True), "" + def rename_fs_volume(self, volname, newvolname, sure): + if self.is_stopping(): + return -errno.ESHUTDOWN, "", "shutdown in progress" + + if not sure: + return ( + -errno.EPERM, "", + "WARNING: This will rename the filesystem and possibly its " + "pools. It is a potentially disruptive operation, clients' " + "cephx credentials need reauthorized to access the file system " + "and its pools with the new name. Add --yes-i-really-mean-it " + "if you are sure you wish to continue.") + + return rename_volume(self.mgr, volname, newvolname) + ### subvolume operations def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs): @@ -155,6 +176,7 @@ class VolumeClient(object): pool = kwargs['pool_layout'] uid = kwargs['uid'] gid = kwargs['gid'] + mode = kwargs['mode'] isolate_nspace = kwargs['namespace_isolated'] try: @@ -166,6 +188,7 @@ class VolumeClient(object): attrs = { 'uid': uid if uid else subvolume.uid, 'gid': gid if gid else subvolume.gid, + 'mode': octal_str_to_decimal_int(mode), 'data_pool': pool, 'pool_namespace': subvolume.namespace if isolate_nspace else None, 'quota': size @@ -199,7 +222,7 @@ class VolumeClient(object): # the purge threads on dump. self.purge_queue.queue_job(volname) except VolumeException as ve: - if ve.errno == -errno.EAGAIN: + if ve.errno == -errno.EAGAIN and not force: ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)") ret = self.volume_exception_to_retval(ve) elif not (ve.errno == -errno.ENOENT and force): @@ -360,6 +383,74 @@ class VolumeClient(object): ret = self.volume_exception_to_retval(ve) return ret + def set_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + value = kwargs['value'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume: + subvolume.set_user_metadata(keyname, value) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def get_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume: + value = subvolume.get_user_metadata(keyname) + ret = 0, value, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def list_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume: + subvol_metadata_dict = subvolume.list_user_metadata() + ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_user_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume: + subvolume.remove_user_metadata(keyname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + def list_subvolumes(self, **kwargs): ret = 0, "", "" volname = kwargs['vol_name'] @@ -431,6 +522,86 @@ class VolumeClient(object): ret = self.volume_exception_to_retval(ve) return ret + def set_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + value = kwargs['value'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + subvolume.set_snapshot_metadata(snapname, keyname, value) + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def get_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + value = subvolume.get_snapshot_metadata(snapname, keyname) + ret = 0, value, "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def list_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + snap_metadata_dict = subvolume.list_snapshot_metadata(snapname) + ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), "" + except VolumeException as ve: + ret = self.volume_exception_to_retval(ve) + return ret + + def remove_subvolume_snapshot_metadata(self, **kwargs): + ret = 0, "", "" + volname = kwargs['vol_name'] + subvolname = kwargs['sub_name'] + snapname = kwargs['snap_name'] + groupname = kwargs['group_name'] + keyname = kwargs['key_name'] + force = kwargs['force'] + + try: + with open_volume(self, volname) as fs_handle: + with open_group(fs_handle, self.volspec, groupname) as group: + with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume: + if not snapname.encode('utf-8') in subvolume.list_snapshots(): + raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname)) + subvolume.remove_snapshot_metadata(snapname, keyname) + except VolumeException as ve: + if not (ve.errno == -errno.ENOENT and force): + ret = self.volume_exception_to_retval(ve) + return ret + def list_subvolume_snapshots(self, **kwargs): ret = 0, "", "" volname = kwargs['vol_name'] @@ -617,12 +788,14 @@ class VolumeClient(object): def list_subvolume_groups(self, **kwargs): volname = kwargs['vol_name'] ret = 0, '[]', "" + volume_exists = False try: with open_volume(self, volname) as fs_handle: - groups = listdir(fs_handle, self.volspec.base_dir) + volume_exists = True + groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[Trash.GROUP_NAME.encode('utf-8')]) ret = 0, name_to_json(groups), "" except VolumeException as ve: - if not ve.errno == -errno.ENOENT: + if not ve.errno == -errno.ENOENT or not volume_exists: ret = self.volume_exception_to_retval(ve) return ret