4 from .async_job
import AsyncJobs
5 from .exception
import VolumeException
6 from .operations
.volume
import open_volume
, open_volume_lockless
7 from .operations
.trash
import open_trashcan
9 log
= logging
.getLogger(__name__
)
11 # helper for fetching a trash entry for a given volume
12 def get_trash_entry_for_volume(volume_client
, volname
, running_jobs
):
13 log
.debug("fetching trash entry for volume '{0}'".format(volname
))
16 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
18 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
19 path
= trashcan
.get_trash_entry(running_jobs
)
21 except VolumeException
as ve
:
22 if ve
.errno
== -errno
.ENOENT
:
25 except VolumeException
as ve
:
26 log
.error("error fetching trash entry for volume '{0}' ({1})".format(volname
, ve
))
29 # helper for starting a purge operation on a trash entry
30 def purge_trash_entry_for_volume(volume_client
, volname
, purge_dir
, should_cancel
):
31 log
.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir
, volname
))
35 with
open_volume_lockless(volume_client
, volname
) as fs_handle
:
36 with
open_trashcan(fs_handle
, volume_client
.volspec
) as trashcan
:
37 trashcan
.purge(purge_dir
, should_cancel
)
38 except VolumeException
as ve
:
42 class ThreadPoolPurgeQueueMixin(AsyncJobs
):
44 Purge queue mixin class maintaining a pool of threads for purging trash entries.
45 Subvolumes are chosen from volumes in a round robin fashion. If some of the purge
46 entries (belonging to a set of volumes) have huge directory tree's (such as, lots
47 of small files in a directory w/ deep directory trees), this model may lead to
48 _all_ threads purging entries for one volume (starving other volumes).
50 def __init__(self
, volume_client
, tp_size
):
51 self
.vc
= volume_client
52 super(ThreadPoolPurgeQueueMixin
, self
).__init
__(volume_client
, "puregejob", tp_size
)
54 def get_next_job(self
, volname
, running_jobs
):
55 return get_trash_entry_for_volume(self
.vc
, volname
, running_jobs
)
57 def execute_job(self
, volname
, job
, should_cancel
):
58 purge_trash_entry_for_volume(self
.vc
, volname
, job
, should_cancel
)