-import time
import errno
import logging
import sys
-from contextlib import contextmanager
-from threading import Lock, Condition
-from typing import no_type_check
+from typing import List, Tuple
-if sys.version_info >= (3, 3):
-    from threading import Timer
-else:
-    from threading import _Timer as Timer
+from contextlib import contextmanager
-import cephfs
import orchestrator
from .lock import GlobalLock
from ..exception import VolumeException
-from ..fs_util import create_pool, remove_pool, create_filesystem, \
-    remove_filesystem, create_mds, volume_exists
+from ..fs_util import create_pool, remove_pool, rename_pool, create_filesystem, \
+    remove_filesystem, rename_filesystem, create_mds, volume_exists
+from mgr_util import open_filesystem, CephfsConnectionException
log = logging.getLogger(__name__)
-class ConnectionPool(object):
-    class Connection(object):
-        def __init__(self, mgr, fs_name):
-            self.fs = None
-            self.mgr = mgr
-            self.fs_name = fs_name
-            self.ops_in_progress = 0
-            self.last_used = time.time()
-            self.fs_id = self.get_fs_id()
-
-        def get_fs_id(self):
-            fs_map = self.mgr.get('fs_map')
-            for fs in fs_map['filesystems']:
-                if fs['mdsmap']['fs_name'] == self.fs_name:
-                    return fs['id']
-            raise VolumeException(
-                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
-
-        def get_fs_handle(self):
-            self.last_used = time.time()
-            self.ops_in_progress += 1
-            return self.fs
-
-        def put_fs_handle(self, notify):
-            assert self.ops_in_progress > 0
-            self.ops_in_progress -= 1
-            if self.ops_in_progress == 0:
-                notify()
-
-        def del_fs_handle(self, waiter):
-            if waiter:
-                while self.ops_in_progress != 0:
-                    waiter()
-            if self.is_connection_valid():
-                self.disconnect()
-            else:
-                self.abort()
-
-        def is_connection_valid(self):
-            fs_id = None
-            try:
-                fs_id = self.get_fs_id()
-            except:
-                # the filesystem does not exist now -- connection is not valid.
-                pass
-            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
-            return self.fs_id == fs_id
-
-        def is_connection_idle(self, timeout):
-            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
-
-        def connect(self):
-            assert self.ops_in_progress == 0
-            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
-            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
-            log.debug("Setting user ID and group ID of CephFS mount as root...")
-            self.fs.conf_set("client_mount_uid", "0")
-            self.fs.conf_set("client_mount_gid", "0")
-            log.debug("CephFS initializing...")
-            self.fs.init()
-            log.debug("CephFS mounting...")
-            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
-            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
-            self.mgr._ceph_register_client(self.fs.get_addrs())
-
-        def disconnect(self):
-            try:
-                assert self.fs
-                assert self.ops_in_progress == 0
-                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
-                addrs = self.fs.get_addrs()
-                self.fs.shutdown()
-                self.mgr._ceph_unregister_client(addrs)
-                self.fs = None
-            except Exception as e:
-                log.debug("disconnect: ({0})".format(e))
-                raise
-
-        def abort(self):
-            assert self.fs
-            assert self.ops_in_progress == 0
-            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
-            self.fs.abort_conn()
-            log.info("abort done from cephfs '{0}'".format(self.fs_name))
-            self.fs = None
-
-    class RTimer(Timer):
-        """
-        recurring timer variant of Timer
-        """
-        @no_type_check
-        def run(self):
-            try:
-                while not self.finished.is_set():
-                    self.finished.wait(self.interval)
-                    self.function(*self.args, **self.kwargs)
-                self.finished.set()
-            except Exception as e:
-                log.error("ConnectionPool.RTimer: %s", e)
-                raise
-
-    # TODO: make this configurable
-    TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
-    CONNECTION_IDLE_INTERVAL = 60.0 # seconds
-
-    def __init__(self, mgr):
-        self.mgr = mgr
-        self.connections = {}
-        self.lock = Lock()
-        self.cond = Condition(self.lock)
-        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
-                                                self.cleanup_connections)
-        self.timer_task.start()
-
-    def cleanup_connections(self):
-        with self.lock:
-            log.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name,conn in self.connections.items()
-                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                log.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
-
-    def get_fs_handle(self, fs_name):
-        with self.lock:
-            conn = None
-            try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
-                    else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = ConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
-            except cephfs.Error as e:
-                # try to provide a better error string if possible
-                if e.args[0] == errno.ENOENT:
-                    raise VolumeException(
-                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
-                raise VolumeException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
-
-    def put_fs_handle(self, fs_name):
-        with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
-
-    def _del_fs_handle(self, fs_name, wait=False):
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
-
-    def del_fs_handle(self, fs_name, wait=False):
-        with self.lock:
-            self._del_fs_handle(fs_name, wait)
-
-    def del_all_handles(self):
-        with self.lock:
-            for fs_name in list(self.connections.keys()):
-                log.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
-                log.info("pending ops completed for '{}'".format(fs_name))
-            # no new connections should have been initialized since its
-            # guarded on shutdown.
-            assert len(self.connections) == 0
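+# NOTE: cephfs connection pooling, previously implemented here as
+# ConnectionPool, is now provided by mgr_util's open_filesystem helper
+# (imported above); see open_volume/open_volume_lockless below.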
def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
+def get_mds_map(mgr, volname):
+    """
+    return the mdsmap for the given volname, or None if no such volume exists
+    """
+    mds_map = None
+    fs_map = mgr.get("fs_map")
+    for f in fs_map['filesystems']:
+        if volname == f['mdsmap']['fs_name']:
+            return f['mdsmap']
+    return mds_map
+
+def get_pool_names(mgr, volname):
+    """
+    return the volume's metadata pool name and list of data pool names as a tuple
+    """
+    fs_map = mgr.get("fs_map")
+    metadata_pool_id = None
+    data_pool_ids = []  # type: List[int]
+    for f in fs_map['filesystems']:
+        if volname == f['mdsmap']['fs_name']:
+            metadata_pool_id = f['mdsmap']['metadata_pool']
+            data_pool_ids = f['mdsmap']['data_pools']
+            break
+    if metadata_pool_id is None:
+        return None, None
+
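+    # the mdsmap refers to pools by ID; translate the IDs to pool names
+    # via the OSD map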
+    osdmap = mgr.get("osd_map")
+    pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
+    metadata_pool = pools[metadata_pool_id]
+    data_pools = [pools[id] for id in data_pool_ids]
+    return metadata_pool, data_pools
+
def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
-    r, outs, outb = create_pool(mgr, metadata_pool)
+    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
-    # create mds
    return create_mds(mgr, volname, placement)
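+# For example (hypothetical volume name): creating a volume "vol1" creates
+# the pools "cephfs.vol1.meta" and "cephfs.vol1.data" (per gen_pool_names),
+# then the file system and an "mds.vol1" service; each step returns an
+# (rc, out, err) command triple and the first failure is returned to the
+# caller after cleaning up any pools created so far.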
-def delete_volume(mgr, volname):
+
+def delete_volume(mgr, volname, metadata_pool, data_pools):
"""
- delete the given module (tear down mds, remove filesystem)
+ delete the given module (tear down mds, remove filesystem, remove pools)
"""
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
-        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")

    # In case the orchestrator didn't tear down the MDS daemons cleanly,
    # or there was no orchestrator, remove the file system directly.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
-    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
-    return remove_pool(mgr, data_pool)
+
+    for data_pool in data_pools:
+        r, outb, outs = remove_pool(mgr, data_pool)
+        if r != 0:
+            return r, outb, outs
+    result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
+    return r, result_str, ""
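+# Note: unlike create_volume, delete_volume takes the pool names from the
+# caller (typically resolved via get_pool_names beforehand), presumably since
+# the pools can no longer be looked up from the fs_map once the file system
+# is gone.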
+
+def rename_volume(mgr, volname: str, newvolname: str) -> Tuple[int, str, str]:
+    """
+    rename volume (orch MDS service, file system, pools)
+    """
+    # To allow volume rename to be idempotent, check whether the orch-managed
+    # MDS service has already been renamed. If so, skip renaming the MDS service.
+    completion = None
+    rename_mds_service = True
+    try:
+        completion = mgr.describe_service(
+            service_type='mds', service_name=f"mds.{newvolname}", refresh=True)
+        orchestrator.raise_if_exception(completion)
+    except (ImportError, orchestrator.OrchestratorError):
+        log.warning("Failed to fetch orch service mds.%s", newvolname)
+    except Exception as e:
+        # Don't let detailed orchestrator exceptions (python backtraces)
+        # bubble out to the user
+        log.exception("Failed to fetch orch service mds.%s", newvolname)
+        return -errno.EINVAL, "", str(e)
+    if completion and completion.result:
+        rename_mds_service = False
+
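+    # Note: the replacement MDS service for newvolname is launched below, and
+    # the old mds.<volname> service is only torn down at the end of the
+    # rename, so MDS daemons for the volume remain available throughout.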
+    # Launch new MDS service matching newvolname
+    completion = None
+    remove_mds_service = False
+    if rename_mds_service:
+        try:
+            completion = mgr.describe_service(
+                service_type='mds', service_name=f"mds.{volname}", refresh=True)
+            orchestrator.raise_if_exception(completion)
+        except (ImportError, orchestrator.OrchestratorError):
+            log.warning("Failed to fetch orch service mds.%s", volname)
+        except Exception as e:
+            # Don't let detailed orchestrator exceptions (python backtraces)
+            # bubble out to the user
+            log.exception("Failed to fetch orch service mds.%s", volname)
+            return -errno.EINVAL, "", str(e)
+        if completion and completion.result:
+            svc = completion.result[0]
+            placement = svc.spec.placement.pretty_str()
+            create_mds(mgr, newvolname, placement)
+            remove_mds_service = True
+
+    # rename_filesystem is idempotent
+    r, outb, outs = rename_filesystem(mgr, volname, newvolname)
+    if r != 0:
+        errmsg = f"Failed to rename file system '{volname}' to '{newvolname}'"
+        log.error("Failed to rename file system '%s' to '%s'", volname, newvolname)
+        outs = f'{errmsg}; {outs}'
+        return r, outb, outs
+
+    # Rename the file system's metadata and data pools
+    metadata_pool, data_pools = get_pool_names(mgr, newvolname)
+
+    new_metadata_pool, new_data_pool = gen_pool_names(newvolname)
+    if metadata_pool != new_metadata_pool:
+        r, outb, outs = rename_pool(mgr, metadata_pool, new_metadata_pool)
+        if r != 0:
+            errmsg = f"Failed to rename metadata pool '{metadata_pool}' to '{new_metadata_pool}'"
+            log.error("Failed to rename metadata pool '%s' to '%s'", metadata_pool, new_metadata_pool)
+            outs = f'{errmsg}; {outs}'
+            return r, outb, outs
+
+    data_pool_rename_failed = False
+    # If the file system has more than one data pool, then skip renaming
+    # the data pools, and proceed to remove the old MDS service.
+    if len(data_pools) > 1:
+        data_pool_rename_failed = True
+    else:
+        data_pool = data_pools[0]
+        if data_pool != new_data_pool:
+            r, outb, outs = rename_pool(mgr, data_pool, new_data_pool)
+            if r != 0:
+                errmsg = f"Failed to rename data pool '{data_pool}' to '{new_data_pool}'"
+                log.error("Failed to rename data pool '%s' to '%s'", data_pool, new_data_pool)
+                outs = f'{errmsg}; {outs}'
+                return r, outb, outs
+
+    # Tear down the old MDS service
+    if remove_mds_service:
+        try:
+            completion = mgr.remove_service('mds.' + volname)
+            orchestrator.raise_if_exception(completion)
+        except (ImportError, orchestrator.OrchestratorError):
+            log.warning("Failed to tear down orch service mds.%s", volname)
+        except Exception as e:
+            # Don't let detailed orchestrator exceptions (python backtraces)
+            # bubble out to the user
+            log.exception("Failed to tear down orch service mds.%s", volname)
+            return -errno.EINVAL, "", str(e)
+
+    outb = f"FS volume '{volname}' renamed to '{newvolname}'"
+    if data_pool_rename_failed:
+        outb += ". But failed to rename data pools as more than one data pool was found."
+
+    return r, outb, ""
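+# Happy-path sketch with hypothetical names: renaming "vol1" to "vol2" first
+# launches an mds.vol2 service with vol1's placement, renames the file system,
+# renames cephfs.vol1.meta/cephfs.vol1.data to the vol2 equivalents (skipped
+# when the volume has more than one data pool), and finally removes mds.vol1.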
def list_volumes(mgr):
    """
    list all filesystem volumes
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result
+
@contextmanager
def open_volume(vc, volname):
"""
- open a volume for exclusive access. This API is to be used as a context manager.
+ open a volume for exclusive access. This API is to be used as a contextr
+ manager.
:param vc: volume client instance
:param volname: volume name
:return: yields a volume handle (ceph filesystem handle)
"""
-    if vc.is_stopping():
-        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
    g_lock = GlobalLock()
-    fs_handle = vc.connection_pool.get_fs_handle(volname)
-    try:
-        with g_lock.lock_op():
-            yield fs_handle
-    finally:
-        vc.connection_pool.put_fs_handle(volname)
+    with g_lock.lock_op():
+        try:
+            with open_filesystem(vc, volname) as fs_handle:
+                yield fs_handle
+        except CephfsConnectionException as ce:
+            raise VolumeException(ce.errno, ce.error_str)
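+# A minimal usage sketch (assuming `vc` is the volumes module's client object
+# and "vol1" is a hypothetical volume name):
+#
+#     with open_volume(vc, "vol1") as fs_handle:
+#         fs_handle.stat(b"/")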
+
@contextmanager
def open_volume_lockless(vc, volname):
"""
- open a volume with shared access. This API is to be used as a context manager.
+ open a volume with shared access. This API is to be used as a context
+ manager.
:param vc: volume client instance
:param volname: volume name
:return: yields a volume handle (ceph filesystem handle)
"""
-    if vc.is_stopping():
-        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")
-
-    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
-        yield fs_handle
-    finally:
-        vc.connection_pool.put_fs_handle(volname)
+        with open_filesystem(vc, volname) as fs_handle:
+            yield fs_handle
+    except CephfsConnectionException as ce:
+        raise VolumeException(ce.errno, ce.error_str)
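+# open_volume serializes callers behind the GlobalLock, while
+# open_volume_lockless does not; both map CephfsConnectionException raised by
+# open_filesystem to the module's VolumeException, so callers handle a single
+# error type.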