]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / pybind / mgr / rbd_support / trash_purge_schedule.py
CommitLineData
9f95a23c
TL
1import json
2import rados
3import rbd
9f95a23c
TL
4import traceback
5
6from datetime import datetime
7from threading import Condition, Lock, Thread
20effc67 8from typing import Any, Dict, List, Optional, Tuple
9f95a23c
TL
9
10from .common import get_rbd_pools
1e59de90 11from .schedule import LevelSpec, Schedules
9f95a23c
TL
12
13
class TrashPurgeScheduleHandler:
    """Run scheduled RBD trash purges for pool/namespace pairs.

    A worker thread (see run()) keeps a queue that maps a schedule time
    string ("%Y-%m-%d %H:%M:%S") to the list of (pool_id, namespace) pairs
    due at that time.  When an entry becomes due the thread purges the
    trash of that namespace and re-enqueues it for its next run.

    NOTE: ``lock`` and ``condition`` are class attributes, so every
    instance of this class shares the same lock/condition pair.
    """

    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    lock = Lock()
    condition = Condition(lock)

    def __init__(self, module: Any) -> None:
        """Store the owning module and create (but do not start) the worker.

        :param module: owning mgr module; its ``log`` attribute is used for
            logging here, and ``rados`` / ``client_blocklisted`` are used by
            other methods.
        """
        self.module = module
        self.log = module.log
        # Epoch start forces the first refresh_pools() call to do real work.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Initialize the schedule queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Ask the worker thread to stop and wait until it has exited."""
        self.log.info("TrashPurgeScheduleHandler: shutting down")
        # Set the flag and wake the worker under the lock: without the
        # notification the worker could stay parked in condition.wait() for
        # up to REFRESH_DELAY_SECONDS before noticing the stop request.
        # notify_all() is used because the condition is shared class-wide.
        with self.lock:
            self.stop_thread = True
            self.condition.notify_all()
        if self.thread.is_alive():
            self.log.debug("TrashPurgeScheduleHandler: joining thread")
            self.thread.join()
        self.log.info("TrashPurgeScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh pools, purge due namespaces, sleep otherwise.

        A connection shutdown is reported by setting
        ``module.client_blocklisted``; any other exception is logged as
        fatal.  In both cases the loop (and therefore the thread) ends.
        """
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_pools()
                with self.lock:
                    # Re-check under the lock so that a stop request issued
                    # after the loop condition was evaluated is not missed
                    # by going to sleep right after it.
                    if self.stop_thread:
                        break
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing due yet: sleep until the next entry is due
                        # or the next pool refresh, whichever comes first.
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace = ns_spec
                # The purge itself runs without the lock held.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the RBD trash of ``namespace`` in pool ``pool_id``.

        Connection shutdown errors are re-raised; any other exception is
        logged and swallowed.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            raise
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Create empty queue/pool state and populate it via refresh_pools()."""
        # schedule_time => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.schedules = Schedules(self)
        self.refresh_pools()
        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload the schedules through ``self.schedules.load()``."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")
        self.schedules.load()

    def refresh_pools(self) -> float:
        """Reload schedules and pools if the refresh delay has elapsed.

        :return: number of seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("TrashPurgeScheduleHandler: no schedules")
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        pools: Dict[str, Dict[str, str]] = {}

        # Pools are scanned without the lock held; only pools that some
        # schedule intersects with are opened.
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool(self,
                  ioctx: "rados.Ioctx",
                  pools: Dict[str, Dict[str, str]]) -> None:
        """Record the namespaces of the pool behind ``ioctx`` into ``pools``.

        The default namespace ('') is always recorded; named namespaces are
        added when listing them succeeds.

        :param ioctx: I/O context opened on the pool to scan.
        :param pools: pool_id => {namespace => pool_name}; updated in place.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            # Same treatment as in run() and trash_purge(): never let the
            # generic handler below swallow a connection shutdown.
            raise
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self) -> None:
        """Drop all future queue entries and re-enqueue every known namespace.

        Notifies ``self.condition``, so the caller must hold ``self.lock``
        (as add_schedule() and remove_schedule() do).
        """
        now = datetime.now()

        # don't remove from queue "due" namespaces
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
        """Sync the queue with ``current_pools``.

        Namespaces that are in ``self.pools`` but not in ``current_pools``
        are descheduled; namespaces that are new in ``current_pools`` are
        enqueued.  Notifies ``self.condition``, so the caller must hold
        ``self.lock``.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                   namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                   namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue ``pool_id``/``namespace`` at its schedule's next run time.

        Does nothing (besides a debug message) when no schedule is found
        for the pair; a pair is never added twice to the same time slot.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            self.log.debug(
                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                    pool_id, namespace))
            return

        schedule_time = schedule.next_run(now)
        slot = self.queue.setdefault(schedule_time, [])
        self.log.debug(
            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
                pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in slot:
            slot.append(ns_spec)

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due (pool_id, namespace) pair, if any.

        :return: ``(ns_spec, 0.0)`` when a pair is due; otherwise
            ``(None, seconds)`` where ``seconds`` is the time until the
            earliest entry, or 1000.0 when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Keys share one fixed-width timestamp format, so the smallest
        # string is the earliest time.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove ``pool_id``/``namespace`` from every slot of the queue."""
        self.log.debug(
            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
                pool_id, namespace))

        ns_spec = (pool_id, namespace)
        # Collect emptied slots first; the dict cannot shrink while it is
        # being iterated.
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if ns_spec in namespaces:
                namespaces.remove(ns_spec)
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: "LevelSpec",
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :return: always ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: "LevelSpec",
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule for ``level_spec`` and rebuild the queue.

        :return: always ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: "LevelSpec") -> Tuple[int, str, str]:
        """Return the schedules for ``level_spec`` as JSON text.

        :return: ``(0, json_text, "")`` where ``json_text`` is the dump of
            ``self.schedules.to_list(level_spec)``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: "LevelSpec") -> Tuple[int, str, str]:
        """Return the queued purges that match ``level_spec`` as JSON text.

        :return: ``(0, json_text, "")`` where ``json_text`` holds a
            ``scheduled`` list ordered by schedule time.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""