]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/inventory.py
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
CommitLineData
e306af50
TL
1import datetime
2from copy import copy
3import json
4import logging
5from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set
6
7import six
8
9import orchestrator
10from ceph.deployment import inventory
11from ceph.deployment.service_spec import ServiceSpec
adb31ebb 12from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
f6b5b4d7 13from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
e306af50
TL
14
15if TYPE_CHECKING:
16 from .module import CephadmOrchestrator
17
18
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Key prefixes used with the mgr key/value store in this module:
# per-host cache records are stored under HOST_CACHE_PREFIX + <hostname>,
# service specs under SPEC_STORE_PREFIX + <service name>.
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
e306af50
TL
23
24
class Inventory:
    """
    Persistent registry of the hosts known to the orchestrator.

    Each host is kept as the JSON form of its HostSpec, keyed by hostname.
    The whole mapping is written to the mgr store under the 'inventory'
    key by every mutating method.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # Restore whatever was persisted; a missing/empty value means no hosts.
        stored = self.mgr.get_store('inventory')
        if not stored:
            self._inventory: Dict[str, dict] = dict()
        else:
            self._inventory = json.loads(stored)
            # Old clusters persisted hostspecs without a 'hostname' key;
            # backfill it from the mapping key.
            for name, entry in self._inventory.items():
                entry.setdefault('hostname', name)
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        """Return the names of all hosts in the inventory."""
        return list(self._inventory)

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError unless `host` is part of the inventory."""
        if host in self._inventory:
            return
        raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Insert (or overwrite) the entry for `spec.hostname` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Drop `host` from the inventory and persist; the host must exist."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Set the address of an existing host and persist."""
        self.assert_host(host)
        entry = self._inventory[host]
        entry['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Attach `label` to an existing host (no duplicates) and persist."""
        self.assert_host(host)

        # setdefault also creates the 'labels' list for entries lacking one.
        labels = self._inventory[host].setdefault('labels', list())
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """
        Detach `label` from an existing host and persist.

        An entry without a 'labels' key gets an empty list as a side effect.
        """
        self.assert_host(host)

        labels = self._inventory[host].setdefault('labels', list())
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Return the stored address of `host`, or the hostname if none is set."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """
        Iterate over the hosts carrying `label`.

        An empty (or None) label matches every host. Yields HostSpec
        objects when `as_hostspec` is set, plain hostnames otherwise.
        """
        for name, entry in self._inventory.items():
            if label and label not in entry.get('labels', []):
                continue
            if as_hostspec:
                yield self.spec_from_dict(entry)
            else:
                yield name

    def spec_from_dict(self, info: dict) -> HostSpec:
        """
        Build a HostSpec from a stored inventory entry.

        Hosts listed in `mgr.offline_hosts` are reported as 'Offline'
        regardless of the stored status.
        """
        hostname = info['hostname']
        if hostname in self.mgr.offline_hosts:
            status = 'Offline'
        else:
            status = info.get('status', '')
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status=status,
        )

    def all_specs(self) -> List[HostSpec]:
        """Return a HostSpec for every host in the inventory."""
        return [self.spec_from_dict(entry) for entry in self._inventory.values()]

    def save(self) -> None:
        """Write the complete inventory to the mgr store as JSON."""
        serialized = json.dumps(self._inventory)
        self.mgr.set_store('inventory', serialized)
112
113
class SpecStore():
    """
    Keeps the service specs handled by the orchestrator.

    Regular specs are held in `specs` together with their creation time in
    `spec_created`, and are persisted in the mgr store under
    SPEC_STORE_PREFIX + <service name>. Preview-only specs are kept in
    memory only, in `spec_preview`.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self.specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    def load(self):
        # type: () -> None
        """
        Populate `specs` / `spec_created` from the mgr store.

        A record that cannot be parsed is logged with a warning and skipped.
        """
        persisted = self.mgr.get_store_prefix(SPEC_STORE_PREFIX)
        for key, raw in persisted.items():
            service_name = key[len(SPEC_STORE_PREFIX):]
            try:
                record = json.loads(raw)
                spec = ServiceSpec.from_json(record['spec'])
                created = str_to_datetime(record['created'])
                self.specs[service_name] = spec
                self.spec_created[service_name] = created
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """
        Store `spec`.

        Preview-only specs are just remembered in `spec_preview`. Any other
        spec is recorded with the current time, written to the mgr store
        and announced with a 'service was created' event.
        """
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return

        created = datetime_now()
        self.specs[name] = spec
        self.spec_created[name] = created

        payload = {
            'spec': spec.to_json(),
            'created': datetime_to_str(created),
        }
        self.mgr.set_store(SPEC_STORE_PREFIX + name,
                           json.dumps(payload, sort_keys=True))
        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')

    def rm(self, service_name):
        # type: (str) -> bool
        """
        Forget the spec called `service_name`, in memory and in the store.

        Returns True if such a spec existed, False otherwise.
        """
        if service_name not in self.specs:
            return False
        del self.specs[service_name]
        del self.spec_created[service_name]
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return True

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """
        Return the specs matching `service_name`.

        A spec matches when its name equals `service_name` or starts with
        `service_name + '.'`. Without a `service_name` every spec is returned.
        """
        specs = [
            spec for sn, spec in self.specs.items()
            if (not service_name
                or sn == service_name
                or sn.startswith(service_name + '.'))
        ]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, specs))
        return specs
