]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
d9bf24cf27cd49cd4ed3398c28e02286e861a6e0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / trash_purge_schedule.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import re
6 import traceback
7
8 from datetime import datetime
9 from threading import Condition, Lock, Thread
10 from typing import Any, Dict, List, Optional, Tuple
11
12 from .common import get_rbd_pools
13 from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
14
15
class TrashPurgeScheduleHandler:
    """Run scheduled RBD trash purges for pools and their namespaces.

    A worker thread (started from ``__init__``) loops forever: it refreshes
    the set of known pools/namespaces, pops the next due ``(pool_id,
    namespace)`` entry from the queue, purges its trash and re-enqueues it
    for its next run.

    State guarded by ``self.lock``:

    * ``self.queue``: schedule time string (``"%Y-%m-%d %H:%M:%S"``, as
      parsed by ``dequeue``) => list of ``(pool_id, namespace)`` tuples.
    * ``self.pools``: pool_id => {namespace => pool_name}.
    * ``self.schedules``: the ``Schedules`` object loaded by
      ``load_schedules``.
    """

    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    # Class-level fallbacks, kept so that existing attribute access keeps
    # working; __init__ replaces them with a per-instance lock/condition so
    # that separate handlers neither serialize on nor wake up each other.
    lock = Lock()
    condition = Condition(lock)
    thread: Optional[Thread] = None

    def __init__(self, module: Any) -> None:
        """Load the schedules, build the queue and start the worker thread.

        :param module: owning mgr module; must provide ``log`` and ``rados``.
        """
        # Must exist before init_schedule_queue(), which takes the lock.
        self.lock = Lock()
        self.condition = Condition(self.lock)

        self.module = module
        self.log = module.log
        # Epoch start guarantees that the very first refresh_pools() call
        # is not skipped by the refresh-delay check.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self) -> None:
        """Worker thread body: purge due namespaces until an error occurs.

        Any exception escaping the loop is logged as fatal and ends the
        thread.
        """
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while True:
                refresh_delay = self.refresh_pools()
                with self.lock:
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing is due: sleep until the next scheduled
                        # entry or the next pool refresh, whichever comes
                        # first.  A notify() on the condition wakes us
                        # earlier.
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace = ns_spec
                # The purge itself runs without the lock held.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the trash of one namespace; failures are logged, not raised.

        :param pool_id: pool id as a string (converted with ``int()``).
        :param namespace: namespace name ('' is the default namespace).
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                # The current time is passed as the purge threshold.
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Reset queue and pool map to empty, then populate them."""
        # schedule_time => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.refresh_pools()
        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """(Re)load the schedules and store them in ``self.schedules``."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load()
        self.schedules = schedules

    def refresh_pools(self) -> float:
        """Reload schedules and pools if the refresh delay has elapsed.

        :return: number of seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("TrashPurgeScheduleHandler: no schedules")
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        # Pools are scanned without the lock held; the result is swapped in
        # under the lock below.
        pools: Dict[str, Dict[str, str]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool(self,
                  ioctx: rados.Ioctx,
                  pools: Dict[str, Dict[str, str]]) -> None:
        """Record the namespaces of the pool behind ``ioctx`` in ``pools``.

        The default namespace ('') is always recorded; listing failures are
        logged and leave only the namespaces collected so far.

        :param ioctx: open I/O context of the pool to scan.
        :param pools: pool_id => {namespace => pool_name}; updated in place.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        pools[pool_id] = {
            namespace: pool_name for namespace in pool_namespaces}

    def rebuild_queue(self) -> None:
        """Rebuild the queue from ``self.pools`` after a schedule change.

        Must be called with ``self.lock`` held (it notifies the condition).
        """
        now = datetime.now()

        # don't remove from queue "due" namespaces
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        # Iterate over a snapshot of the keys: entries are deleted in the
        # loop.
        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def refresh_queue(self,
                      current_pools: Dict[str, Dict[str, str]]) -> None:
        """Bring the queue in line with a freshly scanned pool map.

        Namespaces that disappeared are descheduled, new ones are enqueued.
        Must be called with ``self.lock`` held (it notifies the condition).

        :param current_pools: pool_id => {namespace => pool_name}.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                   namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                   namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue a namespace at the next run time of its schedule.

        Does nothing (besides logging) when no schedule matches; an entry
        already queued for the same time slot is not added twice.

        :param now: reference time handed to the schedule's ``next_run``.
        :param pool_id: pool id as a string.
        :param namespace: namespace name.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            self.log.debug(
                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                    pool_id, namespace))
            return

        schedule_time = schedule.next_run(now)
        self.log.debug(
            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
                pool_id, namespace, schedule_time))
        slot = self.queue.setdefault(schedule_time, [])
        ns_spec = (pool_id, namespace)
        if ns_spec not in slot:
            slot.append(ns_spec)

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due entry from the queue.

        :return: ``((pool_id, namespace), 0.0)`` when an entry is due,
                 otherwise ``(None, seconds_to_wait)``; the wait is 1000.0
                 when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Keys are fixed-width time strings, so the lexicographic minimum
        # is the earliest time slot.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove a namespace from every time slot, dropping empty slots.

        :param pool_id: pool id as a string.
        :param namespace: namespace name.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
                pool_id, namespace))

        ns_spec = (pool_id, namespace)
        # Collect empty slots first: the dict must not change size while it
        # is being iterated.
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if ns_spec in namespaces:
                namespaces.remove(ns_spec)
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules matching ``level_spec`` as JSON.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the queued purges matching ``level_spec`` as JSON.

        :return: ``(0, json_text, "")`` where the JSON object has a single
                 ``scheduled`` list ordered by schedule time.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace,
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""