# Source: git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/progress/module.py
# Path: [ceph.git] / ceph / src / pybind / mgr / progress / module.py
# Blame columns: Commit | Line | Data
# Blame annotation: 11fdf7f2 / TL / "update sources to ceph Nautilus 14.2.1"
import datetime
import json
import threading
import uuid

from mgr_module import MgrModule
7
8
# Version tag written as both "version" and "compat_version" when completed
# events are persisted, and compared against "compat_version" when loading.
ENCODING_VERSION = 1

# keep a global reference to the module so we can use it from Event methods
_module = None
13
14
class Event(object):
    """
    A generic "event" that has a start time, completion percentage,
    and a list of "refs" that are (type, id) tuples describing which
    objects (osds, pools) this relates to.

    Subclasses must implement the ``progress`` property. Subclasses that
    call ``_refresh()`` must also maintain a ``_progress`` attribute,
    because that is what ``_refresh()`` publishes.
    """

    def __init__(self, message, refs):
        """
        :param message: human readable description of the event
        :param refs: list of (type, id) tuples this event relates to
        """
        self._message = message
        self._refs = refs

        # Naive UTC timestamp; Module._complete() subtracts it from another
        # naive utcnow() value, so it must stay naive.
        self.started_at = datetime.datetime.utcnow()

        # Subclasses assign the real identifier.
        self.id = None

    def _refresh(self):
        """Publish the current progress through the global module handle."""
        _module.log.debug('refreshing mgr for %s (%s) at %f' % (
            self.id, self._message, self._progress))
        _module.update_progress_event(self.id, self._message, self._progress)

    @property
    def message(self):
        return self._message

    @property
    def refs(self):
        return self._refs

    @property
    def progress(self):
        """Completion fraction; concrete subclasses must override this."""
        raise NotImplementedError()

    def summary(self):
        """Return a one-line "<progress> <message>" string."""
        return "{0} {1}".format(self.progress, self._message)

    def _progress_str(self, width):
        """
        Render an ASCII progress bar exactly ``width`` characters wide
        (two of which are the enclosing brackets).

        The progress value is clamped to [0.0, 1.0] first. Without the
        clamp an out-of-range value yields a negative repeat count, which
        Python treats as an empty string, so the bar would come out wider
        than ``width``.
        """
        inner_width = max(width - 2, 0)
        fraction = min(max(self.progress, 0.0), 1.0)
        done_chars = int(fraction * inner_width)
        return "[{0}{1}]".format('=' * done_chars,
                                 '.' * (inner_width - done_chars))

    def twoline_progress(self):
        """
        e.g.

        - Eating my delicious strudel
            [===============..............]

        """
        return "{0}\n {1}".format(
            self._message, self._progress_str(30))

    def to_json(self):
        """Return the JSON-serializable fields of this event."""
        return {
            "id": self.id,
            "message": self.message,
            "refs": self._refs
        }
78
79
class GhostEvent(Event):
    """
    The ghost of a completed event: these are the fields that we persist
    after the event is complete.
    """

    def __init__(self, my_id, message, refs):
        """
        :param my_id: identifier of the event this ghost stands for
        :param message: human readable description of the event
        :param refs: list of (type, id) tuples the event related to
        """
        super(GhostEvent, self).__init__(message, refs)
        self.id = my_id

    @property
    def progress(self):
        # A ghost only exists for events that have finished.
        return 1.0
93
94
class RemoteEvent(Event):
    """
    An event that was published by another module: we know nothing about
    this, rely on the other module to continuously update us with
    progress information as it emerges.
    """

    def __init__(self, my_id, message, refs):
        """
        :param my_id: identifier chosen by the publishing module
        :param message: human readable description of the event
        :param refs: list of (type, id) tuples this event relates to
        """
        super(RemoteEvent, self).__init__(message, refs)
        self.id = my_id
        self._progress = 0.0
        # Publish immediately so the event is visible at 0% progress.
        self._refresh()

    def set_progress(self, progress):
        """Store the new progress value and publish it."""
        self._progress = progress
        self._refresh()

    @property
    def progress(self):
        return self._progress
