8 from .async_job
import AsyncJobs
9 from .exception
import VolumeException
10 from .operations
.resolver
import resolve_trash
11 from .operations
.template
import SubvolumeOpType
12 from .operations
.group
import open_group
13 from .operations
.subvolume
import open_subvol
14 from .operations
.volume
import open_volume
, open_volume_lockless
15 from .operations
.trash
import open_trashcan
# module-level logger, named after this module via __name__
log = logging.getLogger(__name__)
# helper for fetching a trash entry for a given volume
def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
    """
    Fetch one trash entry from the trashcan of volume `volname`.

    :param fs_client: client object handed to open_volume_lockless()
    :param volspec: volume specification handed to open_trashcan()
    :param volname: name of the volume whose trashcan is queried
    :param running_jobs: handed to trashcan.get_trash_entry() (presumably the
                         entries already being purged, so they get skipped)
    :return: (0, entry) with whatever trashcan.get_trash_entry() returned,
             (0, None) when opening or reading the trashcan fails with ENOENT,
             (negative errno, None) for any other VolumeException

    NOTE(review): the (errno, entry) tuple shape is assumed from how the job
    queue consumes get_next_job(); confirm against the AsyncJobs base class.
    """
    log.debug("fetching trash entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_trashcan(fs_handle, volspec) as trashcan:
                    path = trashcan.get_trash_entry(running_jobs)
                    return 0, path
            except VolumeException as ve:
                # ENOENT here is not an error: there is simply nothing to purge
                if ve.errno == -errno.ENOENT:
                    return 0, None
                # bare raise keeps the original traceback for the outer handler
                raise
    except VolumeException as ve:
        log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None
def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
    """
    Purge the base path of the subvolume that a trash entry resolves to.

    :param fs_client: client object; used for open_volume() and, through its
                      `mgr` attribute, for open_subvol()
    :param volspec: volume specification used to resolve and open group/subvolume
    :param volname: name of the volume holding the subvolume
    :param trashcan: already opened trashcan whose purge() is invoked
    :param subvolume_trash_entry: trash entry as bytes; decoded as UTF-8 and
                                  resolved to (group name, subvolume name)
    :param should_cancel: passed through to trashcan.purge()
    :return: None
    :raises VolumeException: for any failure other than ENOENT

    NOTE(review): the early return for non-purgeable subvolumes and the
    re-raise of non-ENOENT errors are the evident intent of the two guards;
    confirm against the callers.
    """
    groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
    log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))

    try:
        with open_volume(fs_client, volname) as fs_handle:
            with open_group(fs_handle, volspec, groupname) as group:
                with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname,
                                 SubvolumeOpType.REMOVE) as subvolume:
                    log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path,
                                                                         subvolume.purgeable))
                    if not subvolume.purgeable:
                        return
                    # this is fine under the global lock -- there are just a handful
                    # of entries in the subvolume to purge. moreover, the purge needs
                    # to be guarded since a create request might sneak in.
                    trashcan.purge(subvolume.base_path, should_cancel)
    except VolumeException as ve:
        # a volume/group/subvolume that is already gone needs no purging
        if ve.errno != -errno.ENOENT:
            raise
# helper for starting a purge operation on a trash entry
def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
    """
    Purge a single entry from the trashcan of volume `volname`.

    A symlink entry is handled in three steps: purge the link target, purge
    the subvolume the target resolves to, then remove the link itself. Any
    other entry is purged directly.

    :param fs_client: client object handed to open_volume_lockless()
    :param volspec: volume specification handed to open_trashcan()
    :param volname: name of the volume owning the trashcan
    :param purge_entry: name of the entry inside the trashcan (joined onto
                        trashcan.path)
    :param should_cancel: passed through to every trashcan.purge() call
    :return: 0 when the entry was processed (including the case where a
             cephfs.Error was logged), otherwise the negative errno carried
             by the VolumeException

    NOTE(review): the integer return convention is assumed; the visible
    caller (execute_job) discards the result.
    """
    log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            with open_trashcan(fs_handle, volspec) as trashcan:
                try:
                    pth = os.path.join(trashcan.path, purge_entry)
                    # AT_SYMLINK_NOFOLLOW: stat the entry itself, not its target
                    stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
                                          cephfs.AT_SYMLINK_NOFOLLOW)
                    if stat.S_ISLNK(stx['mode']):
                        tgt = fs_handle.readlink(pth, 4096)
                        # keep only the first stx['size'] bytes of the readlink result
                        tgt = tgt[:stx['size']]
                        log.debug("purging entry pointing to subvolume trash: {0}".format(tgt))
                        try:
                            trashcan.purge(tgt, should_cancel)
                        except VolumeException as ve:
                            # a target that is already gone still needs its link
                            # cleaned up; any other failure leaves the link in
                            # place so the entry can be retried later
                            if ve.errno != -errno.ENOENT:
                                return ve.errno
                        subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
                        log.debug("purging trash link: {0}".format(purge_entry))
                        trashcan.delink(purge_entry)
                    else:
                        log.debug("purging entry pointing to trash: {0}".format(pth))
                        trashcan.purge(pth, should_cancel)
                except cephfs.Error as e:
                    # Logger.warn() is a deprecated alias; warning() is the supported call
                    log.warning("failed to remove trash entry: {0}".format(e))
    except VolumeException as ve:
        return ve.errno
    return 0
class ThreadPoolPurgeQueueMixin(AsyncJobs):
    """
    Purge queue mixin class maintaining a pool of threads for purging trash entries.
    Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
    entries (belonging to a set of volumes) have huge directory trees (such as, lots
    of small files in a directory w/ deep directory trees), this model may lead to
    _all_ threads purging entries for one volume (starving other volumes).
    """

    def __init__(self, volume_client, tp_size):
        """
        :param volume_client: volume client; stored as `self.vc` and forwarded
                              to the AsyncJobs constructor
        :param tp_size: forwarded to the AsyncJobs constructor (thread pool
                        size, going by the name)
        """
        self.vc = volume_client
        # NOTE(review): "puregejob" looks like a typo for "purgejob"; it is kept
        # as is because the string is passed to the base class at runtime.
        super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)

    def get_next_job(self, volname, running_jobs):
        """
        Return the result of get_trash_entry_for_volume() for `volname`.

        `self.fs_client` is not assigned in this class; presumably it is
        provided by AsyncJobs -- verify against the base class.
        """
        return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        """
        Purge trash entry `job` of volume `volname`; the helper's return
        value is intentionally not propagated.
        """
        purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)