# ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
import errno
import json
import rados
import rbd
import re
import traceback

from datetime import datetime
from threading import Condition, Lock, Thread

from .common import get_rbd_pools
from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules


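# Handler for scheduled "rbd trash purge" operations.  Schedules are loaded
# via the Schedules helper from .schedule and evaluated by a background
# thread that purges each scheduled pool/namespace when its run time is due.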
class TrashPurgeScheduleHandler:
    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"

    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module):
        self.module = module
        self.log = module.log
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

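    # Main loop of the worker thread: refresh the pool/schedule state, wait
    # until the next queue entry comes due, purge it, and re-enqueue it for
    # its following run.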
    def run(self):
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while True:
                self.refresh_pools()
                with self.lock:
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace = ns_spec
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

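    # Purge the RBD trash of a single pool/namespace: images whose deferment
    # period has expired as of "now" are removed.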
    def trash_purge(self, pool_id, namespace):
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self):
        self.queue = {}
        self.pools = {}
        self.refresh_pools()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self):
        self.log.info("TrashPurgeScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load()
        with self.lock:
            self.schedules = schedules

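    # Re-read the schedules and rescan the RBD pools/namespaces they apply to,
    # at most once per minute, then bring the queue in sync with the result.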
    def refresh_pools(self):
        if (datetime.now() - self.last_refresh_pools).seconds < 60:
            return

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        self.load_schedules()

        pools = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()

    def load_pool(self, ioctx, pools):
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

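    # Rebuild the queue from the current schedules: not-yet-due time slots are
    # dropped and recomputed, while slots that are already due are kept.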
    def rebuild_queue(self):
        with self.lock:
            now = datetime.now()

            # don't remove from queue "due" images
            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id, namespaces in self.pools.items():
                for namespace in namespaces:
                    self.enqueue(now, pool_id, namespace)

            self.condition.notify()

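    # Diff the freshly scanned pools against the previous scan: entries that
    # disappeared are removed from the queue, new ones are enqueued.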
    def refresh_queue(self, current_pools):
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                   namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                   namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

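    # Add a (pool_id, namespace) entry to the time slot of its next scheduled
    # run, avoiding duplicates within the slot.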
    def enqueue(self, now, pool_id, namespace):
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule {}/{} at {}".format(
            pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append((pool_id, namespace))

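    # Return the next due (pool_id, namespace) entry, or (None, seconds) with
    # the time to wait until the earliest slot becomes due.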
    def dequeue(self):
        if not self.queue:
            return None, 1000

        now = datetime.now()
        schedule_time = sorted(self.queue)[0]

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0

    def remove_from_queue(self, pool_id, namespace):
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if (pool_id, namespace) in namespaces:
                namespaces.remove((pool_id, namespace))
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

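    # Handlers for the "rbd trash purge schedule add/remove/list/status"
    # commands; each returns the usual (retval, stdout, stderr) tuple.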
    def add_schedule(self, level_spec, interval, start_time):
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self, level_spec, interval, start_time):
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec):
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec):
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time' : schedule_time,
                        'pool_id' : pool_id,
                        'pool_name' : pool_name,
                        'namespace' : namespace
                    })
        return 0, json.dumps({'scheduled' : scheduled}, indent=4,
                             sort_keys=True), ""

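    # Entry point for the mgr module: parse the level spec and dispatch the
    # "add"/"remove"/"list"/"status" subcommands; image-level specs are
    # rejected since trash purge operates on pools and namespaces.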
    def handle_command(self, inbuf, prefix, cmd):
        level_spec_name = cmd.get('level_spec', "")

        try:
            level_spec = LevelSpec.from_name(self, level_spec_name,
                                             allow_image_level=False)
        except ValueError as e:
            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
                level_spec_name, e)

        if prefix == 'add':
            return self.add_schedule(level_spec, cmd['interval'],
                                     cmd.get('start_time'))
        elif prefix == 'remove':
            return self.remove_schedule(level_spec, cmd.get('interval'),
                                        cmd.get('start_time'))
        elif prefix == 'list':
            return self.list(level_spec)
        elif prefix == 'status':
            return self.status(level_spec)

        raise NotImplementedError(cmd['prefix'])
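

# Minimal usage sketch (illustrative only, not part of this file): the
# rbd_support mgr module is expected to construct the handler and forward the
# parsed "rbd trash purge schedule ..." commands to handle_command().  The
# names and argument values below are assumptions made for the example.
#
#   handler = TrashPurgeScheduleHandler(mgr_module)
#   ret, out, err = handler.handle_command(
#       '', 'add', {'level_spec': 'mypool/myns', 'interval': '1d'})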