]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/purge_queue.py
2e2ea41ac0587d848167919c2807fe551a9224c5
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / purge_queue.py
1 import errno
2 import logging
3 import os
4 import stat
5
6 import cephfs
7
8 from .async_job import AsyncJobs
9 from .exception import VolumeException
10 from .operations.resolver import resolve_trash
11 from .operations.template import SubvolumeOpType
12 from .operations.group import open_group
13 from .operations.subvolume import open_subvol
14 from .operations.volume import open_volume, open_volume_lockless
15 from .operations.trash import open_trashcan
16
17 log = logging.getLogger(__name__)
18
# helper: look up one trash entry of a volume for the purge threads
def get_trash_entry_for_volume(volume_client, volname, running_jobs):
    """
    Fetch a trash entry from the trashcan of volume `volname`.

    `running_jobs` is forwarded untouched to the trashcan's
    get_trash_entry().

    Returns a `(status, entry)` tuple:
      * `(0, entry)`     -- `entry` is whatever get_trash_entry() returned;
      * `(0, None)`      -- opening/scanning the trashcan raised a
                            VolumeException carrying -ENOENT;
      * `(errno, None)`  -- any other VolumeException; it is logged and its
                            errno is handed back instead of being raised.
    """
    log.debug("fetching trash entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
                    return 0, trashcan.get_trash_entry(running_jobs)
            except VolumeException as trash_err:
                # anything other than "not found" is reported by the outer handler
                if trash_err.errno != -errno.ENOENT:
                    raise
                return 0, None
    except VolumeException as vol_err:
        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, vol_err))
        return vol_err.errno, None
36
def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
    """
    Purge the base path of the subvolume that owns a subvolume trash entry.

    `subvolume_trash_entry` is a bytes path; it is decoded as UTF-8 and
    resolved to a (group, subvolume) pair with resolve_trash(). The
    subvolume is opened for SubvolumeOpType.REMOVE and, only if it reports
    itself as purgeable, its base path is purged through `trashcan`.

    A VolumeException carrying -ENOENT is ignored; any other
    VolumeException propagates to the caller.
    """
    volspec = volume_client.volspec
    trash_path = subvolume_trash_entry.decode('utf-8')
    groupname, subvolname = resolve_trash(volspec, trash_path)
    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))

    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_group(fs_handle, volspec, groupname) as group:
                with open_subvol(fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
                    log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
                    if subvolume.purgeable:
                        # Doing this under the global lock is acceptable: only a
                        # handful of entries are left in the subvolume, and the
                        # purge has to be guarded anyway because a create request
                        # could otherwise sneak in.
                        trashcan.purge(subvolume.base_path, should_cancel)
    except VolumeException as err:
        if err.errno != -errno.ENOENT:
            raise
55
# helper for starting a purge operation on a trash entry
def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
    """
    Purge one entry from the trashcan of volume `volname`.

    The entry is stat'ed without following symlinks and handled as follows:
      * symlink -- the link target is purged; afterwards subvolume_purge()
        is run for the target and the link itself is removed with
        trashcan.delink();
      * anything else -- the entry is purged directly.

    `should_cancel` is passed through to every purge call.

    Returns 0 on success, or the errno of the VolumeException that stopped
    the purge. A cephfs.Error is only logged as a warning and does not
    change the return value.
    """
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))

    ret = 0
    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
                try:
                    pth = os.path.join(trashcan.path, purge_entry)
                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
                                          cephfs.AT_SYMLINK_NOFOLLOW)
                    if stat.S_ISLNK(stx['mode']):
                        # read up to 4096 bytes of link target, then trim it to
                        # the size reported by statx
                        tgt = fs_handle.readlink(pth, 4096)
                        tgt = tgt[:stx['size']]
                        log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
                        delink = True
                        try:
                            trashcan.purge(tgt, should_cancel)
                        except VolumeException as ve:
                            # an already-missing target (-ENOENT) still gets its
                            # link cleaned up below; any other failure keeps the
                            # link so the entry remains in the trashcan
                            if ve.errno != -errno.ENOENT:
                                delink = False
                                return ve.errno
                        finally:
                            # NOTE(review): this also runs when purge() raised
                            # something other than VolumeException (e.g.
                            # cephfs.Error); `delink` is still True in that case,
                            # so the link is removed -- confirm this is intended.
                            if delink:
                                subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
                                log.debug("purging trash link: {0}".format(purge_entry))
                                trashcan.delink(purge_entry)
                    else:
                        log.debug("purging entry pointing to trash: {0}".format(pth))
                        trashcan.purge(pth, should_cancel)
                except cephfs.Error as e:
                    # Logger.warn is a deprecated alias; use Logger.warning
                    log.warning("failed to remove trash entry: {0}".format(e))
    except VolumeException as ve:
        ret = ve.errno
    return ret
92
class ThreadPoolPurgeQueueMixin(AsyncJobs):
    """
    Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as, lots
    of small files in a directory w/ deep directory trees), this model may lead to
    _all_ threads purging entries for one volume (starving other volumes).
    """
    def __init__(self, volume_client, tp_size):
        """
        Remember the volume client and initialise the AsyncJobs base with
        `tp_size` and the job label "purgejob".
        """
        self.vc = volume_client
        # label was previously misspelled as "puregejob"
        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "purgejob", tp_size)

    def get_next_job(self, volname, running_jobs):
        """
        Return the `(status, trash_entry)` tuple produced by
        get_trash_entry_for_volume() for volume `volname`.
        """
        return get_trash_entry_for_volume(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        """
        Purge trash entry `job` of volume `volname`.

        The status code returned by purge_trash_entry_for_volume() is
        discarded; this method always returns None.
        """
        purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)