import datetime
from copy import copy
import ipaddress
import json
import logging
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types

from .utils import resolve_ip

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """
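    # Illustrative usage sketch (comments only, not executed): assuming a live
    # CephadmOrchestrator instance ``mgr`` and a hypothetical host "node1":
    #
    #   inv = Inventory(mgr)
    #   inv.add_host(HostSpec('node1', addr='10.0.0.1', labels=['mon']))
    #   inv.add_label('node1', 'osd')
    #   list(inv.filter_by_label('mon'))                    # -> ['node1']
    #   list(inv.filter_by_label('mon', as_hostspec=True))  # -> [HostSpec('node1', ...)]
    #
    # Every mutation calls save(), which persists the whole inventory as JSON
    # in the mgr key/value store under the 'inventory' key.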

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        for h, hostspec in self._inventory.items():
            if not label or label in hostspec.get('labels', []):
                if as_hostspec:
                    yield self.spec_from_dict(hostspec)
                else:
                    yield h

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        Returns both active and deleted specs, as a read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')
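    # Illustrative sketch of the key/value layout that _save() writes and
    # load() parses back, assuming a hypothetical "mds.fs1" service (values
    # abbreviated):
    #
    #   key:   spec.mds.fs1
    #   value: {"created": "2021-07-01T12:00:00.000000Z",
    #           "spec": {"service_type": "mds", "service_id": "fs1", ...},
    #           "rank_map": {"0": {"1": "fs1.host1.abcdef"}},
    #           "deleted": "..."}   # only present once rm() has marked the spec deleted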

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls
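    # Illustrative round-trip sketch (comments only); the entity name and
    # placement label below are hypothetical:
    #
    #   ks = ClientKeyringSpec('client.admin', PlacementSpec(label='_admin'))
    #   ks.path       # -> '/etc/ceph/ceph.client.admin.keyring'
    #   ks.to_json()  # -> {'entity': 'client.admin', 'placement': {...},
    #                 #     'mode': 384, 'uid': 0, 'gid': 0}   (384 == 0o600)
    #   ClientKeyringSpec.from_json(ks.to_json()).entity  # -> 'client.admin'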


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class HostCache():
    """
    HostCache stores several different things:

    1. `daemons`: deployed daemons, O(daemons)

    These are effectively part of the configuration and need to be
    persistent, so the name "daemon cache" is a bit misleading: for
    example, we need to know where daemons are deployed even on hosts
    that are offline.

    2. `devices`: ceph-volume inventory cache, O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host, O(hosts)

    This is needed in order to deploy MONs and is mostly read-only.

    4. `last_client_files`: O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. This needs to be
    stored persistently in order to stay consistent across MGR failovers.
    """
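    # Illustrative sketch of the per-host record that save_host() persists
    # under "host.<hostname>" and load() reads back; the hostname and values
    # are hypothetical and abbreviated:
    #
    #   key:   host.node1
    #   value: {"daemons": {"osd.3": {...}, "mgr.node1.xyz": {...}},
    #           "devices": [{...}],
    #           "networks_and_interfaces": {"10.0.0.0/24": {"eth0": ["10.0.0.1"]}},
    #           "daemon_config_deps": {"osd.3": {"deps": [...], "last_config": "..."}},
    #           "last_client_files": {"/etc/ceph/ceph.conf": ["<digest>", 420, 0, 0]},
    #           "last_device_update": "2021-07-01T12:00:00.000000Z", ...}
    #
    # Note that the persisted daemon refresh timestamp is ignored on mgr
    # restart, so a fresh daemon scrape is always scheduled (see load() below).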

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str, Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', [])
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        a = self.devices[host]
        if len(a) != len(b):
            return True
        aj = {d.path: d.to_json() for d in a}
        bj = {d.path: d.to_json() for d in b}
        if aj != bj:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices_networks(
        self,
        host: str,
        dls: List[inventory.Device],
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls
        self.networks[host] = nets

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        r = []
        for host, di in self.daemons.items():
            r.append(host)
        return r

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        for _, dm in self.daemons.items():
            for _, dd in dm.items():
                if dd.name() == daemon_name:
                    return dd
        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
                dd.status_desc = 'stopped'
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        result = []  # type: List[orchestrator.DaemonDescription]
        for host, dm in self.daemons.items():
            for name, d in dm.items():
                if d.service_name() == service_name:
                    result.append(d)
        return result

    def get_daemons_by_type(self, service_type):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert service_type not in ['keepalived', 'haproxy']

        result = []  # type: List[orchestrator.DaemonDescription]
        for host, dm in self.daemons.items():
            for name, d in dm.items():
                if d.daemon_type in service_to_daemon_types(service_type):
                    result.append(d)
        return result

    def get_daemon_types(self, hostname: str) -> List[str]:
        """Provide a list of the types of daemons on the host"""
        result = set()
        for _d, dm in self.daemons[hostname].items():
            assert dm.daemon_type is not None, f'no daemon type for {dm!r}'
            result.add(dm.daemon_type)
        return list(result)

    def get_daemon_names(self):
        # type: () -> List[str]
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(name)
        return r

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        Whether the daemons on this host have been scraped at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
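        # An apply is considered needed when we have never applied this spec on
        # the host (or lack device info for it entirely), when the spec's
        # creation time is unknown or newer than the last recorded device
        # change, or when devices changed after the last apply; otherwise the
        # previous apply is still considered current.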
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action
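    # Illustrative sketch of the priority rule above (the ``cache`` handle and
    # daemon name are hypothetical):
    #
    #   cache.schedule_daemon_action('node1', 'osd.3', 'redeploy')
    #   cache.schedule_daemon_action('node1', 'osd.3', 'start')
    #   cache.get_scheduled_daemon_action('node1', 'osd.3')  # -> 'redeploy'
    #
    # The lower-priority 'start' is dropped because a 'redeploy' is already
    # scheduled; a subsequent 'stop' (priority 5) would replace it.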

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
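    # Illustrative sketch of add()'s behaviour for one subject: an event whose
    # message is already recorded is dropped, and only the five most recent
    # entries are kept (the ``store`` handle and daemon name are hypothetical):
    #
    #   store.add(OrchestratorEvent(datetime_now(), 'daemon', 'osd.3', 'INFO', 'deployed'))
    #   store.add(OrchestratorEvent(datetime_now(), 'daemon', 'osd.3', 'INFO', 'deployed'))
    #   len(store.get_for_daemon('osd.3'))  # -> 1 (duplicate message ignored)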

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])