115
116
class PgRecoveryEvent(Event):
    """
    An event whose completion is determined by the recovery of a set of
    PGs to a healthy state.

    Always call update() immediately after construction.
    """

    def __init__(self, message, refs, which_pgs, evactuate_osds):
        """
        :param message: human readable description of the event
        :param refs: list of (type, id) tuples this event relates to
        :param which_pgs: PG identifiers to track; ``str(pg)`` must match
                          the 'pgid' values found in the pg dump
        :param evactuate_osds: OSD ids that the PGs are moving away from
                               (the spelling is part of the keyword
                               interface used by callers)
        """
        super(PgRecoveryEvent, self).__init__(message, refs)

        self._pgs = which_pgs

        self._evacuate_osds = evactuate_osds

        self._original_pg_count = len(self._pgs)

        # Filled in lazily by the first pg_update() call.
        self._original_bytes_recovered = None

        self._progress = 0.0

        self.id = str(uuid.uuid4())
        self._refresh()

    @property
    def evacuating_osds(self):
        return self._evacuate_osds

    def pg_update(self, pg_dump, log):
        """
        Recompute progress from a fresh PG dump and publish it.

        :param pg_dump: dict with a 'pg_stats' list; each entry is read for
                        'pgid', 'state', 'up', 'acting' and 'stat_sum'
        :param log: logger used for the summary line
        """
        # FIXME: O(pg_num) in python
        # FIXME: far more fields getting pythonized than we really care about
        pg_to_state = {p['pgid']: p for p in pg_dump['pg_stats']}

        if self._original_bytes_recovered is None:
            # First update: remember each PG's recovered-bytes counter so
            # later updates measure only the work done since the event began.
            self._original_bytes_recovered = {}
            for pg in self._pgs:
                try:
                    stats = pg_to_state[str(pg)]
                except KeyError:
                    # Already gone from the dump; the loop below treats a
                    # missing PG as complete, so no baseline is needed.
                    continue
                self._original_bytes_recovered[pg] = \
                    stats['stat_sum']['num_bytes_recovered']

        complete_accumulate = 0.0

        # Calculating progress as the number of PGs recovered divided by the
        # original where partially completed PGs count for something
        # between 0.0-1.0. This is perhaps less faithful than looking at the
        # total number of bytes recovered, but it does a better job of
        # representing the work still to do if there are a number of very
        # few-bytes PGs that still need the housekeeping of their recovery
        # to be done. This is subjective...

        evacuating = set(self._evacuate_osds)
        complete = set()
        for pg in self._pgs:
            info = pg_to_state.get(str(pg))
            if info is None:
                # The PG is gone!  Probably a pool was deleted. Drop it.
                complete.add(pg)
                continue

            states = info['state'].split("+")

            # Still mapped to an OSD we are evacuating?
            unmoved = bool(evacuating & (
                set(info['up']) | set(info['acting'])))

            if "active" in states and "clean" in states and not unmoved:
                complete.add(pg)
                continue

            total_bytes = info['stat_sum']['num_bytes']
            if total_bytes <= 0:
                # Empty PGs are considered 0% done until they are
                # in the correct state.
                continue

            recovered = info['stat_sum']['num_bytes_recovered']
            ratio = float(recovered -
                          self._original_bytes_recovered[pg]) / total_bytes

            # Since the recovered bytes (over time) could perhaps
            # exceed the contents of the PG (moment in time), we
            # must clamp this. The lower bound stops a counter that
            # went backwards from subtracting progress.
            complete_accumulate += max(0.0, min(ratio, 1.0))

        # Stop tracking the PGs that finished (or vanished).
        self._pgs = [pg for pg in self._pgs if pg not in complete]
        completed_pgs = self._original_pg_count - len(self._pgs)
        if self._original_pg_count:
            self._progress = (completed_pgs + complete_accumulate) \
                / self._original_pg_count
        else:
            # Nothing was ever tracked, so there is nothing left to do.
            self._progress = 1.0
        self._refresh()

        log.info("Updated progress to {0} ({1})".format(
            self._progress, self._message
        ))

    @property
    def progress(self):
        return self._progress
224
225
class PgId(object):
    """
    Identifier of a placement group: a pool id plus a placement seed.

    Instances compare, order and hash by value, so two PgId objects built
    from the same (pool_id, ps) pair are interchangeable in sets and as
    dict keys.
    """

    def __init__(self, pool_id, ps):
        self.pool_id = pool_id
        self.ps = ps

    def _key(self):
        """Tuple used for equality, ordering and hashing."""
        return (self.pool_id, self.ps)

    def __eq__(self, other):
        if not isinstance(other, PgId):
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Defined together with __eq__ so that equal ids hash equally.
        return hash(self._key())

    def __lt__(self, other):
        if not isinstance(other, PgId):
            return NotImplemented
        return self._key() < other._key()

    def __repr__(self):
        return "PgId({0!r}, {1!r})".format(self.pool_id, self.ps)

    def __str__(self):
        # "<pool>.<seed in hex>", looked up against 'pgid' in the pg dump.
        return "{0}.{1:x}".format(self.pool_id, self.ps)