174
f91f0fd5 175
class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    For example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_etc_ceph_ceph_conf` O(hosts)

    Stores the last refresh time for the /etc/ceph/ceph.conf. Used
    to avoid deploying new configs when failing over to a new mgr.

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.

    Per-host records are persisted in the mgr store under
    HOST_CACHE_PREFIX + <hostname> by `save_host()` and restored by `load()`.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> daemon name -> description
        self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
        self.devices = {}              # type: Dict[str, List[inventory.Device]]
        # facts are kept in memory only; save_host() does not persist them
        self.facts = {}                # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}    # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
        self.networks = {}             # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
        # hosts for which a refresh was explicitly requested
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}      # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
        self.registry_login_queue: Set[str] = set()

        # host -> daemon name -> action
        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

    def load(self):
        # type: () -> None
        """
        Restore the cache from the mgr store.

        Records of hosts that are no longer part of the inventory are
        deleted from the store and skipped. A record that cannot be parsed
        is logged with a warning; state assigned before the failure is kept.
        Every loaded host is queued for a daemon refresh and a registry
        login; it is queued for a device refresh only if no
        'last_device_update' was persisted.
        """
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # The record was just deleted: do not resurrect the host
                # in the in-memory cache.
                continue
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                # previews are a list everywhere else, so default to one
                self.osdspec_previews[host] = j.get('osdspec_previews', [])

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_etc_ceph_ceph_conf' in j:
                    self.last_etc_ceph_ceph_conf[host] = str_to_datetime(
                        j['last_etc_ceph_ceph_conf'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                # Best effort: a broken record must not prevent loading
                # the remaining hosts.
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the daemons of `host` and stamp the update time."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        """Replace the facts of `host` and stamp the update time."""
        self.facts[host] = facts
        # Same clock as every other timestamp in this cache; the matching
        # comparison lives in host_needs_facts_refresh().
        self.last_facts_update[host] = datetime_now()

    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        """Replace devices and networks of `host` and stamp the update time."""
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the config dependencies of daemon `name` and when it was configured."""
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp `host` as checked right now."""
        self.last_host_check[host] = datetime_now()

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host and queue it for every refresh
        (daemons, devices, osdspec previews) and for a registry login.
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Request a daemon refresh for `host` and wake up the mgr event."""
        self.daemon_refresh_queue.append(host)
        self.last_daemon_update.pop(host, None)
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Request a device refresh for `host` and wake up the mgr event."""
        self.device_refresh_queue.append(host)
        self.last_device_update.pop(host, None)
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        """Queue every host of the inventory for a registry login."""
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        """Serialize the cached state of `host` to the mgr store as JSON."""
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.daemons:
            j['daemons'] = {
                name: dd.to_json() for name, dd in self.daemons[host].items()
            }
        if host in self.devices:
            j['devices'] = [d.to_json() for d in self.devices[host]]
        if host in self.networks:
            j['networks'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        # an empty preview list is stored as the [] default above
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_etc_ceph_ceph_conf:
            j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """
        Forget everything cached about `host` and delete its store record.

        This includes the per-host timestamps and pending queue entries, so
        that a host which is added again later starts from a clean state.
        """
        per_host_maps = (
            self.daemons,
            self.devices,
            self.facts,
            self.last_facts_update,
            self.osdspec_previews,
            self.networks,
            self.last_daemon_update,
            self.last_device_update,
            self.daemon_config_deps,
            self.scheduled_daemon_actions,
            self.last_host_check,
            self.last_etc_ceph_ceph_conf,
        )  # type: Tuple[Dict[str, Any], ...]
        for per_host in per_host_maps:
            per_host.pop(host, None)

        self.loading_osdspec_preview.discard(host)
        self.registry_login_queue.discard(host)
        # The refresh queues may hold the host several times; purge in place
        # so that references to the queue objects stay valid.
        for queue in (self.daemon_refresh_queue,
                      self.device_refresh_queue,
                      self.osdspec_previews_refresh_queue):
            queue[:] = [h for h in queue if h != host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        """Return the hosts that have an entry in `daemons`."""
        return list(self.daemons)

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return the descriptions of all cached daemons on all hosts."""
        return [dd for dm in self.daemons.values() for dd in dm.values()]

    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
        """
        Return the first cached daemon whose `name()` equals `daemon_name`.

        Raises OrchestratorError if there is no such daemon.
        """
        for dm in self.daemons.values():
            for dd in dm.values():
                if dd.name() == daemon_name:
                    return dd
        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """
        Yield `(host, {daemon name: description})` for every host.

        The descriptions are shallow copies enriched with the current
        events; daemons on offline hosts are reported with status -1.
        The cached objects themselves are not modified.
        """
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = -1
                dd.status_desc = 'host is offline'
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """Return the cached daemons whose `service_name()` equals `service_name`."""
        return [
            d for dm in self.daemons.values() for d in dm.values()
            if d.service_name() == service_name
        ]

    def get_daemons_by_type(self, service_type):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """Return the cached daemons whose `daemon_type` equals `service_type`."""
        return [
            d for dm in self.daemons.values() for d in dm.values()
            if d.daemon_type == service_type
        ]

    def get_daemon_names(self):
        # type: () -> List[str]
        """Return the names (cache keys) of all cached daemons on all hosts."""
        return [name for dm in self.daemons.values() for name in dm]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """
        Return `(deps, last_config)` recorded for daemon `name` on `host`,
        or `(None, None)` if nothing was recorded.
        """
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                depi = self.daemon_config_deps[host][name]
                return depi.get('deps', []), depi.get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """
        Tell whether the daemons of `host` should be scraped again.

        Offline hosts never need a refresh. A queued request is consumed
        (one queue entry is removed) and answered with True. Otherwise the
        answer depends on the age of the last update compared with
        `mgr.daemon_cache_timeout` seconds.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        return host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        """
        Tell whether the facts of `host` should be gathered again.

        Offline hosts never need a refresh; otherwise the answer depends on
        the age of the last update compared with `mgr.facts_cache_timeout`
        seconds.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        # Must use the same clock as update_host_facts().
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        return host not in self.last_facts_update or self.last_facts_update[host] < cutoff

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.

        True if an update time is recorded for the host, or if the cache
        holds at least one daemon for it.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """
        Tell whether the devices of `host` should be scraped again.

        Offline hosts never need a refresh. A queued request is consumed
        (one queue entry is removed) and answered with True. Otherwise the
        answer depends on the age of the last update compared with
        `mgr.device_cache_timeout` seconds.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        return host not in self.last_device_update or self.last_device_update[host] < cutoff

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        """
        Tell whether the OSD spec previews of `host` should be regenerated.

        Only explicitly queued requests trigger a refresh; answering True
        consumes one queue entry. Offline hosts are skipped.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        #  Since this is dependent on other factors (device and spec) this does not  need
        #  to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        """
        True if `host` was never checked or the last check is older than
        `mgr.host_check_interval` seconds.
        """
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
        """
        Tell whether /etc/ceph/ceph.conf should be written to `host`.

        Never while the feature is disabled, the mgr is paused, the host is
        offline or no monmap is known yet. Otherwise a new config is needed
        if none was written so far, or if the monmap or the extra ceph conf
        is newer than the last write.
        """
        if not self.mgr.manage_etc_ceph_ceph_conf:
            return False
        if self.mgr.paused:
            return False
        if host in self.mgr.offline_hosts:
            return False
        if not self.mgr.last_monmap:
            return False
        if host not in self.last_etc_ceph_ceph_conf:
            return True
        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
            return True
        if self.mgr.extra_ceph_conf_is_newer(self.last_etc_ceph_ceph_conf[host]):
            return True
        # already up to date:
        return False

    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
        """Stamp the ceph.conf of `host` as written now (no-op without a monmap)."""
        if not self.mgr.last_monmap:
            return
        self.last_etc_ceph_ceph_conf[host] = datetime_now()

    def host_needs_registry_login(self, host: str) -> bool:
        """
        True if a registry login is queued for `host`; answering True
        consumes the queue entry. Offline hosts are skipped.
        """
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Add (or replace) daemon `dd` on `host`; the host must be cached already."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        """Remove daemon `name` from `host`; unknown hosts/daemons are ignored."""
        if host in self.daemons:
            self.daemons[host].pop(name, None)

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each hosts at least once.
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        """
        Remember `action` to be run for `daemon_name` on `host`.

        If an action with a higher priority is already scheduled for that
        daemon, the request is dropped. `action` must be one of the keys
        of `priorities` below; anything else raises KeyError when an action
        is already scheduled.
        """
        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        self.scheduled_daemon_actions.setdefault(host, {})[daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
        """Drop the action scheduled for `daemon_name`; empty host entries are removed."""
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        """Return the action scheduled for `daemon` on `host`, or None."""
        return self.scheduled_daemon_actions.get(host, {}).get(daemon)
618
f6b5b4d7
TL
619
class EventStore():
    """
    In-memory store of recent orchestrator events.

    Events are grouped by `event.kind_subject()`; the lookups in this class
    use keys of the form '<kind>:<subject>' (e.g. 'service:<name>',
    'daemon:<name>'). At most five events are kept per key.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """
        Record `event`.

        An event whose message equals the message of an event already
        stored for the same key is dropped.
        """
        key = event.kind_subject()
        known = self.events.setdefault(key, [])
        if any(e.message == event.message for e in known):
            return

        known.append(event)

        # limit to five events for now.
        self.events[key] = known[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Record an event of the given level for the service described by `spec`."""
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        """
        Record an "ERROR" event for the subject attached to `e`.

        Errors without an `event_subject` are ignored.
        """
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Record an event of the given level for daemon `daemon_name`."""
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an "ERROR" event for `daemon_name` carrying `str(e)` as message."""
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        """
        Drop the events of services without a spec and of daemons that are
        not in the host cache. Events of any other kind are kept.
        """
        # Needs to be properly done, in case events are persistently stored.

        # A set keeps the membership test cheap for every event key.
        daemons = set(self.mgr.cache.get_daemon_names())
        specs = self.mgr.spec_store.specs

        unknowns: List[str] = []
        for k_s in self.events:
            # Split on the first ':' only, so a subject that itself contains
            # a colon does not break the unpacking.
            kind, _, subject = k_s.partition(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        # Delete after the iteration: the dict must not change size while
        # it is being iterated.
        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Return the events stored for service `name` (empty list if none)."""
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Return the events stored for daemon `name` (empty list if none)."""
        return self.events.get('daemon:' + name, [])