import logging
import json
from datetime import datetime
-from typing import List, Dict
+from typing import Any, List, Dict
+from pathlib import Path
import cephfs
if isinstance(e, MetadataMgrException):
log.error("metadata manager exception: {0}".format(e))
- e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
elif isinstance(e, cephfs.Error):
e = VolumeException(-e.args[0], e.args[1])
raise e
self.metadata_mgr.flush()
def add_clone_failure(self, errno, error_msg):
- self.metadata_mgr.add_section(MetadataManager.CLONE_FAILURE_SECTION)
- self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
- MetadataManager.CLONE_FAILURE_META_KEY_ERRNO, errno)
- self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
- MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG, error_msg)
- self.metadata_mgr.flush()
+ try:
+ self.metadata_mgr.add_section(MetadataManager.CLONE_FAILURE_SECTION)
+ self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+ MetadataManager.CLONE_FAILURE_META_KEY_ERRNO, errno)
+ self.metadata_mgr.update_section(MetadataManager.CLONE_FAILURE_SECTION,
+ MetadataManager.CLONE_FAILURE_META_KEY_ERROR_MSG, error_msg)
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to add clone failure status clone={self.subvol_name} group={self.group_name} "
+ f"reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
def create_clone(self, pool, source_volname, source_subvolume, snapname):
subvolume_type = SubvolumeTypes.TYPE_CLONE
if isinstance(e, MetadataMgrException):
log.error("metadata manager exception: {0}".format(e))
- e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
+ e = VolumeException(-errno.EINVAL, f"exception in subvolume metadata: {os.strerror(-e.args[0])}")
elif isinstance(e, cephfs.Error):
e = VolumeException(-e.args[0], e.args[1])
raise e
return False
raise
- def remove_snapshot(self, snapname):
+ def get_pending_clones(self, snapname):
+ pending_clones_info = {"has_pending_clones": "no"} # type: Dict[str, Any]
+ pending_track_id_list = []
+ pending_clone_list = []
+ index_path = ""
+ orphan_clones_count = 0
+
+ try:
+ if self.has_pending_clones(snapname):
+ pending_track_id_list = self.metadata_mgr.list_all_keys_with_specified_values_from_section('clone snaps', snapname)
+ else:
+ return pending_clones_info
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(-me.args[0], me.args[1])
+
+ try:
+ with open_clone_index(self.fs, self.vol_spec) as index:
+ index_path = index.path.decode('utf-8')
+ except IndexException as e:
+ log.warning("failed to open clone index '{0}' for snapshot '{1}'".format(e, snapname))
+ raise VolumeException(-errno.EINVAL, "failed to open clone index")
+
+ for track_id in pending_track_id_list:
+ try:
+ link_path = self.fs.readlink(os.path.join(index_path, track_id), 4096)
+ except cephfs.Error as e:
+ if e.errno != errno.ENOENT:
+ raise VolumeException(-e.args[0], e.args[1])
+ else:
+ try:
+ # If clone is completed between 'list_all_keys_with_specified_values_from_section'
+ # and readlink(track_id_path) call then readlink will fail with error ENOENT (2)
+ # Hence we double check whether track_id is exist in .meta file or not.
+ value = self.metadata_mgr.get_option('clone snaps', track_id)
+ # Edge case scenario.
+ # If track_id for clone exist but path /volumes/_index/clone/{track_id} not found
+ # then clone is orphan.
+ orphan_clones_count += 1
+ continue
+ except MetadataMgrException as me:
+ if me.errno != -errno.ENOENT:
+ raise VolumeException(-me.args[0], me.args[1])
+
+ path = Path(link_path.decode('utf-8'))
+ clone_name = os.path.basename(link_path).decode('utf-8')
+ group_name = os.path.basename(path.parent.absolute())
+ details = {"name": clone_name} # type: Dict[str, str]
+ if group_name != Group.NO_GROUP_NAME:
+ details["target_group"] = group_name
+ pending_clone_list.append(details)
+
+ if len(pending_clone_list) != 0:
+ pending_clones_info["has_pending_clones"] = "yes"
+ pending_clones_info["pending_clones"] = pending_clone_list
+ else:
+ pending_clones_info["has_pending_clones"] = "no"
+
+ if orphan_clones_count > 0:
+ pending_clones_info["orphan_clones_count"] = orphan_clones_count
+
+ return pending_clones_info
+
+ def remove_snapshot(self, snapname, force=False):
if self.has_pending_clones(snapname):
raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
snappath = self.snapshot_path(snapname)
+ try:
+ self.metadata_mgr.remove_section(self.get_snap_section_name(snapname))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ if force:
+ log.info(f"Allowing snapshot removal on failure of it's metadata removal with force on "
+ f"snap={snapname} subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+ f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ pass
+ else:
+ log.error(f"Failed to remove snapshot metadata on snap={snapname} subvol={self.subvol_name} "
+ f"group={self.group_name} reason={me.args[1]}, errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ raise VolumeException(-errno.EAGAIN,
+ f"failed to remove snapshot metadata on snap={snapname} reason={me.args[0]} {me.args[1]}")
rmsnap(self.fs, snappath)
- self.metadata_mgr.remove_section(self.get_snap_section_name(snapname))
- self.metadata_mgr.flush()
def snapshot_info(self, snapname):
if is_inherited_snap(snapname):
snappath = self.snapshot_data_path(snapname)
snap_info = {}
try:
- snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
+ snap_attrs = {'created_at':'ceph.snap.btime',
'data_pool':'ceph.dir.layout.pool'}
for key, val in snap_attrs.items():
snap_info[key] = self.fs.getxattr(snappath, val)
- return {'size': int(snap_info['size']),
- 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
- 'data_pool': snap_info['data_pool'].decode('utf-8'),
- 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
+ pending_clones_info = self.get_pending_clones(snapname)
+ info_dict = {'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
+ 'data_pool': snap_info['data_pool'].decode('utf-8')} # type: Dict[str, Any]
+ info_dict.update(pending_clones_info);
+ return info_dict
except cephfs.Error as e:
if e.errno == errno.ENOENT:
raise VolumeException(-errno.ENOENT,
return []
raise
+ def clean_stale_snapshot_metadata(self):
+ """ Clean up stale snapshot metadata """
+ if self.metadata_mgr.has_snap_metadata_section():
+ snap_list = self.list_snapshots()
+ snaps_with_metadata_list = self.metadata_mgr.list_snaps_with_metadata()
+ for snap_with_metadata in snaps_with_metadata_list:
+ if snap_with_metadata.encode('utf-8') not in snap_list:
+ try:
+ self.metadata_mgr.remove_section(self.get_snap_section_name(snap_with_metadata))
+ self.metadata_mgr.flush()
+ except MetadataMgrException as me:
+ log.error(f"Failed to remove stale snap metadata on snap={snap_with_metadata} "
+ f"subvol={self.subvol_name} group={self.group_name} reason={me.args[1]}, "
+ f"errno:{-me.args[0]}, {os.strerror(-me.args[0])}")
+ pass
+
def _add_snap_clone(self, track_id, snapname):
self.metadata_mgr.add_section("clone snaps")
self.metadata_mgr.update_section("clone snaps", track_id, snapname)