8 from .async_job
import AsyncJobs
9 from .exception
import VolumeException
10 from .operations
.resolver
import resolve_trash
11 from .operations
.template
import SubvolumeOpType
12 from .operations
.group
import open_group
13 from .operations
.subvolume
import open_subvol
14 from .operations
.volume
import open_volume
, open_volume_lockless
15 from .operations
.trash
import open_trashcan
17 log
= logging
.getLogger(__name__
)
19 # helper for fetching a trash entry for a given volume
20 def get_trash_entry_for_volume(volume_client
, volname
, running_jobs
):
21 log
.debug("fetching trash entry for volume '{0}'".format(volname
))
24 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
26 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
27 path
= trashcan
.get_trash_entry(running_jobs
)
29 except VolumeException
as ve
:
30 if ve
.errno
== -errno
.ENOENT
:
33 except VolumeException
as ve
:
34 log
.error("error fetching trash entry for volume '{0}' ({1})".format(volname
, ve
))
37 def subvolume_purge(volume_client
, volname
, trashcan
, subvolume_trash_entry
, should_cancel
):
38 groupname
, subvolname
= resolve_trash(volume_client
.volspec
, subvolume_trash_entry
.decode('utf-8'))
39 log
.debug("subvolume resolved to {0}/{1}".format(groupname
, subvolname
))
42 with
open_volume(volume_client
, volname
) as fs_handle
:
43 with
open_group(fs_handle
, volume_client
.volspec
, groupname
) as group
:
44 with
open_subvol(fs_handle
, volume_client
.volspec
, group
, subvolname
, SubvolumeOpType
.REMOVE
) as subvolume
:
45 log
.debug("subvolume.path={0}, purgeable={1}".format(subvolume
.path
, subvolume
.purgeable
))
46 if not subvolume
.purgeable
:
48 # this is fine under the global lock -- there are just a handful
49 # of entries in the subvolume to purge. moreover, the purge needs
50 # to be guarded since a create request might sneak in.
51 trashcan
.purge(subvolume
.base_path
, should_cancel
)
52 except VolumeException
as ve
:
53 if not ve
.errno
== -errno
.ENOENT
:
56 # helper for starting a purge operation on a trash entry
57 def purge_trash_entry_for_volume(volume_client
, volname
, purge_entry
, should_cancel
):
58 log
.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry
, volname
))
62 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
63 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
65 pth
= os
.path
.join(trashcan
.path
, purge_entry
)
66 stx
= fs_handle
.statx(pth
, cephfs
.CEPH_STATX_MODE | cephfs
.CEPH_STATX_SIZE
,
67 cephfs
.AT_SYMLINK_NOFOLLOW
)
68 if stat
.S_ISLNK(stx
['mode']):
69 tgt
= fs_handle
.readlink(pth
, 4096)
70 tgt
= tgt
[:stx
['size']]
71 log
.debug("purging entry pointing to subvolume trash: {0}".format(tgt
))
74 trashcan
.purge(tgt
, should_cancel
)
75 except VolumeException
as ve
:
76 if not ve
.errno
== -errno
.ENOENT
:
81 subvolume_purge(volume_client
, volname
, trashcan
, tgt
, should_cancel
)
82 log
.debug("purging trash link: {0}".format(purge_entry
))
83 trashcan
.delink(purge_entry
)
85 log
.debug("purging entry pointing to trash: {0}".format(pth
))
86 trashcan
.purge(pth
, should_cancel
)
87 except cephfs
.Error
as e
:
88 log
.warn("failed to remove trash entry: {0}".format(e
))
89 except VolumeException
as ve
:
93 class ThreadPoolPurgeQueueMixin(AsyncJobs
):
95 Purge queue mixin class maintaining a pool of threads for purging trash entries.
96 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
97 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
98 of small files in a directory w/ deep directory trees), this model may lead to
99 _all_ threads purging entries for one volume (starving other volumes).
101 def __init__(self
, volume_client
, tp_size
):
102 self
.vc
= volume_client
103 super(ThreadPoolPurgeQueueMixin
, self
).__init
__(volume_client
, "puregejob", tp_size
)
105 def get_next_job(self
, volname
, running_jobs
):
106 return get_trash_entry_for_volume(self
.vc
, volname
, running_jobs
)
108 def execute_job(self
, volname
, job
, should_cancel
):
109 purge_trash_entry_for_volume(self
.vc
, volname
, job
, should_cancel
)