]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / trash_purge_schedule.py
CommitLineData
9f95a23c
TL
1import errno
2import json
3import rados
4import rbd
5import re
6import traceback
7
8from datetime import datetime
9from threading import Condition, Lock, Thread
20effc67 10from typing import Any, Dict, List, Optional, Tuple
9f95a23c
TL
11
12from .common import get_rbd_pools
13from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
14
15
class TrashPurgeScheduleHandler:
    """Run scheduled RBD trash purges for pool namespaces.

    A background thread (started from ``__init__``) repeatedly refreshes the
    set of known pools/namespaces, pops the next due ``(pool_id, namespace)``
    from a time-keyed queue, purges its trash and re-enqueues it for its next
    run.  The public methods (``add_schedule``, ``remove_schedule``, ``list``,
    ``status``) return ``(retcode, stdout, stderr)`` tuples.
    """

    # Not referenced inside this class; presumably read by ``Schedules``
    # through the handler reference passed to it -- confirm in .schedule.
    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"

    # Class-level, hence shared by every instance of the handler.  ``lock``
    # guards ``queue``, ``pools`` and ``schedules``; ``condition`` wakes the
    # worker thread when the queue changes.
    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module: Any) -> None:
        """Initialise the queue and start the worker thread.

        :param module: owning mgr module; its ``log`` and ``rados``
            attributes are used by this class.
        """
        self.module = module
        self.log = module.log
        # Sentinel far in the past so that the first refresh_pools() call
        # always performs a refresh.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self) -> None:
        """Worker loop: purge due namespaces forever.

        Any exception escaping the loop is logged as fatal and ends the
        thread.
        """
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while True:
                self.refresh_pools()
                with self.lock:
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing is due: sleep until notified, but never
                        # longer than 60s so pools keep being refreshed.
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace = ns_spec
                # Purge without holding the lock so that schedule commands
                # are not blocked while the purge is in progress.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the trash of one pool namespace, passing the current time.

        Failures are logged and swallowed so a single bad namespace does not
        stop the worker thread.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except Exception as e:
            self.log.error("exception when purgin {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Create an empty queue and pool map, then populate them."""
        # schedule_time string => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.refresh_pools()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self) -> None:
        """Load the schedules and publish them under the lock."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load()
        with self.lock:
            self.schedules = schedules

    def refresh_pools(self) -> None:
        """Reload schedules and the pool/namespace map, at most once a minute.

        Pools that no schedule intersects are skipped.  The queue is updated
        to reflect namespaces that appeared or disappeared.
        """
        # total_seconds() is required here: ``timedelta.seconds`` is only the
        # seconds-within-the-day component and ignores whole days, which
        # would wrongly skip a refresh (including the very first one, given
        # the 1970 sentinel) whenever that component happens to be < 60.
        elapsed = datetime.now() - self.last_refresh_pools
        if elapsed.total_seconds() < 60:
            return

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        self.load_schedules()

        pools: Dict[str, Dict[str, str]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            # refresh_queue() diffs against the old self.pools, so it must
            # run before the map is replaced.
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()

    def load_pool(self, ioctx: rados.Ioctx,
                  pools: Dict[str, Dict[str, str]]) -> None:
        """Record the namespaces of the pool behind ``ioctx`` into ``pools``.

        The default namespace ('') is always recorded; listing errors are
        logged and leave only the namespaces gathered so far.

        :param ioctx: I/O context opened on the pool to scan.
        :param pools: pool_id => {namespace => pool_name} map, updated
            in place.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self) -> None:
        """Drop future queue slots and re-enqueue every known namespace.

        Takes the lock itself, so callers must not already hold it.
        """
        with self.lock:
            now = datetime.now()

            # Keep slots that are already due (at or before the current
            # minute); only slots strictly in the future are dropped.
            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id, namespaces in self.pools.items():
                for namespace in namespaces:
                    self.enqueue(now, pool_id, namespace)

            self.condition.notify()

    def refresh_queue(self,
                      current_pools: Dict[str, Dict[str, str]]) -> None:
        """Sync the queue with a freshly loaded pool map.

        Namespaces present in ``self.pools`` but missing from
        ``current_pools`` are removed from the queue; namespaces that are new
        in ``current_pools`` are enqueued.  The caller in this class holds
        the lock.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                   namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                   namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue a namespace at its schedule's next run time after ``now``.

        Does nothing when no schedule is found for the namespace, and never
        adds the same namespace twice to one time slot.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule {}/{} at {}".format(
            pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(ns_spec)

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due namespace, if any.

        :return: ``((pool_id, namespace), 0.0)`` when an entry is due;
            ``(None, seconds_until_earliest_slot)`` when the earliest slot is
            still in the future; ``(None, 1000.0)`` when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Keys are compared as strings; this assumes they are formatted as
        # "%Y-%m-%d %H:%M:%S" (the format parsed below), which sorts
        # chronologically.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove a namespace from every time slot, dropping emptied slots."""
        ns_spec = (pool_id, namespace)
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if ns_spec in namespaces:
                namespaces.remove(ns_spec)
                if not namespaces:
                    # Deleting while iterating the dict is not allowed, so
                    # emptied slots are collected and removed afterwards.
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules for ``level_spec`` as pretty-printed JSON.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the queued purges matching ``level_spec`` as JSON.

        Entries are listed in ascending schedule-time order under the
        ``scheduled`` key.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace,
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""