239
240
class Module(MgrModule):
    """
    Tracks long-running operations as progress events.

    Events are created when an OSD is marked out (PG recovery) or on
    request through update()/complete(). Finished events are kept as
    GhostEvent records and periodically written back to the module store.
    """

    COMMANDS = [
        {"cmd": "progress",
         "desc": "Show progress of recovery operations",
         "perm": "r"},
        {"cmd": "progress json",
         "desc": "Show machine readable progress information",
         "perm": "r"},
        {"cmd": "progress clear",
         "desc": "Reset progress tracking",
         "perm": "rw"}
    ]

    MODULE_OPTIONS = [
        {
            'name': 'max_completed_events',
            'default': 50,
            'type': 'int',
            'desc': 'number of past completed events to remember',
            'runtime': True,
        },
        {
            'name': 'persist_interval',
            'default': 5,
            'type': 'secs',
            'desc': 'how frequently to persist completed events',
            'runtime': True,
        },
    ]

    def __init__(self, *args, **kwargs):
        global _module

        super(Module, self).__init__(*args, **kwargs)

        # In-flight events keyed by event id.
        self._events = {}
        # Finished events (GhostEvent), oldest first.
        self._completed_events = []

        self._old_osd_map = None

        # Set once serve() has loaded state; notify() blocks on it.
        self._ready = threading.Event()
        self._shutdown = threading.Event()

        self._latest_osdmap = None

        # True when _completed_events has changes not yet persisted.
        self._dirty = False

        # Seed option attributes with their declared defaults so that code
        # reached before serve() runs config_notify() (e.g. update() or
        # complete() called by another module) does not hit AttributeError.
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], opt['default'])

        _module = self

    def config_notify(self):
        """Copy every declared module option onto an attribute of self."""
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _osd_out(self, old_map, old_dump, new_map, osd_id):
        """
        Start a PgRecoveryEvent for the PGs that were mapped to ``osd_id``
        in the old map and gained at least one new OSD in the new map.

        :param old_map: OSDMap object before the change
        :param old_dump: dump of ``old_map``; its 'pools' list is read
        :param new_map: OSDMap object after the change
        :param osd_id: id of the OSD that was marked out
        """
        affected_pgs = []
        unmoved_pgs = []
        for pool in old_dump['pools']:
            pool_id = pool['pool']
            for ps in range(0, pool['pg_num']):
                up_acting = old_map.pg_to_up_acting_osds(pool_id, ps)

                # Was this PG on the OSD that went out?
                old_osds = set(up_acting['up']) | set(up_acting['acting'])
                if osd_id not in old_osds:
                    continue

                self.log.debug("pool_id, ps = {0}, {1}".format(
                    pool_id, ps
                ))

                self.log.debug(
                    "up_acting: {0}".format(json.dumps(up_acting, indent=2)))

                new_up_acting = new_map.pg_to_up_acting_osds(pool_id, ps)
                new_osds = set(new_up_acting['up']) | \
                    set(new_up_acting['acting'])

                # Has this PG been assigned a new location?
                # (it might not be if there is no suitable place to move
                #  after an OSD failure)
                is_relocated = len(new_osds - old_osds) > 0

                self.log.debug(
                    "new_up_acting: {0}".format(json.dumps(new_up_acting,
                                                           indent=2)))

                if is_relocated:
                    # This PG is now in motion, track its progress
                    affected_pgs.append(PgId(pool_id, ps))
                else:
                    # This PG didn't get a new location, we'll log it
                    unmoved_pgs.append(PgId(pool_id, ps))

        # In the case that we ignored some PGs, log the reason why (we may
        # not end up creating a progress event)
        if unmoved_pgs:
            self.log.warning(
                "{0} PGs were on osd.{1}, but didn't get new locations".format(
                    len(unmoved_pgs), osd_id))

        self.log.warning("{0} PGs affected by osd.{1} going out".format(
            len(affected_pgs), osd_id))

        if not affected_pgs:
            # Don't emit events if there were no PGs
            return

        # TODO: reconcile with existing events referring to this OSD going out
        ev = PgRecoveryEvent(
            "Rebalancing after osd.{0} marked out".format(osd_id),
            refs=[("osd", osd_id)],
            which_pgs=affected_pgs,
            evactuate_osds=[osd_id]
        )
        ev.pg_update(self.get("pg_dump"), self.log)
        self._events[ev.id] = ev

    def _osd_in(self, osd_id):
        """Complete every recovery event that was evacuating ``osd_id``."""
        # _complete() removes entries from self._events, so iterate over a
        # snapshot; mutating a dict while iterating it raises RuntimeError.
        for ev in list(self._events.values()):
            if isinstance(ev, PgRecoveryEvent) and \
                    osd_id in ev.evacuating_osds:
                self.log.info("osd.{0} came back in, cancelling event".format(
                    osd_id
                ))
                self._complete(ev)

    def _osdmap_changed(self, old_osdmap, new_osdmap):
        """Compare the 'in' weight of each OSD and react to in/out changes."""
        old_dump = old_osdmap.dump()
        new_dump = new_osdmap.dump()

        old_osds = {o['osd']: o for o in old_dump['osds']}

        for osd in new_dump['osds']:
            osd_id = osd['osd']
            if osd_id not in old_osds:
                continue

            new_weight = osd['in']
            old_weight = old_osds[osd_id]['in']

            if new_weight == 0.0 and old_weight > new_weight:
                self.log.warning("osd.{0} marked out".format(osd_id))
                self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
            elif new_weight >= 1.0 and old_weight == 0.0:
                # Only consider weight>=1.0 as "in" to avoid spawning
                # individual recovery events on every adjustment
                # in a gradual weight-in
                self.log.warning("osd.{0} marked in".format(osd_id))
                self._osd_in(osd_id)

    def notify(self, notify_type, notify_data):
        """
        React to "osd_map" and "pg_summary" notifications; other types are
        ignored. Blocks until serve() has finished loading.
        """
        self._ready.wait()

        if notify_type == "osd_map":
            old_osdmap = self._latest_osdmap
            self._latest_osdmap = self.get_osdmap()

            self.log.info("Processing OSDMap change {0}..{1}".format(
                old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
            ))
            self._osdmap_changed(old_osdmap, self._latest_osdmap)
        elif notify_type == "pg_summary":
            data = self.get("pg_dump")
            # maybe_complete() may remove the event from self._events, so
            # iterate over a snapshot rather than the live dict.
            for ev in list(self._events.values()):
                if isinstance(ev, PgRecoveryEvent):
                    ev.pg_update(data, self.log)
                    self.maybe_complete(ev)

    def maybe_complete(self, event):
        """Complete ``event`` if its progress has reached 1.0."""
        if event.progress >= 1.0:
            self._complete(event)

    def _save(self):
        """Serialize the completed events to the "completed" store key."""
        self.log.info("Writing back {0} completed events".format(
            len(self._completed_events)
        ))
        # TODO: bound the number we store.
        encoded = json.dumps({
            "events": [ev.to_json() for ev in self._completed_events],
            "version": ENCODING_VERSION,
            "compat_version": ENCODING_VERSION
        })
        self.set_store("completed", encoded)

    def _load(self):
        """
        Load completed events from the "completed" store key.

        :raises RuntimeError: if the stored compat_version is newer than
                              ENCODING_VERSION
        """
        stored = self.get_store("completed")

        if stored is None:
            self.log.info("No stored events to load")
            return

        decoded = json.loads(stored)
        if decoded['compat_version'] > ENCODING_VERSION:
            raise RuntimeError("Cannot decode version {0}".format(
                               decoded['compat_version']))

        for ev in decoded['events']:
            self._completed_events.append(
                GhostEvent(ev['id'], ev['message'], ev['refs']))

        self._prune_completed_events()

    def _prune_completed_events(self):
        """Keep only the newest ``max_completed_events`` completed events."""
        length = len(self._completed_events)
        if length > self.max_completed_events:
            # Explicit start index rather than [-n:], which would keep
            # everything when max_completed_events is 0.
            self._completed_events = self._completed_events[
                length - self.max_completed_events:length]
            self._dirty = True

    def serve(self):
        """
        Load persisted state, mark the module ready, then write completed
        events back every ``persist_interval`` seconds until shutdown.
        """
        self.config_notify()
        self.clear_all_progress_events()
        self.log.info("Loading...")

        self._load()
        self.log.info("Loaded {0} historic events".format(
            len(self._completed_events)))

        self._latest_osdmap = self.get_osdmap()
        self.log.info("Loaded OSDMap, ready.")

        self._ready.set()

        while not self._shutdown.is_set():
            # Lazy periodic write back of completed events
            if self._dirty:
                self._save()
                self._dirty = False

            self._shutdown.wait(timeout=self.persist_interval)

    def shutdown(self):
        """Ask serve() to exit and clear the published progress events."""
        self._shutdown.set()
        self.clear_all_progress_events()

    def update(self, ev_id, ev_msg, ev_progress):
        """
        For calling from other mgr modules

        Creates a RemoteEvent for an unknown ``ev_id``, otherwise updates
        the progress of the existing event.
        """
        try:
            ev = self._events[ev_id]
        except KeyError:
            ev = RemoteEvent(ev_id, ev_msg, [])
            self._events[ev_id] = ev
            self.log.info("update: starting ev {0} ({1})".format(
                ev_id, ev_msg))
        else:
            self.log.debug("update: {0} on {1}".format(
                ev_progress, ev_msg))

        # set_progress() publishes the new value itself, so no separate
        # _refresh() call is needed afterwards.
        ev.set_progress(ev_progress)

    def _complete(self, ev):
        """Retire ``ev``: publish completion and keep a ghost of it."""
        duration = (datetime.datetime.utcnow() - ev.started_at)
        # total_seconds() includes whole days; .seconds alone wraps at 24h.
        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
            ev.id, ev.message, int(duration.total_seconds())
        ))
        self.complete_progress_event(ev.id)

        self._completed_events.append(
            GhostEvent(ev.id, ev.message, ev.refs))
        # Tolerate an event that is already gone.
        self._events.pop(ev.id, None)
        self._prune_completed_events()
        self._dirty = True

    def complete(self, ev_id):
        """
        For calling from other mgr modules

        Marks the event as 100% done and retires it; an unknown ``ev_id``
        is logged and otherwise ignored.
        """
        # Only the lookup is guarded, so that a KeyError raised further
        # down is not misreported as a missing event.
        try:
            ev = self._events[ev_id]
        except KeyError:
            self.log.warning("complete: ev {0} does not exist".format(ev_id))
            return

        ev.set_progress(1.0)
        self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                               ev.message))
        self._complete(ev)

    def _handle_ls(self):
        """Return the (retval, stdout, stderr) tuple for "progress"."""
        if not self._events and not self._completed_events:
            return 0, "", "Nothing in progress"

        # Most recently started first.
        chrono_order = sorted(self._events.values(),
                              key=lambda x: x.started_at, reverse=True)
        parts = [ev.twoline_progress() + "\n" for ev in chrono_order]

        if self._completed_events:
            # TODO: limit number of completed events to show
            parts.append("\n")
            parts.extend(
                "[Complete]: {0}\n".format(ev.twoline_progress())
                for ev in self._completed_events)

        return 0, "".join(parts), ""

    def _json(self):
        """Return in-flight and completed events as JSON-ready dicts."""
        return {
            'events': [ev.to_json() for ev in self._events.values()],
            'completed': [ev.to_json() for ev in self._completed_events]
        }

    def _handle_clear(self):
        """Forget all events, persist the empty state, clear published ones."""
        self._events = {}
        self._completed_events = []
        self._save()
        # The empty state was just written, nothing is pending.
        self._dirty = False
        # Also drop what was already published, as serve() and shutdown()
        # do; otherwise stale events would stay visible after a clear.
        self.clear_all_progress_events()

        return 0, "", ""

    def handle_command(self, _, cmd):
        """Dispatch a command by ``cmd['prefix']``."""
        if cmd['prefix'] == "progress":
            return self._handle_ls()
        elif cmd['prefix'] == "progress clear":
            # The clear command isn't usually needed - it's to enable
            # the admin to "kick" this module if it seems to have done
            # something wrong (e.g. we have a bug causing a progress event
            # that never finishes)
            return self._handle_clear()
        elif cmd['prefix'] == "progress json":
            return 0, json.dumps(self._json(), indent=2), ""
        else:
            raise NotImplementedError(cmd['prefix'])