5 from typing
import TYPE_CHECKING
, Dict
, List
, Iterator
, Optional
, Any
, Tuple
, Set
10 from ceph
.deployment
import inventory
11 from ceph
.deployment
.service_spec
import ServiceSpec
12 from ceph
.utils
import str_to_datetime
, datetime_to_str
, datetime_now
13 from orchestrator
import OrchestratorError
, HostSpec
, OrchestratorEvent
16 from .module
import CephadmOrchestrator
19 logger
= logging
.getLogger(__name__
)
21 HOST_CACHE_PREFIX
= "host."
22 SPEC_STORE_PREFIX
= "spec."
27 The inventory stores a HostSpec for all hosts persistently.
30 def __init__(self
, mgr
: 'CephadmOrchestrator'):
33 i
= self
.mgr
.get_store('inventory')
35 self
._inventory
: Dict
[str, dict] = json
.loads(i
)
36 # handle old clusters missing 'hostname' key from hostspec
37 for k
, v
in self
._inventory
.items():
38 if 'hostname' not in v
:
41 self
._inventory
= dict()
42 logger
.debug('Loaded inventory %s' % self
._inventory
)
def keys(self) -> List[str]:
    """Return the hostnames held in the inventory as a freshly built list."""
    # Iterating the mapping yields its keys; the caller gets its own copy.
    return list(self._inventory)
def __contains__(self, host: str) -> bool:
    """Support ``host in obj`` by testing membership among the stored hosts."""
    known_hosts = self._inventory
    return host in known_hosts
def assert_host(self, host: str) -> None:
    """Raise OrchestratorError unless ``host`` is present in the inventory."""
    if host in self._inventory:
        return
    raise OrchestratorError('host %s does not exist' % host)
54 def add_host(self
, spec
: HostSpec
) -> None:
55 self
._inventory
[spec
.hostname
] = spec
.to_json()
58 def rm_host(self
, host
: str) -> None:
59 self
.assert_host(host
)
60 del self
._inventory
[host
]
63 def set_addr(self
, host
: str, addr
: str) -> None:
64 self
.assert_host(host
)
65 self
._inventory
[host
]['addr'] = addr
68 def add_label(self
, host
: str, label
: str) -> None:
69 self
.assert_host(host
)
71 if 'labels' not in self
._inventory
[host
]:
72 self
._inventory
[host
]['labels'] = list()
73 if label
not in self
._inventory
[host
]['labels']:
74 self
._inventory
[host
]['labels'].append(label
)
77 def rm_label(self
, host
: str, label
: str) -> None:
78 self
.assert_host(host
)
80 if 'labels' not in self
._inventory
[host
]:
81 self
._inventory
[host
]['labels'] = list()
82 if label
in self
._inventory
[host
]['labels']:
83 self
._inventory
[host
]['labels'].remove(label
)
def get_addr(self, host: str) -> str:
    """Return the stored address of ``host``, or the hostname if none is stored.

    ``assert_host`` is called first, so an unknown host is rejected there.
    """
    self.assert_host(host)
    record = self._inventory[host]
    return record.get('addr', host)
90 def filter_by_label(self
, label
: Optional
[str] = '', as_hostspec
: bool = False) -> Iterator
:
91 for h
, hostspec
in self
._inventory
.items():
92 if not label
or label
in hostspec
.get('labels', []):
94 yield self
.spec_from_dict(hostspec
)
98 def spec_from_dict(self
, info
: dict) -> HostSpec
:
99 hostname
= info
['hostname']
102 addr
=info
.get('addr', hostname
),
103 labels
=info
.get('labels', []),
104 status
='Offline' if hostname
in self
.mgr
.offline_hosts
else info
.get('status', ''),
def all_specs(self) -> List[HostSpec]:
    """Return one HostSpec per inventory entry, built via ``spec_from_dict``."""
    to_spec = self.spec_from_dict
    return [to_spec(info) for info in self._inventory.values()]
def save(self) -> None:
    """Serialize the inventory to JSON and write it to the mgr store key 'inventory'."""
    payload = json.dumps(self._inventory)
    self.mgr.set_store('inventory', payload)
