4 from .async_job
import AsyncJobs
5 from .exception
import VolumeException
6 from .operations
.volume
import open_volume
, open_volume_lockless
7 from .operations
.trash
import open_trashcan
9 log
= logging
.getLogger(__name__
)
11 # helper for fetching a trash entry for a given volume
12 def get_trash_entry_for_volume(volume_client
, volname
, running_jobs
):
13 log
.debug("fetching trash entry for volume '{0}'".format(volname
))
16 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
18 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
19 path
= trashcan
.get_trash_entry(running_jobs
)
21 except VolumeException
as ve
:
22 if ve
.errno
== -errno
.ENOENT
:
25 except VolumeException
as ve
:
26 log
.error("error fetching trash entry for volume '{0}' ({1})".format(volname
), ve
)
30 # helper for starting a purge operation on a trash entry
31 def purge_trash_entry_for_volume(volume_client
, volname
, purge_dir
, should_cancel
):
32 log
.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir
, volname
))
36 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
37 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
38 trashcan
.purge(purge_dir
, should_cancel
)
39 except VolumeException
as ve
:
43 class ThreadPoolPurgeQueueMixin(AsyncJobs
):
45 Purge queue mixin class maintaining a pool of threads for purging trash entries.
46 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
47 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
48 of small files in a directory w/ deep directory trees), this model may lead to
49 _all_ threads purging entries for one volume (starving other volumes).
51 def __init__(self
, volume_client
, tp_size
):
52 self
.vc
= volume_client
53 super(ThreadPoolPurgeQueueMixin
, self
).__init
__(volume_client
, "puregejob", tp_size
)
55 def get_next_job(self
, volname
, running_jobs
):
56 return get_trash_entry_for_volume(self
.vc
, volname
, running_jobs
)
58 def execute_job(self
, volname
, job
, should_cancel
):
59 purge_trash_entry_for_volume(self
.vc
, volname
, job
, should_cancel
)