8 from datetime
import datetime
9 from threading
import Condition
, Lock
, Thread
11 from .common
import get_rbd_pools
12 from .schedule
import LevelSpec
, Interval
, StartTime
, Schedule
, Schedules
15 class TrashPurgeScheduleHandler
:
16 MODULE_OPTION_NAME
= "trash_purge_schedule"
17 SCHEDULE_OID
= "rbd_trash_trash_purge_schedule"
20 condition
= Condition(lock
)
23 def __init__(self
, module
):
26 self
.last_refresh_pools
= datetime(1970, 1, 1)
28 self
.init_schedule_queue()
30 self
.thread
= Thread(target
=self
.run
)
35 self
.log
.info("TrashPurgeScheduleHandler: starting")
39 (ns_spec
, wait_time
) = self
.dequeue()
41 self
.condition
.wait(min(wait_time
, 60))
43 pool_id
, namespace
= ns_spec
44 self
.trash_purge(pool_id
, namespace
)
46 self
.enqueue(datetime
.now(), pool_id
, namespace
)
48 except Exception as ex
:
49 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
50 ex
, traceback
.format_exc()))
52 def trash_purge(self
, pool_id
, namespace
):
54 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
55 ioctx
.set_namespace(namespace
)
56 rbd
.RBD().trash_purge(ioctx
, datetime
.now())
57 except Exception as e
:
58 self
.log
.error("exception when purgin {}/{}: {}".format(
59 pool_id
, namespace
, e
))
62 def init_schedule_queue(self
):
66 self
.log
.debug("scheduler queue is initialized")
68 def load_schedules(self
):
69 self
.log
.info("TrashPurgeScheduleHandler: load_schedules")
71 schedules
= Schedules(self
)
74 self
.schedules
= schedules
76 def refresh_pools(self
):
77 if (datetime
.now() - self
.last_refresh_pools
).seconds
< 60:
80 self
.log
.debug("TrashPurgeScheduleHandler: refresh_pools")
86 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
87 if not self
.schedules
.intersects(
88 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
90 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
91 self
.load_pool(ioctx
, pools
)
94 self
.refresh_queue(pools
)
97 self
.last_refresh_pools
= datetime
.now()
99 def load_pool(self
, ioctx
, pools
):
100 pool_id
= str(ioctx
.get_pool_id())
101 pool_name
= ioctx
.get_pool_name()
103 pool_namespaces
= ['']
105 self
.log
.debug("load_pool: {}".format(pool_name
))
108 pool_namespaces
+= rbd
.RBD().namespace_list(ioctx
)
109 except rbd
.OperationNotSupported
:
110 self
.log
.debug("namespaces not supported")
111 except Exception as e
:
112 self
.log
.error("exception when scanning pool {}: {}".format(
115 for namespace
in pool_namespaces
:
116 pools
[pool_id
][namespace
] = pool_name
118 def rebuild_queue(self
):
122 # don't remove from queue "due" images
123 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
125 for schedule_time
in list(self
.queue
):
126 if schedule_time
> now_string
:
127 del self
.queue
[schedule_time
]
129 if not self
.schedules
:
132 for pool_id
, namespaces
in self
.pools
.items():
133 for namespace
in namespaces
:
134 self
.enqueue(now
, pool_id
, namespace
)
136 self
.condition
.notify()
138 def refresh_queue(self
, current_pools
):
141 for pool_id
, namespaces
in self
.pools
.items():
142 for namespace
in namespaces
:
143 if pool_id
not in current_pools
or \
144 namespace
not in current_pools
[pool_id
]:
145 self
.remove_from_queue(pool_id
, namespace
)
147 for pool_id
, namespaces
in current_pools
.items():
148 for namespace
in namespaces
:
149 if pool_id
not in self
.pools
or \
150 namespace
not in self
.pools
[pool_id
]:
151 self
.enqueue(now
, pool_id
, namespace
)
153 self
.condition
.notify()
155 def enqueue(self
, now
, pool_id
, namespace
):
157 schedule
= self
.schedules
.find(pool_id
, namespace
)
161 schedule_time
= schedule
.next_run(now
)
162 if schedule_time
not in self
.queue
:
163 self
.queue
[schedule_time
] = []
164 self
.log
.debug("schedule {}/{} at {}".format(
165 pool_id
, namespace
, schedule_time
))
166 ns_spec
= (pool_id
, namespace
)
167 if ns_spec
not in self
.queue
[schedule_time
]:
168 self
.queue
[schedule_time
].append((pool_id
, namespace
))
175 schedule_time
= sorted(self
.queue
)[0]
177 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
178 wait_time
= (datetime
.strptime(schedule_time
,
179 "%Y-%m-%d %H:%M:%S") - now
)
180 return None, wait_time
.total_seconds()
182 namespaces
= self
.queue
[schedule_time
]
183 namespace
= namespaces
.pop(0)
185 del self
.queue
[schedule_time
]
188 def remove_from_queue(self
, pool_id
, namespace
):
190 for schedule_time
, namespaces
in self
.queue
.items():
191 if (pool_id
, namespace
) in namespaces
:
192 namespaces
.remove((pool_id
, namespace
))
194 empty_slots
.append(schedule_time
)
195 for schedule_time
in empty_slots
:
196 del self
.queue
[schedule_time
]
198 def add_schedule(self
, level_spec
, interval
, start_time
):
200 "add_schedule: level_spec={}, interval={}, start_time={}".format(
201 level_spec
.name
, interval
, start_time
))
204 self
.schedules
.add(level_spec
, interval
, start_time
)
206 # TODO: optimize to rebuild only affected part of the queue
210 def remove_schedule(self
, level_spec
, interval
, start_time
):
212 "remove_schedule: level_spec={}, interval={}, start_time={}".format(
213 level_spec
.name
, interval
, start_time
))
216 self
.schedules
.remove(level_spec
, interval
, start_time
)
218 # TODO: optimize to rebuild only affected part of the queue
222 def list(self
, level_spec
):
223 self
.log
.debug("list: level_spec={}".format(level_spec
.name
))
226 result
= self
.schedules
.to_list(level_spec
)
228 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
230 def status(self
, level_spec
):
231 self
.log
.debug("status: level_spec={}".format(level_spec
.name
))
235 for schedule_time
in sorted(self
.queue
):
236 for pool_id
, namespace
in self
.queue
[schedule_time
]:
237 if not level_spec
.matches(pool_id
, namespace
):
239 pool_name
= self
.pools
[pool_id
][namespace
]
241 'schedule_time' : schedule_time
,
243 'pool_name' : pool_name
,
244 'namespace' : namespace
246 return 0, json
.dumps({'scheduled' : scheduled
}, indent
=4,
249 def handle_command(self
, inbuf
, prefix
, cmd
):
250 level_spec_name
= cmd
.get('level_spec', "")
253 level_spec
= LevelSpec
.from_name(self
, level_spec_name
,
254 allow_image_level
=False)
255 except ValueError as e
:
256 return -errno
.EINVAL
, '', "Invalid level spec {}: {}".format(
260 return self
.add_schedule(level_spec
, cmd
['interval'],
261 cmd
.get('start_time'))
262 elif prefix
== 'remove':
263 return self
.remove_schedule(level_spec
, cmd
.get('interval'),
264 cmd
.get('start_time'))
265 elif prefix
== 'list':
266 return self
.list(level_spec
)
267 elif prefix
== 'status':
268 return self
.status(level_spec
)
270 raise NotImplementedError(cmd
['prefix'])