115 def __init__(self
, mgr
):
116 # type: (CephadmOrchestrator) -> None
118 self
.specs
= {} # type: Dict[str, ServiceSpec]
119 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
120 self
.spec_preview
= {} # type: Dict[str, ServiceSpec]
124 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
)):
125 service_name
= k
[len(SPEC_STORE_PREFIX
):]
128 spec
= ServiceSpec
.from_json(v
['spec'])
129 created
= str_to_datetime(v
['created'])
130 self
.specs
[service_name
] = spec
131 self
.spec_created
[service_name
] = created
132 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
134 except Exception as e
:
135 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
139 def save(self
, spec
):
140 # type: (ServiceSpec) -> None
141 if spec
.preview_only
:
142 self
.spec_preview
[spec
.service_name()] = spec
144 self
.specs
[spec
.service_name()] = spec
145 self
.spec_created
[spec
.service_name()] = datetime_now()
147 SPEC_STORE_PREFIX
+ spec
.service_name(),
149 'spec': spec
.to_json(),
150 'created': datetime_to_str(self
.spec_created
[spec
.service_name()]),
153 self
.mgr
.events
.for_service(spec
, OrchestratorEvent
.INFO
, 'service was created')
155 def rm(self
, service_name
):
156 # type: (str) -> bool
157 found
= service_name
in self
.specs
159 del self
.specs
[service_name
]
160 del self
.spec_created
[service_name
]
161 self
.mgr
.set_store(SPEC_STORE_PREFIX
+ service_name
, None)
164 def find(self
, service_name
: Optional
[str] = None) -> List
[ServiceSpec
]:
166 for sn
, spec
in self
.specs
.items():
167 if not service_name
or \
168 sn
== service_name
or \
169 sn
.startswith(service_name
+ '.'):
171 self
.mgr
.log
.debug('SpecStore: find spec for %s returned: %s' % (
172 service_name
, specs
))
178 HostCache stores different things:
180 1. `daemons`: Deployed daemons O(daemons)
182 They're part of the configuration nowadays and need to be
183 persistent. The name "daemon cache" is unfortunately a bit misleading.
184 For example, we really need to know where daemons are deployed on
185 hosts that are offline.
187 2. `devices`: ceph-volume inventory cache O(hosts)
189 As soon as this is populated, it becomes more or less read-only.
191 3. `networks`: network interfaces for each host. O(hosts)
193 This is needed in order to deploy MONs. It is mostly read-only.
195 4. `last_etc_ceph_ceph_conf` O(hosts)
197 Stores the last refresh time for the /etc/ceph/ceph.conf. Used
198 to avoid deploying new configs when failing over to a new mgr.
200 5. `scheduled_daemon_actions`: O(daemons)
202 Used to run daemon actions after deploying a daemon. We need to
203 store it persistently, in order to stay consistent across
207 def __init__(self
, mgr
):
208 # type: (CephadmOrchestrator) -> None
209 self
.mgr
: CephadmOrchestrator
= mgr
210 self
.daemons
= {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
211 self
.last_daemon_update
= {} # type: Dict[str, datetime.datetime]
212 self
.devices
= {} # type: Dict[str, List[inventory.Device]]
213 self
.facts
= {} # type: Dict[str, Dict[str, Any]]
214 self
.last_facts_update
= {} # type: Dict[str, datetime.datetime]
215 self
.osdspec_previews
= {} # type: Dict[str, List[Dict[str, Any]]]
216 self
.networks
= {} # type: Dict[str, Dict[str, List[str]]]
217 self
.last_device_update
= {} # type: Dict[str, datetime.datetime]
218 self
.daemon_refresh_queue
= [] # type: List[str]
219 self
.device_refresh_queue
= [] # type: List[str]
220 self
.osdspec_previews_refresh_queue
= [] # type: List[str]
222 # host -> daemon name -> dict
223 self
.daemon_config_deps
= {} # type: Dict[str, Dict[str, Dict[str,Any]]]
224 self
.last_host_check
= {} # type: Dict[str, datetime.datetime]
225 self
.loading_osdspec_preview
= set() # type: Set[str]
226 self
.last_etc_ceph_ceph_conf
: Dict
[str, datetime
.datetime
] = {}
227 self
.registry_login_queue
: Set
[str] = set()
229 self
.scheduled_daemon_actions
: Dict
[str, Dict
[str, str]] = {}
233 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
)):
234 host
= k
[len(HOST_CACHE_PREFIX
):]
235 if host
not in self
.mgr
.inventory
:
236 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
238 self
.mgr
.set_store(k
, None)
241 if 'last_device_update' in j
:
242 self
.last_device_update
[host
] = str_to_datetime(j
['last_device_update'])
244 self
.device_refresh_queue
.append(host
)
245 # for services, we ignore the persisted last_*_update
246 # and always trigger a new scrape on mgr restart.
247 self
.daemon_refresh_queue
.append(host
)
248 self
.daemons
[host
] = {}
249 self
.osdspec_previews
[host
] = []
250 self
.devices
[host
] = []
251 self
.networks
[host
] = {}
252 self
.daemon_config_deps
[host
] = {}
253 for name
, d
in j
.get('daemons', {}).items():
254 self
.daemons
[host
][name
] = \
255 orchestrator
.DaemonDescription
.from_json(d
)
256 for d
in j
.get('devices', []):
257 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
258 self
.networks
[host
] = j
.get('networks', {})
259 self
.osdspec_previews
[host
] = j
.get('osdspec_previews', {})
261 for name
, d
in j
.get('daemon_config_deps', {}).items():
262 self
.daemon_config_deps
[host
][name
] = {
263 'deps': d
.get('deps', []),
264 'last_config': str_to_datetime(d
['last_config']),
266 if 'last_host_check' in j
:
267 self
.last_host_check
[host
] = str_to_datetime(j
['last_host_check'])
268 if 'last_etc_ceph_ceph_conf' in j
:
269 self
.last_etc_ceph_ceph_conf
[host
] = str_to_datetime(
270 j
['last_etc_ceph_ceph_conf'])
271 self
.registry_login_queue
.add(host
)
272 self
.scheduled_daemon_actions
[host
] = j
.get('scheduled_daemon_actions', {})
275 'HostCache.load: host %s has %d daemons, '
276 '%d devices, %d networks' % (
277 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
278 len(self
.networks
[host
])))
279 except Exception as e
:
280 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemon map of ``host`` and record when that happened."""
    self.daemons[host] = dm
    refreshed_at = datetime_now()
    self.last_daemon_update[host] = refreshed_at
def update_host_facts(self, host, facts):
    # type: (str, Dict[str, Dict[str, Any]]) -> None
    """Store the gathered facts for ``host`` and record the time of the update."""
    self.facts[host] = facts
    # NOTE(review): this stamp comes from datetime.datetime.utcnow(), whereas
    # the sibling update_* methods use datetime_now().
    # host_needs_facts_refresh builds its cutoff from utcnow() as well, so the
    # two have to be changed together.
    stamp = datetime.datetime.utcnow()
    self.last_facts_update[host] = stamp
def update_host_devices_networks(self, host, dls, nets):
    # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
    """Replace the cached devices and networks of ``host`` and stamp the update time."""
    self.devices[host], self.networks[host] = dls, nets
    refreshed_at = datetime_now()
    self.last_device_update[host] = refreshed_at
300 def update_daemon_config_deps(self
, host
: str, name
: str, deps
: List
[str], stamp
: datetime
.datetime
) -> None:
301 self
.daemon_config_deps
[host
][name
] = {
303 'last_config': stamp
,
def update_last_host_check(self, host):
    # type: (str) -> None
    """Record the current time as the moment ``host`` was last checked."""
    checked_at = datetime_now()
    self.last_host_check[host] = checked_at
def prime_empty_host(self, host):
    # type: (str) -> None
    """
    Install an empty entry for a host.

    Every per-host cache gets a fresh empty container, and the host is
    queued for a daemon, device, osdspec-preview and registry-login pass.
    """
    # Empty per-host caches; each one needs its own new object.
    self.daemons[host] = {}
    self.devices[host] = []
    self.networks[host] = {}
    self.osdspec_previews[host] = []
    self.daemon_config_deps[host] = {}

    # List-based refresh queues, filled in the same order as before.
    pending_queues = (
        self.daemon_refresh_queue,
        self.device_refresh_queue,
        self.osdspec_previews_refresh_queue,
    )
    for queue in pending_queues:
        queue.append(host)

    # The registry login queue is a set, hence add() instead of append().
    self.registry_login_queue.add(host)
325 def invalidate_host_daemons(self
, host
):
326 # type: (str) -> None
327 self
.daemon_refresh_queue
.append(host
)
328 if host
in self
.last_daemon_update
:
329 del self
.last_daemon_update
[host
]
332 def invalidate_host_devices(self
, host
):
333 # type: (str) -> None
334 self
.device_refresh_queue
.append(host
)
335 if host
in self
.last_device_update
:
336 del self
.last_device_update
[host
]
def distribute_new_registry_login_info(self) -> None:
    """Reset ``registry_login_queue`` to the set of all hosts in the mgr inventory."""
    all_hosts = self.mgr.inventory.keys()
    self.registry_login_queue = set(all_hosts)
342 def save_host(self
, host
: str) -> None:
343 j
: Dict
[str, Any
] = {
346 'osdspec_previews': [],
347 'daemon_config_deps': {},
349 if host
in self
.last_daemon_update
:
350 j
['last_daemon_update'] = datetime_to_str(self
.last_daemon_update
[host
])
351 if host
in self
.last_device_update
:
352 j
['last_device_update'] = datetime_to_str(self
.last_device_update
[host
])
353 if host
in self
.daemons
:
354 for name
, dd
in self
.daemons
[host
].items():
355 j
['daemons'][name
] = dd
.to_json()
356 if host
in self
.devices
:
357 for d
in self
.devices
[host
]:
358 j
['devices'].append(d
.to_json())
359 if host
in self
.networks
:
360 j
['networks'] = self
.networks
[host
]
361 if host
in self
.daemon_config_deps
:
362 for name
, depi
in self
.daemon_config_deps
[host
].items():
363 j
['daemon_config_deps'][name
] = {
364 'deps': depi
.get('deps', []),
365 'last_config': datetime_to_str(depi
['last_config']),
367 if host
in self
.osdspec_previews
and self
.osdspec_previews
[host
]:
368 j
['osdspec_previews'] = self
.osdspec_previews
[host
]
370 if host
in self
.last_host_check
:
371 j
['last_host_check'] = datetime_to_str(self
.last_host_check
[host
])
373 if host
in self
.last_etc_ceph_ceph_conf
:
374 j
['last_etc_ceph_ceph_conf'] = datetime_to_str(self
.last_etc_ceph_ceph_conf
[host
])
375 if host
in self
.scheduled_daemon_actions
:
376 j
['scheduled_daemon_actions'] = self
.scheduled_daemon_actions
[host
]
378 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
380 def rm_host(self
, host
):
381 # type: (str) -> None
382 if host
in self
.daemons
:
383 del self
.daemons
[host
]
384 if host
in self
.devices
:
385 del self
.devices
[host
]
386 if host
in self
.facts
:
388 if host
in self
.last_facts_update
:
389 del self
.last_facts_update
[host
]
390 if host
in self
.osdspec_previews
:
391 del self
.osdspec_previews
[host
]
392 if host
in self
.loading_osdspec_preview
:
393 self
.loading_osdspec_preview
.remove(host
)
394 if host
in self
.networks
:
395 del self
.networks
[host
]
396 if host
in self
.last_daemon_update
:
397 del self
.last_daemon_update
[host
]
398 if host
in self
.last_device_update
:
399 del self
.last_device_update
[host
]
400 if host
in self
.daemon_config_deps
:
401 del self
.daemon_config_deps
[host
]
402 if host
in self
.scheduled_daemon_actions
:
403 del self
.scheduled_daemon_actions
[host
]
404 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, None)
407 # type: () -> List[str]
409 for host
, di
in self
.daemons
.items():
413 def get_daemons(self
):
414 # type: () -> List[orchestrator.DaemonDescription]
416 for host
, dm
in self
.daemons
.items():
417 for name
, dd
in dm
.items():
421 def get_daemon(self
, daemon_name
: str) -> orchestrator
.DaemonDescription
:
422 for _
, dm
in self
.daemons
.items():
423 for _
, dd
in dm
.items():
424 if dd
.name() == daemon_name
:
426 raise orchestrator
.OrchestratorError(f
'Unable to find {daemon_name} daemon(s)')
428 def get_daemons_with_volatile_status(self
) -> Iterator
[Tuple
[str, Dict
[str, orchestrator
.DaemonDescription
]]]:
429 def alter(host
: str, dd_orig
: orchestrator
.DaemonDescription
) -> orchestrator
.DaemonDescription
:
431 if host
in self
.mgr
.offline_hosts
:
433 dd
.status_desc
= 'host is offline'
434 dd
.events
= self
.mgr
.events
.get_for_daemon(dd
.name())
437 for host
, dm
in self
.daemons
.items():
438 yield host
, {name
: alter(host
, d
) for name
, d
in dm
.items()}
440 def get_daemons_by_service(self
, service_name
):
441 # type: (str) -> List[orchestrator.DaemonDescription]
442 result
= [] # type: List[orchestrator.DaemonDescription]
443 for host
, dm
in self
.daemons
.items():
444 for name
, d
in dm
.items():
445 if d
.service_name() == service_name
:
449 def get_daemons_by_type(self
, service_type
):
450 # type: (str) -> List[orchestrator.DaemonDescription]
451 result
= [] # type: List[orchestrator.DaemonDescription]
452 for host
, dm
in self
.daemons
.items():
453 for name
, d
in dm
.items():
454 if d
.daemon_type
== service_type
:
458 def get_daemon_names(self
):
459 # type: () -> List[str]
461 for host
, dm
in self
.daemons
.items():
462 for name
, dd
in dm
.items():
466 def get_daemon_last_config_deps(self
, host
: str, name
: str) -> Tuple
[Optional
[List
[str]], Optional
[datetime
.datetime
]]:
467 if host
in self
.daemon_config_deps
:
468 if name
in self
.daemon_config_deps
[host
]:
469 return self
.daemon_config_deps
[host
][name
].get('deps', []), \
470 self
.daemon_config_deps
[host
][name
].get('last_config', None)
473 def host_needs_daemon_refresh(self
, host
):
474 # type: (str) -> bool
475 if host
in self
.mgr
.offline_hosts
:
476 logger
.debug(f
'Host "{host}" marked as offline. Skipping daemon refresh')
478 if host
in self
.daemon_refresh_queue
:
479 self
.daemon_refresh_queue
.remove(host
)
481 cutoff
= datetime_now() - datetime
.timedelta(
482 seconds
=self
.mgr
.daemon_cache_timeout
)
483 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
487 def host_needs_facts_refresh(self
, host
):
488 # type: (str) -> bool
489 if host
in self
.mgr
.offline_hosts
:
490 logger
.debug(f
'Host "{host}" marked as offline. Skipping gather facts refresh')
492 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
493 seconds
=self
.mgr
.facts_cache_timeout
)
494 if host
not in self
.last_facts_update
or self
.last_facts_update
[host
] < cutoff
:
498 def host_had_daemon_refresh(self
, host
: str) -> bool:
502 if host
in self
.last_daemon_update
:
504 if host
not in self
.daemons
:
506 return bool(self
.daemons
[host
])
508 def host_needs_device_refresh(self
, host
):
509 # type: (str) -> bool
510 if host
in self
.mgr
.offline_hosts
:
511 logger
.debug(f
'Host "{host}" marked as offline. Skipping device refresh')
513 if host
in self
.device_refresh_queue
:
514 self
.device_refresh_queue
.remove(host
)
516 cutoff
= datetime_now() - datetime
.timedelta(
517 seconds
=self
.mgr
.device_cache_timeout
)
518 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
522 def host_needs_osdspec_preview_refresh(self
, host
: str) -> bool:
523 if host
in self
.mgr
.offline_hosts
:
524 logger
.debug(f
'Host "{host}" marked as offline. Skipping osdspec preview refresh')
526 if host
in self
.osdspec_previews_refresh_queue
:
527 self
.osdspec_previews_refresh_queue
.remove(host
)
529 # Since this is dependent on other factors (device and spec) this does not need
530 # to be updated periodically.
def host_needs_check(self, host):
    # type: (str) -> bool
    """Tell whether ``host`` has no recorded check or one older than the check interval.

    The interval is ``self.mgr.host_check_interval``, interpreted as seconds.
    """
    now = datetime_now()
    cutoff = now - datetime.timedelta(seconds=self.mgr.host_check_interval)
    if host not in self.last_host_check:
        return True
    return self.last_host_check[host] < cutoff
539 def host_needs_new_etc_ceph_ceph_conf(self
, host
: str) -> bool:
540 if not self
.mgr
.manage_etc_ceph_ceph_conf
:
544 if host
in self
.mgr
.offline_hosts
:
546 if not self
.mgr
.last_monmap
:
548 if host
not in self
.last_etc_ceph_ceph_conf
:
550 if self
.mgr
.last_monmap
> self
.last_etc_ceph_ceph_conf
[host
]:
552 if self
.mgr
.extra_ceph_conf_is_newer(self
.last_etc_ceph_ceph_conf
[host
]):
554 # already up to date:
557 def update_last_etc_ceph_ceph_conf(self
, host
: str) -> None:
558 if not self
.mgr
.last_monmap
:
560 self
.last_etc_ceph_ceph_conf
[host
] = datetime_now()
562 def host_needs_registry_login(self
, host
: str) -> bool:
563 if host
in self
.mgr
.offline_hosts
:
565 if host
in self
.registry_login_queue
:
566 self
.registry_login_queue
.remove(host
)
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Register ``dd`` under its own name in the daemon map of ``host``.

    The host must already have an entry in ``self.daemons``; this is
    enforced with an assertion.
    """
    assert host in self.daemons
    host_daemons = self.daemons[host]
    host_daemons[dd.name()] = dd
