]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/purge_queue.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / purge_queue.py
1 import errno
2 import logging
3 import os
4 import stat
5
6 import cephfs
7
8 from .async_job import AsyncJobs
9 from .exception import VolumeException
10 from .operations.resolver import resolve_trash
11 from .operations.template import SubvolumeOpType
12 from .operations.group import open_group
13 from .operations.subvolume import open_subvol
14 from .operations.volume import open_volume, open_volume_lockless
15 from .operations.trash import open_trashcan
16
17 log = logging.getLogger(__name__)
18
19
20 # helper for fetching a trash entry for a given volume
21 def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
22 log.debug("fetching trash entry for volume '{0}'".format(volname))
23
24 try:
25 with open_volume_lockless(fs_client, volname) as fs_handle:
26 try:
27 with open_trashcan(fs_handle, volspec) as trashcan:
28 path = trashcan.get_trash_entry(running_jobs)
29 return 0, path
30 except VolumeException as ve:
31 if ve.errno == -errno.ENOENT:
32 return 0, None
33 raise ve
34 except VolumeException as ve:
35 log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
36 return ve.errno, None
37
38
39 def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
40 groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
41 log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
42
43 try:
44 with open_volume(fs_client, volname) as fs_handle:
45 with open_group(fs_handle, volspec, groupname) as group:
46 with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
47 log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
48 if not subvolume.purgeable:
49 return
50 # this is fine under the global lock -- there are just a handful
51 # of entries in the subvolume to purge. moreover, the purge needs
52 # to be guarded since a create request might sneak in.
53 trashcan.purge(subvolume.base_path, should_cancel)
54 except VolumeException as ve:
55 if not ve.errno == -errno.ENOENT:
56 raise
57
58
59 # helper for starting a purge operation on a trash entry
60 def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
61 log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
62
63 ret = 0
64 try:
65 with open_volume_lockless(fs_client, volname) as fs_handle:
66 with open_trashcan(fs_handle, volspec) as trashcan:
67 try:
68 pth = os.path.join(trashcan.path, purge_entry)
69 stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
70 cephfs.AT_SYMLINK_NOFOLLOW)
71 if stat.S_ISLNK(stx['mode']):
72 tgt = fs_handle.readlink(pth, 4096)
73 tgt = tgt[:stx['size']]
74 log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
75 delink = True
76 try:
77 trashcan.purge(tgt, should_cancel)
78 except VolumeException as ve:
79 if not ve.errno == -errno.ENOENT:
80 delink = False
81 return ve.errno
82 finally:
83 if delink:
84 subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
85 log.debug("purging trash link: {0}".format(purge_entry))
86 trashcan.delink(purge_entry)
87 else:
88 log.debug("purging entry pointing to trash: {0}".format(pth))
89 trashcan.purge(pth, should_cancel)
90 except cephfs.Error as e:
91 log.warn("failed to remove trash entry: {0}".format(e))
92 except VolumeException as ve:
93 ret = ve.errno
94 return ret
95
96
97 class ThreadPoolPurgeQueueMixin(AsyncJobs):
98 """
99 Purge queue mixin class maintaining a pool of threads for purging trash entries.
100 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
101 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
102 of small files in a directory w/ deep directory trees), this model may lead to
103 _all_ threads purging entries for one volume (starving other volumes).
104 """
105 def __init__(self, volume_client, tp_size):
106 self.vc = volume_client
107 super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
108
109 def get_next_job(self, volname, running_jobs):
110 return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
111
112 def execute_job(self, volname, job, should_cancel):
113 purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)