# ceph v15.2.9 -- ceph/src/pybind/mgr/cephadm/inventory.py
1 import datetime
2 from copy import copy
3 import json
4 import logging
5 from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set
6
7 import six
8
9 import orchestrator
10 from ceph.deployment import inventory
11 from ceph.deployment.service_spec import ServiceSpec
12 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
13 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
14
15 if TYPE_CHECKING:
16 from .module import CephadmOrchestrator
17
18
19 logger = logging.getLogger(__name__)
20
21 HOST_CACHE_PREFIX = "host."
22 SPEC_STORE_PREFIX = "spec."
23
24
class Inventory:
    """
    Persistent store of one HostSpec per known host.

    The data is held as a dict of plain JSON-serializable dicts (keyed by
    hostname) and mirrored into the mgr key/value store under 'inventory'.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory previously persisted in the mgr store, if any
        raw = self.mgr.get_store('inventory')
        if raw:
            self._inventory: Dict[str, dict] = json.loads(raw)
            # handle old clusters missing 'hostname' key from hostspec:
            # backfill it from the dict key.
            for name, info in self._inventory.items():
                info.setdefault('hostname', name)
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        """All known hostnames."""
        return list(self._inventory)

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError if *host* is unknown."""
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add (or replace) the entry for *spec* and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Remove *host*; raises OrchestratorError if unknown."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Update the stored address for *host* and persist."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Add *label* to *host* (idempotent) and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', list())
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Remove *label* from *host* (no-op if absent) and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', list())
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Stored address for *host*, falling back to the hostname itself."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """Yield hosts carrying *label* (all hosts when label is falsy).

        Yields HostSpec objects when as_hostspec is True, hostnames otherwise.
        """
        for hostname, info in self._inventory.items():
            if label and label not in info.get('labels', []):
                continue
            yield self.spec_from_dict(info) if as_hostspec else hostname

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Build a HostSpec from a stored inventory dict.

        Status is forced to 'Offline' for hosts the mgr considers offline.
        """
        hostname = info['hostname']
        if hostname in self.mgr.offline_hosts:
            status = 'Offline'
        else:
            status = info.get('status', '')
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status=status,
        )

    def all_specs(self) -> List[HostSpec]:
        """HostSpec objects for every known host."""
        return [self.spec_from_dict(info) for info in self._inventory.values()]

    def save(self) -> None:
        """Write the whole inventory back to the mgr store."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
112
113
class SpecStore():
    """
    Store of ServiceSpecs, keyed by service name and persisted in the
    mgr key/value store under SPEC_STORE_PREFIX.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        # service name -> spec
        self.specs = {}  # type: Dict[str, ServiceSpec]
        # service name -> time the spec was (last) saved
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        # preview-only specs: kept in memory only, never persisted
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    def load(self):
        # type: () -> None
        """Load all persisted specs; a broken entry is logged and skipped."""
        # Plain dict.items(): this module is Python 3 only, six is not needed.
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = json.loads(v)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(j['created'])
                self.specs[service_name] = spec
                self.spec_created[service_name] = created
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                # one corrupt record must not prevent the rest from loading
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Persist *spec* and emit a 'service was created' event.

        Preview-only specs are only remembered in spec_preview and are
        neither persisted nor announced.
        """
        if spec.preview_only:
            self.spec_preview[spec.service_name()] = spec
            return None
        self.specs[spec.service_name()] = spec
        self.spec_created[spec.service_name()] = datetime_now()
        self.mgr.set_store(
            SPEC_STORE_PREFIX + spec.service_name(),
            json.dumps({
                'spec': spec.to_json(),
                'created': datetime_to_str(self.spec_created[spec.service_name()]),
            }, sort_keys=True),
        )
        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')

    def rm(self, service_name):
        # type: (str) -> bool
        """Remove a spec from memory and the store; return True if it existed."""
        found = service_name in self.specs
        if found:
            del self.specs[service_name]
            del self.spec_created[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """Return specs matching *service_name* exactly or as a prefix
        (e.g. 'mon' matches 'mon.a'); all specs when service_name is falsy.
        """
        specs = []
        for sn, spec in self.specs.items():
            if not service_name or \
                    sn == service_name or \
                    sn.startswith(service_name + '.'):
                specs.append(spec)
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, specs))
        return specs
174
175
class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

      They're part of the configuration nowadays and need to be
      persistent. The name "daemon cache" is unfortunately a bit misleading.
      Like for example we really need to know where daemons are deployed on
      hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

      As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

      This is needed in order to deploy MONs. As this is mostly read-only.

    4. `last_etc_ceph_ceph_conf` O(hosts)

      Stores the last refresh time for the /etc/ceph/ceph.conf. Used
      to avoid deploying new configs when failing over to a new mgr.

    5. `scheduled_daemon_actions`: O(daemons)

      Used to run daemon actions after deploying a daemon. We need to
      store it persistently, in order to stay consistent across
      MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> daemon name -> DaemonDescription
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        # host -> ceph-volume inventory devices
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        # host -> gathered facts
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        # host -> network -> list of addresses
        self.networks = {}  # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        # hosts queued for a forced refresh on the next pass
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

    def load(self):
        # type: () -> None
        """Populate the in-memory cache from the persisted per-host records."""
        # Plain dict.items(): this module is Python 3 only, six is not needed.
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # Fix: don't load state for a host that is no longer in the
                # inventory -- it would resurface via get_hosts() etc.
                continue
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                # Fix: the default must be a list (the attribute holds lists),
                # not a dict as before.
                self.osdspec_previews[host] = j.get('osdspec_previews', [])

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_etc_ceph_ceph_conf' in j:
                    self.last_etc_ceph_ceph_conf[host] = str_to_datetime(
                        j['last_etc_ceph_ceph_conf'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                # one corrupt record must not break loading the other hosts
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the daemon map for *host* and stamp the refresh time."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        """Replace the facts for *host* and stamp the refresh time."""
        self.facts[host] = facts
        # Use datetime_now() like every other timestamp in this cache, rather
        # than naive datetime.datetime.utcnow(); host_needs_facts_refresh()
        # is updated to match.
        self.last_facts_update[host] = datetime_now()

    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        """Replace device and network info for *host* and stamp the time."""
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the config dependencies used the last time *name* was configured."""
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp the time of the last successful host check."""
        self.last_host_check[host] = datetime_now()

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Force a daemon refresh for *host* and wake up the serve loop."""
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Force a device refresh for *host* and wake up the serve loop."""
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        """Queue every inventory host for a registry (re-)login."""
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        """Serialize the cached state for *host* into the mgr store."""
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_etc_ceph_ceph_conf:
            j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """Drop all cached state for *host* and delete its store record.

        NOTE(review): last_host_check and last_etc_ceph_ceph_conf are not
        cleared here -- matches the original behavior; confirm if intended.
        """
        for per_host in (self.daemons,
                         self.devices,
                         self.facts,
                         self.last_facts_update,
                         self.osdspec_previews,
                         self.networks,
                         self.last_daemon_update,
                         self.last_device_update,
                         self.daemon_config_deps,
                         self.scheduled_daemon_actions):
            per_host.pop(host, None)  # type: ignore
        self.loading_osdspec_preview.discard(host)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        """Hosts that have a (possibly empty) daemon map in the cache."""
        return list(self.daemons)

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """All cached daemons across all hosts."""
        return [dd for dm in self.daemons.values() for dd in dm.values()]

    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
        """Look up one daemon by name; raise OrchestratorError if unknown."""
        for _, dm in self.daemons.items():
            for _, dd in dm.items():
                if dd.name() == daemon_name:
                    return dd
        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """Yield (host, daemon map) with status adjusted for offline hosts.

        The cached DaemonDescriptions are copied, not mutated.
        """
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = -1
                dd.status_desc = 'host is offline'
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """All cached daemons belonging to *service_name*."""
        return [d for dm in self.daemons.values() for d in dm.values()
                if d.service_name() == service_name]

    def get_daemons_by_type(self, service_type):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """All cached daemons of daemon type *service_type*."""
        return [d for dm in self.daemons.values() for d in dm.values()
                if d.daemon_type == service_type]

    def get_daemon_names(self):
        # type: () -> List[str]
        """Names of all cached daemons across all hosts."""
        return [name for dm in self.daemons.values() for name in dm]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return (deps, last_config time) for a daemon, or (None, None)."""
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """True if the daemon list for *host* is queued or stale."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        """True if the facts for *host* are stale (offline hosts are skipped)."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        # datetime_now() to match the stamps written by update_host_facts()
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """True if the device inventory for *host* is queued or stale."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        """True if an osdspec preview refresh was explicitly queued for *host*."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        """True if *host* hasn't been checked within host_check_interval."""
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
        """True if /etc/ceph/ceph.conf on *host* is out of date w.r.t. the
        current monmap / extra config (and management of it is enabled)."""
        if not self.mgr.manage_etc_ceph_ceph_conf:
            return False
        if self.mgr.paused:
            return False
        if host in self.mgr.offline_hosts:
            return False
        if not self.mgr.last_monmap:
            return False
        if host not in self.last_etc_ceph_ceph_conf:
            return True
        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
            return True
        if self.mgr.extra_ceph_conf_is_newer(self.last_etc_ceph_ceph_conf[host]):
            return True
        # already up to date:
        return False

    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
        """Stamp the time ceph.conf was deployed to *host* (needs a monmap)."""
        if not self.mgr.last_monmap:
            return
        self.last_etc_ceph_ceph_conf[host] = datetime_now()

    def host_needs_registry_login(self, host: str) -> bool:
        """True (once) if *host* was queued for a registry login."""
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Record a newly deployed daemon on an already-known host."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        """Forget daemon *name* on *host* (no-op if unknown)."""
        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each hosts at least once.
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        """Schedule *action* for a daemon; a higher-priority action already
        scheduled for the same daemon wins and the new one is dropped."""
        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
        """Drop a scheduled action; prune the host entry when it empties."""
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        """The scheduled action for a daemon, or None."""
        return self.scheduled_daemon_actions.get(host, {}).get(daemon)
618
619
class EventStore():
    """Keep a short, de-duplicated history of OrchestratorEvents per subject
    ('kind:subject' key), for surfacing in `orch` status output."""

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # 'kind:subject' -> recent events (at most five)
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """Record *event*; duplicates (same message for the same subject)
        are dropped and only the five most recent events are kept."""
        subject = event.kind_subject()
        queue = self.events.setdefault(subject, [])
        if any(known.message == event.message for known in queue):
            # an identical message is already recorded for this subject
            return
        queue.append(event)
        # limit to five events for now.
        self.events[subject] = queue[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Record an event about a service."""
        self.add(OrchestratorEvent(datetime_now(), 'service',
                                   spec.service_name(), level, message))

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Record an ERROR event from an OrchestratorError that names a subject."""
        if e.event_subject is None:
            return
        kind, subject = e.event_subject
        self.add(OrchestratorEvent(datetime_now(), kind, subject, "ERROR", str(e)))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Record an event about a daemon."""
        self.add(OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message))

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an ERROR event for a daemon from an arbitrary exception."""
        self.for_daemon(daemon_name, "ERROR", str(e))

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.specs.keys()
        stale = []  # type: List[str]
        for key in self.events:
            kind, subject = key.split(':')
            if kind == 'service' and subject not in specs:
                stale.append(key)
            elif kind == 'daemon' and subject not in daemons:
                stale.append(key)
        for key in stale:
            del self.events[key]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded for service *name* (possibly empty)."""
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded for daemon *name* (possibly empty)."""
        return self.events.get('daemon:' + name, [])