log = logging.getLogger(__name__)
# helper for fetching a trash entry for a given volume
-def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
log.debug("fetching trash entry for volume '{0}'".format(volname))
try:
- with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
try:
- with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ with open_trashcan(fs_handle, volspec) as trashcan:
path = trashcan.get_trash_entry(running_jobs)
return 0, path
except VolumeException as ve:
log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
return ve.errno, None
-def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
- groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
+ groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
try:
- with open_volume(volume_client, volname) as fs_handle:
- with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+ with open_volume(fs_client, volname) as fs_handle:
+ with open_group(fs_handle, volspec, groupname) as group:
+ with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
if not subvolume.purgeable:
return
raise
# helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
ret = 0
try:
- with open_volume_lockless(volume_client, volname) as fs_handle:
- with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
+ with open_trashcan(fs_handle, volspec) as trashcan:
try:
pth = os.path.join(trashcan.path, purge_entry)
stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
return ve.errno
finally:
if delink:
- subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+ subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
log.debug("purging trash link: {0}".format(purge_entry))
trashcan.delink(purge_entry)
else:
super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
def get_next_job(self, volname, running_jobs):
- return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+ return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
def execute_job(self, volname, job, should_cancel):
- purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
+ purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)