8 from datetime
import datetime
9 from threading
import Condition
, Lock
, Thread
10 from typing
import Any
, Dict
, List
, Optional
, Tuple
12 from .common
import get_rbd_pools
13 from .schedule
import LevelSpec
, Interval
, StartTime
, Schedule
, Schedules
16 class TrashPurgeScheduleHandler
:
17 MODULE_OPTION_NAME
= "trash_purge_schedule"
18 SCHEDULE_OID
= "rbd_trash_purge_schedule"
21 condition
= Condition(lock
)
24 def __init__(self
, module
: Any
) -> None:
27 self
.last_refresh_pools
= datetime(1970, 1, 1)
29 self
.init_schedule_queue()
31 self
.thread
= Thread(target
=self
.run
)
34 def run(self
) -> None:
36 self
.log
.info("TrashPurgeScheduleHandler: starting")
40 (ns_spec
, wait_time
) = self
.dequeue()
42 self
.condition
.wait(min(wait_time
, 60))
44 pool_id
, namespace
= ns_spec
45 self
.trash_purge(pool_id
, namespace
)
47 self
.enqueue(datetime
.now(), pool_id
, namespace
)
49 except Exception as ex
:
50 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
51 ex
, traceback
.format_exc()))
53 def trash_purge(self
, pool_id
: str, namespace
: str) -> None:
55 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
56 ioctx
.set_namespace(namespace
)
57 rbd
.RBD().trash_purge(ioctx
, datetime
.now())
58 except Exception as e
:
59 self
.log
.error("exception when purgin {}/{}: {}".format(
60 pool_id
, namespace
, e
))
62 def init_schedule_queue(self
) -> None:
63 self
.queue
: Dict
[str, List
[Tuple
[str, str]]] = {}
64 # pool_id => {namespace => pool_name}
65 self
.pools
: Dict
[str, Dict
[str, str]] = {}
67 self
.log
.debug("scheduler queue is initialized")
69 def load_schedules(self
) -> None:
70 self
.log
.info("TrashPurgeScheduleHandler: load_schedules")
72 schedules
= Schedules(self
)
75 self
.schedules
= schedules
77 def refresh_pools(self
) -> None:
78 if (datetime
.now() - self
.last_refresh_pools
).seconds
< 60:
81 self
.log
.debug("TrashPurgeScheduleHandler: refresh_pools")
85 pools
: Dict
[str, Dict
[str, str]] = {}
87 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
88 if not self
.schedules
.intersects(
89 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
91 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
92 self
.load_pool(ioctx
, pools
)
95 self
.refresh_queue(pools
)
98 self
.last_refresh_pools
= datetime
.now()
100 def load_pool(self
, ioctx
: rados
.Ioctx
, pools
: Dict
[str, Dict
[str, str]]) -> None:
101 pool_id
= str(ioctx
.get_pool_id())
102 pool_name
= ioctx
.get_pool_name()
104 pool_namespaces
= ['']
106 self
.log
.debug("load_pool: {}".format(pool_name
))
109 pool_namespaces
+= rbd
.RBD().namespace_list(ioctx
)
110 except rbd
.OperationNotSupported
:
111 self
.log
.debug("namespaces not supported")
112 except Exception as e
:
113 self
.log
.error("exception when scanning pool {}: {}".format(
116 for namespace
in pool_namespaces
:
117 pools
[pool_id
][namespace
] = pool_name
119 def rebuild_queue(self
) -> None:
123 # don't remove from queue "due" images
124 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
126 for schedule_time
in list(self
.queue
):
127 if schedule_time
> now_string
:
128 del self
.queue
[schedule_time
]
130 if not self
.schedules
:
133 for pool_id
, namespaces
in self
.pools
.items():
134 for namespace
in namespaces
:
135 self
.enqueue(now
, pool_id
, namespace
)
137 self
.condition
.notify()
139 def refresh_queue(self
, current_pools
: Dict
[str, Dict
[str, str]]) -> None:
142 for pool_id
, namespaces
in self
.pools
.items():
143 for namespace
in namespaces
:
144 if pool_id
not in current_pools
or \
145 namespace
not in current_pools
[pool_id
]:
146 self
.remove_from_queue(pool_id
, namespace
)
148 for pool_id
, namespaces
in current_pools
.items():
149 for namespace
in namespaces
:
150 if pool_id
not in self
.pools
or \
151 namespace
not in self
.pools
[pool_id
]:
152 self
.enqueue(now
, pool_id
, namespace
)
154 self
.condition
.notify()
156 def enqueue(self
, now
: datetime
, pool_id
: str, namespace
: str) -> None:
158 schedule
= self
.schedules
.find(pool_id
, namespace
)
162 schedule_time
= schedule
.next_run(now
)
163 if schedule_time
not in self
.queue
:
164 self
.queue
[schedule_time
] = []
165 self
.log
.debug("schedule {}/{} at {}".format(
166 pool_id
, namespace
, schedule_time
))
167 ns_spec
= (pool_id
, namespace
)
168 if ns_spec
not in self
.queue
[schedule_time
]:
169 self
.queue
[schedule_time
].append((pool_id
, namespace
))
171 def dequeue(self
) -> Tuple
[Optional
[Tuple
[str, str]], float]:
176 schedule_time
= sorted(self
.queue
)[0]
178 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
179 wait_time
= (datetime
.strptime(schedule_time
,
180 "%Y-%m-%d %H:%M:%S") - now
)
181 return None, wait_time
.total_seconds()
183 namespaces
= self
.queue
[schedule_time
]
184 namespace
= namespaces
.pop(0)
186 del self
.queue
[schedule_time
]
187 return namespace
, 0.0
189 def remove_from_queue(self
, pool_id
: str, namespace
: str) -> None:
191 for schedule_time
, namespaces
in self
.queue
.items():
192 if (pool_id
, namespace
) in namespaces
:
193 namespaces
.remove((pool_id
, namespace
))
195 empty_slots
.append(schedule_time
)
196 for schedule_time
in empty_slots
:
197 del self
.queue
[schedule_time
]
199 def add_schedule(self
,
200 level_spec
: LevelSpec
,
202 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
204 "add_schedule: level_spec={}, interval={}, start_time={}".format(
205 level_spec
.name
, interval
, start_time
))
208 self
.schedules
.add(level_spec
, interval
, start_time
)
210 # TODO: optimize to rebuild only affected part of the queue
214 def remove_schedule(self
,
215 level_spec
: LevelSpec
,
216 interval
: Optional
[str],
217 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
219 "remove_schedule: level_spec={}, interval={}, start_time={}".format(
220 level_spec
.name
, interval
, start_time
))
223 self
.schedules
.remove(level_spec
, interval
, start_time
)
225 # TODO: optimize to rebuild only affected part of the queue
229 def list(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
230 self
.log
.debug("list: level_spec={}".format(level_spec
.name
))
233 result
= self
.schedules
.to_list(level_spec
)
235 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
237 def status(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
238 self
.log
.debug("status: level_spec={}".format(level_spec
.name
))
242 for schedule_time
in sorted(self
.queue
):
243 for pool_id
, namespace
in self
.queue
[schedule_time
]:
244 if not level_spec
.matches(pool_id
, namespace
):
246 pool_name
= self
.pools
[pool_id
][namespace
]
248 'schedule_time' : schedule_time
,
250 'pool_name' : pool_name
,
251 'namespace' : namespace
253 return 0, json
.dumps({'scheduled' : scheduled
}, indent
=4,