]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / rbd_support / trash_purge_schedule.py
CommitLineData
9f95a23c
TL
1import json
2import rados
3import rbd
9f95a23c
TL
4import traceback
5
6from datetime import datetime
7from threading import Condition, Lock, Thread
20effc67 8from typing import Any, Dict, List, Optional, Tuple
9f95a23c
TL
9
10from .common import get_rbd_pools
1e59de90 11from .schedule import LevelSpec, Schedules
9f95a23c
TL
12
13
class TrashPurgeScheduleHandler:
    """Run RBD trash purges for pool namespaces according to stored schedules.

    A single worker thread (``run``) repeatedly refreshes the set of known
    pools/namespaces, pops due entries from a time-keyed queue, purges the
    trash of the corresponding namespace and re-queues the entry for its next
    run.

    ``self.queue`` maps a schedule time string to the list of
    ``(pool_id, namespace)`` pairs due at that time; ``self.pools`` maps
    ``pool_id`` to ``{namespace: pool_name}``.  Both are guarded by
    ``self.lock``; ``self.condition`` is used to wake the worker early when
    the queue changes or a shutdown is requested.
    """

    # NOTE(review): MODULE_OPTION_NAME and SCHEDULE_OID are not referenced in
    # this class; presumably they are read by Schedules(self) -- confirm.
    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"
    # Minimum number of seconds between two refreshes in refresh_pools().
    REFRESH_DELAY_SECONDS = 60.0

    def __init__(self, module: Any) -> None:
        """Create the handler; the worker thread is not started until setup().

        :param module: owning module object; must provide ``log`` and is
            later used for ``rados`` and ``client_blocklisted``.
        """
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        # Epoch start guarantees that the first refresh_pools() call is due.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Initialize the schedule queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Ask the worker thread to stop and wait until it has exited."""
        self.log.info("TrashPurgeScheduleHandler: shutting down")
        # Set the flag and notify while holding the lock: the worker checks
        # the flag under the same lock before waiting, so it can neither miss
        # the request nor keep sleeping until its wait timeout expires.
        with self.lock:
            self.stop_thread = True
            self.condition.notify()
        if self.thread.is_alive():
            self.log.debug("TrashPurgeScheduleHandler: joining thread")
            self.thread.join()
        self.log.info("TrashPurgeScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh pools, purge due namespaces, re-queue them.

        A connection shutdown is reported by setting
        ``module.client_blocklisted``; any other exception is logged as
        fatal.  In both cases the loop ends.
        """
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_pools()
                with self.lock:
                    # Re-check under the lock so that a shutdown requested
                    # during refresh_pools() is not slept through.
                    if self.stop_thread:
                        break
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing due yet: sleep until the next entry is due
                        # or the next refresh, whichever comes first.
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace = ns_spec
                # The purge itself runs without the lock held.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the RBD trash of one namespace, passing the current time.

        Connection shutdown errors are re-raised for run() to handle; any
        other error is logged and swallowed so the worker keeps going.

        :param pool_id: pool id as a string (converted with ``int``).
        :param namespace: namespace name; ``''`` is used for the default one.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            raise
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Create empty queue/pool state and perform the first refresh."""
        # schedule_time => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.schedules = Schedules(self)
        self.refresh_pools()
        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload the schedules through ``self.schedules.load()``."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")
        self.schedules.load()

    def refresh_pools(self) -> float:
        """Reload schedules and rescan pools if the refresh delay has elapsed.

        :return: number of seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                # Without schedules there is nothing to track or run.
                self.log.debug("TrashPurgeScheduleHandler: no schedules")
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        pools: Dict[str, Dict[str, str]] = {}

        # The pool scan is done without the lock held.
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            # refresh_queue() diffs against the old self.pools, so it must
            # run before self.pools is replaced.
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None:
        """Record the pool behind ``ioctx`` and its namespaces in ``pools``.

        The default namespace ``''`` is always recorded.  A failure to list
        namespaces is logged and leaves only the default one, except for
        ``rbd.ConnectionShutdown`` which is re-raised.

        :param ioctx: open I/O context of the pool to scan.
        :param pools: ``pool_id => {namespace => pool_name}`` map, updated
            in place.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except rbd.ConnectionShutdown:
            raise
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self) -> None:
        """Drop all future queue slots and re-queue every known namespace.

        Called with ``self.lock`` held by add_schedule()/remove_schedule().
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        # Iterate over a copy of the keys because slots are deleted in place.
        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                self.enqueue(now, pool_id, namespace)

        # Wake the worker so it recomputes its wait time.
        self.condition.notify()

    def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
        """Bring the queue in line with a freshly scanned pool map.

        Namespaces present in ``self.pools`` but not in ``current_pools`` are
        removed from the queue; namespaces that are new in ``current_pools``
        are queued.  Called with ``self.lock`` held by refresh_pools().

        :param current_pools: new ``pool_id => {namespace => pool_name}`` map.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                        namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                        namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue a namespace at the next run time of its schedule.

        Does nothing except logging when no schedule is found for the
        namespace.  A namespace is never added twice to the same slot.

        :param now: reference time passed to the schedule's ``next_run``.
        :param pool_id: pool id as a string.
        :param namespace: namespace name.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            self.log.debug(
                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                    pool_id, namespace))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
                pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append((pool_id, namespace))

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due ``(pool_id, namespace)`` pair, if there is one.

        :return: ``(ns_spec, 0.0)`` when an entry is due; otherwise
            ``(None, seconds)`` where ``seconds`` is the time until the
            earliest slot, or ``1000.0`` when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Slot keys are compared as strings here and below; min() finds the
        # earliest one without sorting the whole key set.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            # Never leave an empty slot behind.
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove a namespace from every queue slot, dropping emptied slots.

        :param pool_id: pool id as a string.
        :param namespace: namespace name.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
                pool_id, namespace))

        # Collect emptied slots first; the dict cannot shrink while it is
        # being iterated.
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if (pool_id, namespace) in namespaces:
                namespaces.remove((pool_id, namespace))
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :param level_spec: level the schedule applies to.
        :param interval: schedule interval, passed to ``schedules.add``.
        :param start_time: optional start time, passed to ``schedules.add``.
        :return: always ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule for ``level_spec`` and rebuild the queue.

        :param level_spec: level the schedule applies to.
        :param interval: optional interval, passed to ``schedules.remove``.
        :param start_time: optional start time, passed to
            ``schedules.remove``.
        :return: always ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules for ``level_spec`` as pretty-printed JSON.

        :param level_spec: level to list, passed to ``schedules.to_list``.
        :return: ``(0, json_text, "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the queued purges matching ``level_spec`` as JSON.

        Entries are reported in ascending order of their schedule time key.

        :param level_spec: filter; only entries for which
            ``level_spec.matches(pool_id, namespace)`` is true are reported.
        :return: ``(0, json_text, "")`` where the JSON object has a single
            ``scheduled`` list.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""