]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/volumes/fs/purge_queue.py
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / purge_queue.py
CommitLineData
92f5a8d4 1import errno
494da23a 2import logging
adb31ebb
TL
3import os
4import stat
5
6import cephfs
494da23a 7
92f5a8d4
TL
8from .async_job import AsyncJobs
9from .exception import VolumeException
adb31ebb
TL
10from .operations.resolver import resolve_trash
11from .operations.template import SubvolumeOpType
12from .operations.group import open_group
13from .operations.subvolume import open_subvol
92f5a8d4
TL
14from .operations.volume import open_volume, open_volume_lockless
15from .operations.trash import open_trashcan
494da23a 16
92f5a8d4 17log = logging.getLogger(__name__)
494da23a 18
20effc67 19
92f5a8d4 20# helper for fetching a trash entry for a given volume
522d829b 21def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
92f5a8d4
TL
22 log.debug("fetching trash entry for volume '{0}'".format(volname))
23
24 try:
522d829b 25 with open_volume_lockless(fs_client, volname) as fs_handle:
92f5a8d4 26 try:
522d829b 27 with open_trashcan(fs_handle, volspec) as trashcan:
92f5a8d4
TL
28 path = trashcan.get_trash_entry(running_jobs)
29 return 0, path
30 except VolumeException as ve:
31 if ve.errno == -errno.ENOENT:
32 return 0, None
33 raise ve
34 except VolumeException as ve:
9f95a23c 35 log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
92f5a8d4 36 return ve.errno, None
92f5a8d4 37
20effc67 38
522d829b
TL
39def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
40 groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
adb31ebb
TL
41 log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
42
43 try:
522d829b
TL
44 with open_volume(fs_client, volname) as fs_handle:
45 with open_group(fs_handle, volspec, groupname) as group:
46 with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
adb31ebb
TL
47 log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
48 if not subvolume.purgeable:
49 return
50 # this is fine under the global lock -- there are just a handful
51 # of entries in the subvolume to purge. moreover, the purge needs
52 # to be guarded since a create request might sneak in.
53 trashcan.purge(subvolume.base_path, should_cancel)
54 except VolumeException as ve:
55 if not ve.errno == -errno.ENOENT:
56 raise
57
20effc67 58
92f5a8d4 59# helper for starting a purge operation on a trash entry
522d829b 60def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
adb31ebb 61 log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
92f5a8d4
TL
62
63 ret = 0
64 try:
522d829b
TL
65 with open_volume_lockless(fs_client, volname) as fs_handle:
66 with open_trashcan(fs_handle, volspec) as trashcan:
adb31ebb
TL
67 try:
68 pth = os.path.join(trashcan.path, purge_entry)
69 stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
70 cephfs.AT_SYMLINK_NOFOLLOW)
71 if stat.S_ISLNK(stx['mode']):
72 tgt = fs_handle.readlink(pth, 4096)
73 tgt = tgt[:stx['size']]
74 log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
75 delink = True
76 try:
77 trashcan.purge(tgt, should_cancel)
78 except VolumeException as ve:
79 if not ve.errno == -errno.ENOENT:
80 delink = False
81 return ve.errno
82 finally:
83 if delink:
522d829b 84 subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
adb31ebb
TL
85 log.debug("purging trash link: {0}".format(purge_entry))
86 trashcan.delink(purge_entry)
87 else:
88 log.debug("purging entry pointing to trash: {0}".format(pth))
89 trashcan.purge(pth, should_cancel)
90 except cephfs.Error as e:
91 log.warn("failed to remove trash entry: {0}".format(e))
92f5a8d4
TL
92 except VolumeException as ve:
93 ret = ve.errno
94 return ret
95
20effc67 96
92f5a8d4 97class ThreadPoolPurgeQueueMixin(AsyncJobs):
494da23a
TL
98 """
99 Purge queue mixin class maintaining a pool of threads for purging trash entries.
100 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
101 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
102 of small files in a directory w/ deep directory trees), this model may lead to
103 _all_ threads purging entries for one volume (starving other volumes).
104 """
105 def __init__(self, volume_client, tp_size):
92f5a8d4 106 self.vc = volume_client
1e59de90 107 super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size)
494da23a 108
92f5a8d4 109 def get_next_job(self, volname, running_jobs):
522d829b 110 return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
494da23a 111
92f5a8d4 112 def execute_job(self, volname, job, should_cancel):
522d829b 113 purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)