def rm_daemon(self, host: str, name: str) -> None:
    """Drop daemon ``name`` from the daemon map of ``host``; do nothing if either is unknown."""
    if host not in self.daemons:
        return
    host_daemons = self.daemons[host]
    if name not in host_daemons:
        return
    del host_daemons[name]
def daemon_cache_filled(self) -> bool:
    """
    Tell whether the daemons of every host were checked at least once.

    A host counts as covered when ``host_had_daemon_refresh`` is true for
    it or when it is listed in ``self.mgr.offline_hosts``.
    ``host_needs_daemon_refresh`` is deliberately not consulted here.
    """
    for h in self.get_hosts():
        if self.host_had_daemon_refresh(h):
            continue
        if h in self.mgr.offline_hosts:
            continue
        # An online host that was never refreshed: the cache is incomplete.
        return False
    return True
591 def schedule_daemon_action(self
, host
: str, daemon_name
: str, action
: str) -> None:
599 existing_action
= self
.scheduled_daemon_actions
.get(host
, {}).get(daemon_name
, None)
600 if existing_action
and priorities
[existing_action
] > priorities
[action
]:
602 f
'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
605 if host
not in self
.scheduled_daemon_actions
:
606 self
.scheduled_daemon_actions
[host
] = {}
607 self
.scheduled_daemon_actions
[host
][daemon_name
] = action
609 def rm_scheduled_daemon_action(self
, host
: str, daemon_name
: str) -> None:
610 if host
in self
.scheduled_daemon_actions
:
611 if daemon_name
in self
.scheduled_daemon_actions
[host
]:
612 del self
.scheduled_daemon_actions
[host
][daemon_name
]
613 if not self
.scheduled_daemon_actions
[host
]:
614 del self
.scheduled_daemon_actions
[host
]
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
    """Return the action scheduled for ``daemon`` on ``host``, or None if there is none."""
    actions_on_host = self.scheduled_daemon_actions.get(host, {})
    return actions_on_host.get(daemon)
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Keep a reference to the orchestrator module and start with no recorded events."""
    self.mgr = mgr  # type: CephadmOrchestrator
    # Keyed by the string returned from OrchestratorEvent.kind_subject().
    self.events: Dict[str, List[OrchestratorEvent]] = {}
626 def add(self
, event
: OrchestratorEvent
) -> None:
628 if event
.kind_subject() not in self
.events
:
629 self
.events
[event
.kind_subject()] = [event
]
631 for e
in self
.events
[event
.kind_subject()]:
632 if e
.message
== event
.message
:
635 self
.events
[event
.kind_subject()].append(event
)
637 # limit to five events for now.
638 self
.events
[event
.kind_subject()] = self
.events
[event
.kind_subject()][-5:]
640 def for_service(self
, spec
: ServiceSpec
, level
: str, message
: str) -> None:
641 e
= OrchestratorEvent(datetime_now(), 'service',
642 spec
.service_name(), level
, message
)
645 def from_orch_error(self
, e
: OrchestratorError
) -> None:
646 if e
.event_subject
is not None:
647 self
.add(OrchestratorEvent(
655 def for_daemon(self
, daemon_name
: str, level
: str, message
: str) -> None:
656 e
= OrchestratorEvent(datetime_now(), 'daemon', daemon_name
, level
, message
)
659 def for_daemon_from_exception(self
, daemon_name
: str, e
: Exception) -> None:
666 def cleanup(self
) -> None:
667 # Needs to be properly done, in case events are persistently stored.
669 unknowns
: List
[str] = []
670 daemons
= self
.mgr
.cache
.get_daemon_names()
671 specs
= self
.mgr
.spec_store
.specs
.keys()
672 for k_s
, v
in self
.events
.items():
673 kind
, subject
= k_s
.split(':')
674 if kind
== 'service':
675 if subject
not in specs
:
677 elif kind
== 'daemon':
678 if subject
not in daemons
:
def get_for_service(self, name: str) -> List[OrchestratorEvent]:
    """Return the events stored under 'service:<name>', or an empty list if there are none."""
    key = 'service:' + name
    return self.events.get(key, [])
def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
    """Return the events stored under 'daemon:<name>', or an empty list if there are none."""
    key = 'daemon:' + name
    return self.events.get(key, [])