8 from datetime
import datetime
9 from threading
import Condition
, Lock
, Thread
10 from typing
import Any
, Dict
, List
, Optional
, Tuple
12 from .common
import get_rbd_pools
13 from .schedule
import LevelSpec
, Interval
, StartTime
, Schedule
, Schedules
16 class TrashPurgeScheduleHandler
:
17 MODULE_OPTION_NAME
= "trash_purge_schedule"
18 SCHEDULE_OID
= "rbd_trash_purge_schedule"
19 REFRESH_DELAY_SECONDS
= 60.0
22 condition
= Condition(lock
)
25 def __init__(self
, module
: Any
) -> None:
28 self
.last_refresh_pools
= datetime(1970, 1, 1)
30 self
.init_schedule_queue()
32 self
.thread
= Thread(target
=self
.run
)
35 def run(self
) -> None:
37 self
.log
.info("TrashPurgeScheduleHandler: starting")
39 refresh_delay
= self
.refresh_pools()
41 (ns_spec
, wait_time
) = self
.dequeue()
43 self
.condition
.wait(min(wait_time
, refresh_delay
))
45 pool_id
, namespace
= ns_spec
46 self
.trash_purge(pool_id
, namespace
)
48 self
.enqueue(datetime
.now(), pool_id
, namespace
)
50 except Exception as ex
:
51 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
52 ex
, traceback
.format_exc()))
54 def trash_purge(self
, pool_id
: str, namespace
: str) -> None:
56 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
57 ioctx
.set_namespace(namespace
)
58 rbd
.RBD().trash_purge(ioctx
, datetime
.now())
59 except Exception as e
:
60 self
.log
.error("exception when purgin {}/{}: {}".format(
61 pool_id
, namespace
, e
))
63 def init_schedule_queue(self
) -> None:
64 self
.queue
: Dict
[str, List
[Tuple
[str, str]]] = {}
65 # pool_id => {namespace => pool_name}
66 self
.pools
: Dict
[str, Dict
[str, str]] = {}
68 self
.log
.debug("TrashPurgeScheduleHandler: queue is initialized")
70 def load_schedules(self
) -> None:
71 self
.log
.info("TrashPurgeScheduleHandler: load_schedules")
73 schedules
= Schedules(self
)
75 self
.schedules
= schedules
77 def refresh_pools(self
) -> float:
78 elapsed
= (datetime
.now() - self
.last_refresh_pools
).total_seconds()
79 if elapsed
< self
.REFRESH_DELAY_SECONDS
:
80 return self
.REFRESH_DELAY_SECONDS
- elapsed
82 self
.log
.debug("TrashPurgeScheduleHandler: refresh_pools")
86 if not self
.schedules
:
87 self
.log
.debug("TrashPurgeScheduleHandler: no schedules")
90 self
.last_refresh_pools
= datetime
.now()
91 return self
.REFRESH_DELAY_SECONDS
93 pools
: Dict
[str, Dict
[str, str]] = {}
95 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
96 if not self
.schedules
.intersects(
97 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
99 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
100 self
.load_pool(ioctx
, pools
)
103 self
.refresh_queue(pools
)
106 self
.last_refresh_pools
= datetime
.now()
107 return self
.REFRESH_DELAY_SECONDS
109 def load_pool(self
, ioctx
: rados
.Ioctx
, pools
: Dict
[str, Dict
[str, str]]) -> None:
110 pool_id
= str(ioctx
.get_pool_id())
111 pool_name
= ioctx
.get_pool_name()
113 pool_namespaces
= ['']
115 self
.log
.debug("load_pool: {}".format(pool_name
))
118 pool_namespaces
+= rbd
.RBD().namespace_list(ioctx
)
119 except rbd
.OperationNotSupported
:
120 self
.log
.debug("namespaces not supported")
121 except Exception as e
:
122 self
.log
.error("exception when scanning pool {}: {}".format(
125 for namespace
in pool_namespaces
:
126 pools
[pool_id
][namespace
] = pool_name
128 def rebuild_queue(self
) -> None:
131 # don't remove from queue "due" images
132 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
134 for schedule_time
in list(self
.queue
):
135 if schedule_time
> now_string
:
136 del self
.queue
[schedule_time
]
138 if not self
.schedules
:
141 for pool_id
, namespaces
in self
.pools
.items():
142 for namespace
in namespaces
:
143 self
.enqueue(now
, pool_id
, namespace
)
145 self
.condition
.notify()
147 def refresh_queue(self
, current_pools
: Dict
[str, Dict
[str, str]]) -> None:
150 for pool_id
, namespaces
in self
.pools
.items():
151 for namespace
in namespaces
:
152 if pool_id
not in current_pools
or \
153 namespace
not in current_pools
[pool_id
]:
154 self
.remove_from_queue(pool_id
, namespace
)
156 for pool_id
, namespaces
in current_pools
.items():
157 for namespace
in namespaces
:
158 if pool_id
not in self
.pools
or \
159 namespace
not in self
.pools
[pool_id
]:
160 self
.enqueue(now
, pool_id
, namespace
)
162 self
.condition
.notify()
164 def enqueue(self
, now
: datetime
, pool_id
: str, namespace
: str) -> None:
165 schedule
= self
.schedules
.find(pool_id
, namespace
)
168 "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
172 schedule_time
= schedule
.next_run(now
)
173 if schedule_time
not in self
.queue
:
174 self
.queue
[schedule_time
] = []
176 "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
177 pool_id
, namespace
, schedule_time
))
178 ns_spec
= (pool_id
, namespace
)
179 if ns_spec
not in self
.queue
[schedule_time
]:
180 self
.queue
[schedule_time
].append((pool_id
, namespace
))
182 def dequeue(self
) -> Tuple
[Optional
[Tuple
[str, str]], float]:
187 schedule_time
= sorted(self
.queue
)[0]
189 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
190 wait_time
= (datetime
.strptime(schedule_time
,
191 "%Y-%m-%d %H:%M:%S") - now
)
192 return None, wait_time
.total_seconds()
194 namespaces
= self
.queue
[schedule_time
]
195 namespace
= namespaces
.pop(0)
197 del self
.queue
[schedule_time
]
198 return namespace
, 0.0
200 def remove_from_queue(self
, pool_id
: str, namespace
: str) -> None:
202 "TrashPurgeScheduleHandler: descheduling {}/{}".format(
206 for schedule_time
, namespaces
in self
.queue
.items():
207 if (pool_id
, namespace
) in namespaces
:
208 namespaces
.remove((pool_id
, namespace
))
210 empty_slots
.append(schedule_time
)
211 for schedule_time
in empty_slots
:
212 del self
.queue
[schedule_time
]
214 def add_schedule(self
,
215 level_spec
: LevelSpec
,
217 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
219 "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
220 level_spec
.name
, interval
, start_time
))
222 # TODO: optimize to rebuild only affected part of the queue
224 self
.schedules
.add(level_spec
, interval
, start_time
)
228 def remove_schedule(self
,
229 level_spec
: LevelSpec
,
230 interval
: Optional
[str],
231 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
233 "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
234 level_spec
.name
, interval
, start_time
))
236 # TODO: optimize to rebuild only affected part of the queue
238 self
.schedules
.remove(level_spec
, interval
, start_time
)
242 def list(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
244 "TrashPurgeScheduleHandler: list: level_spec={}".format(
248 result
= self
.schedules
.to_list(level_spec
)
250 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
252 def status(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
254 "TrashPurgeScheduleHandler: status: level_spec={}".format(
259 for schedule_time
in sorted(self
.queue
):
260 for pool_id
, namespace
in self
.queue
[schedule_time
]:
261 if not level_spec
.matches(pool_id
, namespace
):
263 pool_name
= self
.pools
[pool_id
][namespace
]
265 'schedule_time' : schedule_time
,
267 'pool_name' : pool_name
,
268 'namespace' : namespace
270 return 0, json
.dumps({'scheduled' : scheduled
}, indent
=4,