import json
import errno
import logging
-from threading import Event
+from typing import TYPE_CHECKING
import cephfs
+from mgr_util import CephfsClient
+
from .fs_util import listdir
-from .operations.volume import ConnectionPool, open_volume, create_volume, \
- delete_volume, list_volumes, get_pool_names
+from .operations.volume import create_volume, \
+ delete_volume, rename_volume, list_volumes, open_volume, get_pool_names
from .operations.group import open_group, create_group, remove_group, open_group_unique
from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
create_clone
+from .operations.trash import Trash
from .vol_spec import VolSpec
from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
from .purge_queue import ThreadPoolPurgeQueueMixin
from .operations.template import SubvolumeOpType
+if TYPE_CHECKING:
+ from volumes import Module
+
log = logging.getLogger(__name__)
ALLOWED_ACCESS_LEVELS = ('r', 'rw')
except ValueError:
raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
+
def name_to_json(names):
"""
convert the list of names to json
namedict.append({'name': names[i].decode('utf-8')})
return json.dumps(namedict, indent=4, sort_keys=True)
-class VolumeClient(object):
+
+class VolumeClient(CephfsClient["Module"]):
def __init__(self, mgr):
- self.mgr = mgr
- self.stopping = Event()
+ super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- self.connection_pool = ConnectionPool(self.mgr)
- self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
self.cloner.queue_job(fs['mdsmap']['fs_name'])
self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
- def is_stopping(self):
- return self.stopping.is_set()
-
def shutdown(self):
+ # Overrides CephfsClient.shutdown()
log.info("shutting down")
# first, note that we're shutting down
self.stopping.set()
- # second, ask purge threads to quit
- self.purge_queue.cancel_all_jobs()
- # third, delete all libcephfs handles from connection pool
- self.connection_pool.del_all_handles()
+ # stop clones
+ self.cloner.shutdown()
+ # stop purge threads
+ self.purge_queue.shutdown()
+ # last, delete all libcephfs handles from connection pool
+ self.connection_pool.del_all_connections()
def cluster_log(self, msg, lvl=None):
"""
log to cluster log with default log level as WARN.
"""
if not lvl:
- lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
+ lvl = self.mgr.ClusterLogPrio.WARN
self.mgr.cluster_log("cluster", lvl, msg)
def volume_exception_to_retval(self, ve):
if not metadata_pool:
return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
self.purge_queue.cancel_jobs(volname)
- self.connection_pool.del_fs_handle(volname, wait=True)
+ self.connection_pool.del_connections(volname, wait=True)
return delete_volume(self.mgr, volname, metadata_pool, data_pools)
def list_fs_volumes(self):
volumes = list_volumes(self.mgr)
return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
+ def rename_fs_volume(self, volname, newvolname, sure):
+ if self.is_stopping():
+ return -errno.ESHUTDOWN, "", "shutdown in progress"
+
+ if not sure:
+ return (
+ -errno.EPERM, "",
+ "WARNING: This will rename the filesystem and possibly its "
+ "pools. It is a potentially disruptive operation, clients' "
+ "cephx credentials need reauthorized to access the file system "
+ "and its pools with the new name. Add --yes-i-really-mean-it "
+ "if you are sure you wish to continue.")
+
+ return rename_volume(self.mgr, volname, newvolname)
+
### subvolume operations
def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
pool = kwargs['pool_layout']
uid = kwargs['uid']
gid = kwargs['gid']
+ mode = kwargs['mode']
isolate_nspace = kwargs['namespace_isolated']
try:
attrs = {
'uid': uid if uid else subvolume.uid,
'gid': gid if gid else subvolume.gid,
+ 'mode': octal_str_to_decimal_int(mode),
'data_pool': pool,
'pool_namespace': subvolume.namespace if isolate_nspace else None,
'quota': size
# the purge threads on dump.
self.purge_queue.queue_job(volname)
except VolumeException as ve:
- if ve.errno == -errno.EAGAIN:
+ if ve.errno == -errno.EAGAIN and not force:
ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
ret = self.volume_exception_to_retval(ve)
elif not (ve.errno == -errno.ENOENT and force):
ret = self.volume_exception_to_retval(ve)
return ret
+ def set_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume:
+ subvolume.set_user_metadata(keyname, value)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def get_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume:
+ value = subvolume.get_user_metadata(keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume:
+ subvol_metadata_dict = subvolume.list_user_metadata()
+ ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_user_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
+ subvolume.remove_user_metadata(keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
def list_subvolumes(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
ret = self.volume_exception_to_retval(ve)
return ret
+ def set_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ value = kwargs['value']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.set_snapshot_metadata(snapname, keyname, value)
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def get_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ value = subvolume.get_snapshot_metadata(snapname, keyname)
+ ret = 0, value, ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def list_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ snap_metadata_dict = subvolume.list_snapshot_metadata(snapname)
+ ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), ""
+ except VolumeException as ve:
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
+ def remove_subvolume_snapshot_metadata(self, **kwargs):
+ ret = 0, "", ""
+ volname = kwargs['vol_name']
+ subvolname = kwargs['sub_name']
+ snapname = kwargs['snap_name']
+ groupname = kwargs['group_name']
+ keyname = kwargs['key_name']
+ force = kwargs['force']
+
+ try:
+ with open_volume(self, volname) as fs_handle:
+ with open_group(fs_handle, self.volspec, groupname) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
+ if not snapname.encode('utf-8') in subvolume.list_snapshots():
+ raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
+ subvolume.remove_snapshot_metadata(snapname, keyname)
+ except VolumeException as ve:
+ if not (ve.errno == -errno.ENOENT and force):
+ ret = self.volume_exception_to_retval(ve)
+ return ret
+
def list_subvolume_snapshots(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
def list_subvolume_groups(self, **kwargs):
volname = kwargs['vol_name']
ret = 0, '[]', ""
+ volume_exists = False
try:
with open_volume(self, volname) as fs_handle:
- groups = listdir(fs_handle, self.volspec.base_dir)
+ volume_exists = True
+ groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[Trash.GROUP_NAME.encode('utf-8')])
ret = 0, name_to_json(groups), ""
except VolumeException as ve:
- if not ve.errno == -errno.ENOENT:
+ if not ve.errno == -errno.ENOENT or not volume_exists:
ret = self.volume_exception_to_retval(ve)
return ret