]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/purge_queue.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / purge_queue.py
1 import errno
2 import logging
3
4 from .async_job import AsyncJobs
5 from .exception import VolumeException
6 from .operations.volume import open_volume, open_volume_lockless
7 from .operations.trash import open_trashcan
8
9 log = logging.getLogger(__name__)
10
11 # helper for fetching a trash entry for a given volume
12 def get_trash_entry_for_volume(volume_client, volname, running_jobs):
13 log.debug("fetching trash entry for volume '{0}'".format(volname))
14
15 try:
16 with open_volume_lockless(volume_client, volname) as fs_handle:
17 try:
18 with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
19 path = trashcan.get_trash_entry(running_jobs)
20 return 0, path
21 except VolumeException as ve:
22 if ve.errno == -errno.ENOENT:
23 return 0, None
24 raise ve
25 except VolumeException as ve:
26 log.error("error fetching trash entry for volume '{0}' ({1})".format(volname), ve)
27 return ve.errno, None
28 return ret
29
30 # helper for starting a purge operation on a trash entry
31 def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel):
32 log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
33
34 ret = 0
35 try:
36 with open_volume_lockless(volume_client, volname) as fs_handle:
37 with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
38 trashcan.purge(purge_dir, should_cancel)
39 except VolumeException as ve:
40 ret = ve.errno
41 return ret
42
43 class ThreadPoolPurgeQueueMixin(AsyncJobs):
44 """
45 Purge queue mixin class maintaining a pool of threads for purging trash entries.
46 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
47 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
48 of small files in a directory w/ deep directory trees), this model may lead to
49 _all_ threads purging entries for one volume (starving other volumes).
50 """
51 def __init__(self, volume_client, tp_size):
52 self.vc = volume_client
53 super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
54
55 def get_next_job(self, volname, running_jobs):
56 return get_trash_entry_for_volume(self.vc, volname, running_jobs)
57
58 def execute_job(self, volname, job, should_cancel):
59 purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)