import datetime
from copy import copy
import ipaddress
import json
import logging
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup.  use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self._inventory:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))

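# Illustrative usage sketch (not part of the upstream module; `mgr` and the
# host values are placeholders): the orchestrator typically drives Inventory
# like this, with every mutation persisted to the config-key store via save().
#
#     inv = Inventory(mgr)
#     inv.add_host(HostSpec('node1', addr='10.0.0.1', labels=['_admin']))
#     if not inv.has_label('node1', '_no_schedule'):
#         addr = inv.get_addr('node1')   # '10.0.0.1'
#     inv.rm_host('node1')               # removal is persisted too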

class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
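        # For illustration only (hypothetical daemon ids): a two-rank service
        # with one generation each might look like
        #   {'nfs.foo': {0: {0: 'foo.0.0.host1.abcdef'},
        #                1: {0: 'foo.1.0.host2.ghijkl'}}}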
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())

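# Illustrative sketch (hypothetical `mgr` and spec values): rm() only
# tombstones a spec via spec_deleted, so existing daemons can still be matched
# against it; finally_rm() drops it from the store once the daemons are gone.
#
#     store = SpecStore(mgr)
#     store.load()
#     store.save(ServiceSpec('crash'))   # persisted under 'spec.crash'
#     store.rm('crash')                  # marked deleted, still loaded
#     'crash' in store.active_specs      # False
#     'crash' in store.all_specs         # True, until finally_rm('crash')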

class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls

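# Illustrative round trip (hypothetical values): to_json()/from_json() are
# inverses, which is what lets ClientKeyringStore persist these specs as JSON.
#
#     ks = ClientKeyringSpec('client.admin', PlacementSpec(label='_admin'))
#     ks.path                                            # '/etc/ceph/ceph.client.admin.keyring'
#     ClientKeyringSpec.from_json(ks.to_json()).entity   # 'client.admin'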

class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or '{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    For example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

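    # A sketch of the persisted layout, for orientation (field values are
    # hypothetical): each host gets one config-key entry, written by
    # save_host() and re-read by load() after a mgr failover.
    #
    #   host.node1 -> {
    #       'daemons': {'osd.3': {...}},
    #       'devices': [...],
    #       'networks_and_interfaces': {...},
    #       'last_client_files': {...},
    #       'scheduled_daemon_actions': {...},
    #       ...
    #   }
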
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str, Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        a = self.devices[host]
        if len(a) != len(b):
            return True
        aj = {d.path: d.to_json() for d in a}
        bj = {d.path: d.to_json() for d in b}
        if aj != bj:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
        self,
        host: str,
        dls: List[inventory.Device],
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
        self,
        host: str,
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Otherwise we would end up with
        hosts where daemons might be running, but which we have not yet
        detected.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the _no_schedule label.

        Useful for the agent, which needs this specific list rather than
        get_schedulable_hosts(), since the agent must be deployed even on
        hosts that have had no daemon refresh yet.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is that we should not touch the daemons on these hosts
        (since in theory the hosts are inaccessible, so we CAN'T touch
        them), but we still want to count the daemons that exist on these
        hosts toward the placement, so that daemons on these hosts aren't
        just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)

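# Illustrative precedence example (hypothetical `cache` and daemon name): a
# scheduled action is only replaced by one of equal or higher priority, so a
# pending 'redeploy' is not downgraded to a 'start'.
#
#     cache.schedule_daemon_action('node1', 'osd.3', 'redeploy')
#     cache.schedule_daemon_action('node1', 'osd.3', 'start')    # skipped
#     cache.get_scheduled_daemon_action('node1', 'osd.3')        # 'redeploy'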

class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str, Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)

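# For orientation (all field values hypothetical): save_agent() persists one
# JSON blob per host under the 'agent.' config-key prefix, mirroring the
# fields read back by load() above, e.g.
#
#   agent.node1 -> {
#       'agent_config_deps': {'deps': [...], 'last_config': '...'},
#       'agent_counter': 1,
#       'agent_keys': '...',
#       'agent_ports': 4721,
#       'agent_timestamp': '...',
#   }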

class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])
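

# Illustrative sketch (hypothetical `mgr` and daemon name): events are keyed
# by '<kind>:<subject>', deduplicated by message, and capped at five entries
# per subject by add().
#
#     events = EventStore(mgr)
#     events.for_daemon('osd.3', OrchestratorEvent.ERROR, 'failed to start')
#     events.for_daemon('osd.3', OrchestratorEvent.ERROR, 'failed to start')  # deduped
#     len(events.get_for_daemon('osd.3'))   # 1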