# ceph/src/pybind/mgr/cephadm/inventory.py

import datetime
import enum
from copy import copy
import ipaddress
import itertools
import json
import logging
import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip, SpecialHostLabels
from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'


class HostCacheStatus(enum.Enum):
    stray = 'stray'
    host = 'host'
    devices = 'devices'


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let it resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        self._all_known_names: Dict[str, List[str]] = {}
        logger.debug('Loaded inventory %s' % self._inventory)

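    # Illustrative walk-through of the address fix-up above (hypothetical
    # values, not part of the upstream file): an old entry
    # {'hostname': 'node2', 'addr': 'node2'} fails is_valid_ip(), so on a
    # multi-host cluster it is rewritten to e.g. '10.0.0.2' via resolve_ip(),
    # while on a single-host cluster the mgr's own bind IP is used instead.
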
    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory or host in itertools.chain.from_iterable(self._all_known_names.values())

    def _get_stored_name(self, host: str) -> str:
        self.assert_host(host)
        if host in self._inventory:
            return host
        for stored_name, all_names in self._all_known_names.items():
            if host in all_names:
                return stored_name
        return host

    def update_known_hostnames(self, hostname: str, shortname: str, fqdn: str) -> None:
        for hname in [hostname, shortname, fqdn]:
            # if we know the host by any of the names, store the full set of names
            # in order to be able to check against those names for matching a host
            if hname in self._inventory:
                self._all_known_names[hname] = [hostname, shortname, fqdn]
                return
        logger.debug(f'got hostname set from gather-facts for unknown host: {[hostname, shortname, fqdn]}')

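    # Example (hypothetical names, not from the upstream file): after
    # update_known_hostnames('node1.example.com', 'node1', 'node1.example.com')
    # runs for a host stored as 'node1.example.com', _all_known_names maps the
    # stored name to all three, so both `'node1' in inventory` and
    # _get_stored_name('node1') resolve to the stored 'node1.example.com'.
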
    def assert_host(self, host: str) -> None:
        if host not in self:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        host = self._get_stored_name(host)
        del self._inventory[host]
        self._all_known_names.pop(host, [])
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        host = self._get_stored_name(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        host = self._get_stored_name(host)
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        host = self._get_stored_name(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        hostname = self._get_stored_name(hostname)
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]
        self._needs_configuration: Dict[str, bool] = {}

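    # Sketch of the rank map layout noted above (hypothetical service and
    # daemon names, not from the upstream file): for a ranked service,
    #   self._rank_maps['nfs.foo'] == {0: {0: 'foo.0.0.host1.abcdef'},
    #                                  1: {0: None}}
    # would mean rank 1 / gen 0 currently has no daemon assigned, which is
    # why the innermost value is Optional[str].
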
    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)

                if (
                    (self.mgr.migration_current or 0) < 6
                    and j['spec'].get('service_type') == 'rgw'
                ):
                    queue_migrate_rgw_spec(self.mgr, j)

                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'needs_configuration' in j:
                    self._needs_configuration[service_name] = cast(bool, j['needs_configuration'])

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec
        self._needs_configuration[name] = True

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
        }
        if name in self.spec_created:
            data['created'] = datetime_to_str(self.spec_created[name])
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])
        if name in self._needs_configuration:
            data['needs_configuration'] = self._needs_configuration[name]

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

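    # For reference, _save() above persists under 'spec.<service_name>' a JSON
    # blob shaped roughly like (illustrative values only):
    #   {"spec": {...}, "created": "2023-01-01T00:00:00.000000Z",
    #    "rank_map": {...}, "deleted": "...", "needs_configuration": true}
    # with the optional keys present only when set.
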
    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            if service_name in self._needs_configuration:
                del self._needs_configuration[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())

    def set_unmanaged(self, service_name: str, value: bool) -> str:
        if service_name not in self._specs:
            return f'No service of name {service_name} found. Check "ceph orch ls" for all known services'
        if self._specs[service_name].unmanaged == value:
            return f'Service {service_name}{" already " if value else " not "}marked unmanaged. No action taken.'
        self._specs[service_name].unmanaged = value
        self.save(self._specs[service_name])
        return f'Set unmanaged to {str(value)} for service {service_name}'

    def needs_configuration(self, name: str) -> bool:
        return self._needs_configuration.get(name, False)

    def mark_needs_configuration(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = True
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as needing configuration')

    def mark_configured(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = False
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as having been configured')


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class TunedProfileStore():
    """
    Store for our tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: CephadmOrchestrator = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        return profile_name in self.profiles

    def save(self) -> None:
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        return [p for p in self.profiles.values()]


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading:
    for example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store them persistently in order to stay consistent across
    MGR failovers.
    """

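    # For orientation (illustrative only): save_host() below persists one JSON
    # blob per host under 'host.<hostname>' with keys such as 'daemons',
    # 'networks_and_interfaces', 'daemon_config_deps', 'last_client_files' and
    # the various 'last_*' timestamps; device inventories live in separate
    # 'host.<hostname>.devices.<n>' entries (see save_host_devices).
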
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self._tmp_daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.last_tuned_profile_update = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str, Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
                    continue
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                self.devices[host] = []
                # still want to check old device location for upgrade scenarios
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.devices[host] += self.load_host_devices(host)
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_tuned_profile_update' in j:
                    self.last_tuned_profile_update[host] = str_to_datetime(
                        j['last_tuned_profile_update'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
        # return whether a host cache entry in the config-key
        # store is for a host, a set of devices or is stray.
        # for a host, the entry name will match a hostname in our
        # inventory. For devices, it will be formatted
        # <hostname>.devices.<integer> where <hostname> is
        # in our inventory. If neither case applies, it is stray
        if host in self.mgr.inventory:
            return HostCacheStatus.host
        try:
            # try stripping off the ".devices.<integer>" and see if we get
            # a host name that matches our inventory
            actual_host = '.'.join(host.split('.')[:-2])
            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
        except Exception:
            return HostCacheStatus.stray

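    # Example of the naming scheme (hypothetical hosts): with 'node1' in the
    # inventory, an entry named 'node1' is HostCacheStatus.host,
    # 'node1.devices.0' is HostCacheStatus.devices, and 'ghost' (not in the
    # inventory) is HostCacheStatus.stray.
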
    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self._tmp_daemons.pop(host, {})
        self.last_daemon_update[host] = datetime_now()

    def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
        # for storing empty daemon descriptions representing daemons we have
        # just deployed but not yet had the chance to pick up in a daemon refresh
        # _tmp_daemons is cleared for a host upon receiving a real update of the
        # host's daemons
        if host not in self._tmp_daemons:
            self._tmp_daemons[host] = {}
        self._tmp_daemons[host][dd.name()] = dd

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        hostnames: List[str] = []
        for k in ['hostname', 'shortname', 'fqdn']:
            v = facts.get(k, '')
            hostnames.append(v if isinstance(v, str) else '')
        self.mgr.inventory.update_known_hostnames(hostnames[0], hostnames[1], hostnames[2])
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        old_devs = inventory.Devices(self.devices[host])
        new_devs = inventory.Devices(b)
        # relying on Devices class __eq__ function here
        if old_devs != new_devs:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
        self,
        host: str,
        dls: List[inventory.Device],
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
        self,
        host: str,
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        if host in self.devices:
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def save_host_devices(self, host: str) -> None:
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                if dev_cache_counter == 0:
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))

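    # Worked example of the split above (hypothetical numbers): if the devices
    # serialize to ~150 KB and mon_config_key_max_entry_size is 65536 bytes,
    # then cache_entries_needed = ceil(150000 / 65536) + 1 = 4; 100 devices are
    # cut into 4 sublists of 25, stored as host.<host>.devices.0 .. .devices.3,
    # with {'entries': 4} recorded on the first entry for load_host_devices().
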
    def load_host_devices(self, host: str) -> List[inventory.Device]:
        dev_cache_counter: int = 0
        devs: List[Dict[str, Any]] = []
        dev_entries: int = 0
        try:
            # number of entries for the host's devices should be in
            # the "entries" field of the first entry
            dev_entries = json.loads(self.mgr.get_store(
                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
        except Exception:
            logger.debug(f'No device entries found for host {host}')
        for i in range(dev_entries):
            try:
                new_devs = json.loads(self.mgr.get_store(
                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
                if len(new_devs) > 0:
                    # verify list contains actual device objects by trying to load one from json
                    inventory.Device.from_json(new_devs[0])
                    # if we didn't throw an Exception on above line, we can add the devices
                    devs = devs + new_devs
                    dev_cache_counter += 1
            except Exception as e:
                logger.error(('Hit exception trying to load devices from '
                              + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
                return []
        return [inventory.Device.from_json(d) for d in devs]

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.last_tuned_profile_update:
            del self.last_tuned_profile_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Otherwise we would end up with
        hosts where daemons might be running, but which we have not yet
        detected.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and SpecialHostLabels.DRAIN_DAEMONS not in h.labels
            )
        ]

    def get_conf_keyring_available_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts without the drain conf and keyrings
        label (SpecialHostLabels.DRAIN_CONF_KEYRING) that have
        had a refresh. That is equivalent to all hosts we
        consider eligible for deployment of conf and keyring files.

        Any host without that label is considered fair game for
        a client keyring spec to match. However, we want to still
        wait for refresh here so that we know what keyrings we've
        already deployed here.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and SpecialHostLabels.DRAIN_CONF_KEYRING not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the drain daemons label
        (SpecialHostLabels.DRAIN_DAEMONS).

        Useful for the agent, which needs this specific list rather than the
        schedulable_hosts since the agent needs to be deployed on hosts with
        no daemon refresh.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if SpecialHostLabels.DRAIN_DAEMONS not in h.labels
        ]

    def get_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that have the drain daemons label (SpecialHostLabels.DRAIN_DAEMONS)
        and therefore should have no daemons placed on them, but are potentially still reachable.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if SpecialHostLabels.DRAIN_DAEMONS in h.labels
        ]

    def get_conf_keyring_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that have the drain conf and keyrings label (SpecialHostLabels.DRAIN_CONF_KEYRING)
        and therefore should have no config files or client keyring placed on them, but are
        potentially still reachable.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if SpecialHostLabels.DRAIN_CONF_KEYRING in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible so we CAN'T touch them) but
        we still want to count daemons that exist on these hosts toward the
        placement so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

    def is_host_unreachable(self, hostname: str) -> bool:
        # take hostname and return if it matches the hostname of an unreachable host
        return hostname in [h.hostname for h in self.get_unreachable_hosts()]

    def is_host_schedulable(self, hostname: str) -> bool:
        # take hostname and return if it matches the hostname of a schedulable host
        return hostname in [h.hostname for h in self.get_schedulable_hosts()]

    def is_host_draining(self, hostname: str) -> bool:
        # take hostname and return if it matches the hostname of a draining host
        return hostname in [h.hostname for h in self.get_draining_hosts()]

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self._tmp_daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

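    # Note on the volatile view above: a daemon cached as running is reported
    # as stopped while its host is in maintenance, and as error while its host
    # is offline; the cached (persisted) state itself is left untouched, since
    # alter() works on a copy.
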
    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
        if service_spec.service_type == 'ingress':
            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
            return dds
        else:
            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
                if ingress_spec.backend_service == service_spec.service_name():
                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
                    logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
                    return dds
        return None

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        if host not in self.last_tuned_profile_update:
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        if self.last_tuned_profile_update[host] < last_profile_update:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and not self.is_host_unreachable(h))]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
            'rotate-key': 6,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action} of {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

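    # Example of the priority handling above (hypothetical daemon name): with
    # 'stop' (5) already scheduled for 'mgr.host1.abc', a later 'restart' (2)
    # is skipped because 5 > 2, while a later 'rotate-key' (6) replaces the
    # pending 'stop' because 5 > 6 is false.
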
    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str, Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

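    # Example of the dedup/limit behavior above (hypothetical events): adding
    # the same message twice for subject 'service:nfs.foo' stores it once, and
    # a sixth distinct message for that subject evicts the oldest, keeping the
    # newest five.
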
    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])