]>
Commit | Line | Data |
---|---|---|
92f5a8d4 | 1 | import errno |
494da23a | 2 | import logging |
adb31ebb TL |
3 | import os |
4 | import stat | |
5 | ||
6 | import cephfs | |
494da23a | 7 | |
92f5a8d4 TL |
8 | from .async_job import AsyncJobs |
9 | from .exception import VolumeException | |
adb31ebb TL |
10 | from .operations.resolver import resolve_trash |
11 | from .operations.template import SubvolumeOpType | |
12 | from .operations.group import open_group | |
13 | from .operations.subvolume import open_subvol | |
92f5a8d4 TL |
14 | from .operations.volume import open_volume, open_volume_lockless |
15 | from .operations.trash import open_trashcan | |
494da23a | 16 | |
92f5a8d4 | 17 | log = logging.getLogger(__name__) |
494da23a | 18 | |
20effc67 | 19 | |
92f5a8d4 | 20 | # helper for fetching a trash entry for a given volume |
def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
    """
    Fetch a trash entry for volume `volname`.

    The volume is opened with `open_volume_lockless`, its trashcan is opened
    with `open_trashcan`, and the trashcan is asked for an entry via
    `get_trash_entry(running_jobs)`.

    Returns a `(ret, entry)` tuple:
      * `(0, entry)`        -- `entry` is whatever `get_trash_entry` returned
      * `(0, None)`         -- opening the trashcan or fetching the entry
                               raised a VolumeException with errno -ENOENT
      * `(ve.errno, None)`  -- any other VolumeException (logged as an error)
    """
    log.debug("fetching trash entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_trashcan(fs_handle, volspec) as trashcan:
                    entry = trashcan.get_trash_entry(running_jobs)
            except VolumeException as inner:
                # anything other than ENOENT is handed to the outer handler,
                # which logs it and converts it to an error code.
                if inner.errno != -errno.ENOENT:
                    raise inner
                return 0, None
            else:
                return 0, entry
    except VolumeException as outer:
        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, outer))
        return outer.errno, None
92f5a8d4 | 37 | |
20effc67 | 38 | |
522d829b TL |
def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
    """
    Purge the base path of the subvolume that a subvolume trash entry
    belongs to, provided the subvolume reports itself as purgeable.

    `subvolume_trash_entry` is decoded as UTF-8 and resolved to a
    (group, subvolume) pair with `resolve_trash`. The subvolume is then
    opened for `SubvolumeOpType.REMOVE`; if it is not purgeable nothing
    is done.

    A VolumeException with errno -ENOENT is ignored; any other
    VolumeException is re-raised to the caller.
    """
    trash_path = subvolume_trash_entry.decode('utf-8')
    groupname, subvolname = resolve_trash(volspec, trash_path)
    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))

    try:
        with open_volume(fs_client, volname) as fs_handle, \
                open_group(fs_handle, volspec, groupname) as group, \
                open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname,
                            SubvolumeOpType.REMOVE) as subvolume:
            log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
            if subvolume.purgeable:
                # this is fine under the global lock -- there are just a handful
                # of entries in the subvolume to purge. moreover, the purge needs
                # to be guarded since a create request might sneak in.
                trashcan.purge(subvolume.base_path, should_cancel)
    except VolumeException as ve:
        if ve.errno != -errno.ENOENT:
            raise
57 | ||
20effc67 | 58 | |
92f5a8d4 | 59 | # helper for starting a purge operation on a trash entry |
def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
    """
    Purge a single trash entry of volume `volname`.

    `purge_entry` is the entry name relative to the trashcan path. Two kinds
    of entries are handled:

      * a symlink -- its target is purged first, then `subvolume_purge` is
        run for the target and finally the link itself is removed from the
        trashcan with `delink`.
      * anything else -- the entry is purged directly.

    `should_cancel` is passed through unchanged to every `purge` call.

    Returns 0 on success (and also when a `cephfs.Error` was logged and
    swallowed), otherwise the `errno` carried by the VolumeException that
    interrupted the purge.
    """
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))

    ret = 0
    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            with open_trashcan(fs_handle, volspec) as trashcan:
                try:
                    pth = os.path.join(trashcan.path, purge_entry)
                    # stat the entry itself (do not follow a symlink) -- the mode
                    # tells us whether it is a link, the size is the link length.
                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
                                          cephfs.AT_SYMLINK_NOFOLLOW)
                    if stat.S_ISLNK(stx['mode']):
                        # read up to 4096 bytes, then trim to the size reported by statx
                        tgt = fs_handle.readlink(pth, 4096)
                        tgt = tgt[:stx['size']]
                        log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
                        delink = True
                        try:
                            trashcan.purge(tgt, should_cancel)
                        except VolumeException as ve:
                            # a missing target (ENOENT) still gets delinked; any
                            # other failure keeps the link and reports the errno.
                            if ve.errno != -errno.ENOENT:
                                delink = False
                                return ve.errno
                        finally:
                            # NOTE(review): this also runs when purge() raises
                            # something other than VolumeException, in which
                            # case `delink` is still True.
                            if delink:
                                subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
                                log.debug("purging trash link: {0}".format(purge_entry))
                                trashcan.delink(purge_entry)
                    else:
                        log.debug("purging entry pointing to trash: {0}".format(pth))
                        trashcan.purge(pth, should_cancel)
                except cephfs.Error as e:
                    # best effort: log and fall through with ret == 0.
                    # Logger.warn() is a deprecated alias -- use warning().
                    log.warning("failed to remove trash entry: {0}".format(e))
    except VolumeException as ve:
        ret = ve.errno
    return ret
95 | ||
20effc67 | 96 | |
class ThreadPoolPurgeQueueMixin(AsyncJobs):
    """
    Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as, lots
    of small files in a directory w/ deep directory trees), this model may lead to
    _all_ threads purging entries for one volume (starving other volumes).
    """
    def __init__(self, volume_client, tp_size):
        """
        Remember the volume client and initialise the AsyncJobs base with the
        job name "purgejob" and `tp_size`.
        """
        # keep the assignment ahead of the base initialiser, as in the
        # original ordering.
        self.vc = volume_client
        super().__init__(volume_client, "purgejob", tp_size)

    def get_next_job(self, volname, running_jobs):
        """
        Return the `(ret, entry)` tuple produced by
        `get_trash_entry_for_volume` for `volname`.
        """
        # NOTE(review): self.fs_client is not set in this class -- presumably
        # provided by AsyncJobs; confirm.
        volspec = self.vc.volspec
        return get_trash_entry_for_volume(self.fs_client, volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        """
        Purge trash entry `job` of `volname`. The return code of
        `purge_trash_entry_for_volume` is not propagated.
        """
        volspec = self.vc.volspec
        purge_trash_entry_for_volume(self.fs_client, volspec, volname, job, should_cancel)