]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/inventory.py
import ceph quincy 17.2.6
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
CommitLineData
e306af50 1import datetime
2a845540 2import enum
e306af50 3from copy import copy
b3b6e05e 4import ipaddress
e306af50
TL
5import json
6import logging
2a845540 7import math
b3b6e05e 8import socket
f67539c2 9from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
b3b6e05e 10 NamedTuple, Type
e306af50
TL
11
12import orchestrator
13from ceph.deployment import inventory
2a845540 14from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
adb31ebb 15from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
f67539c2 16from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
20effc67 17from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
e306af50 18
b3b6e05e 19from .utils import resolve_ip
a4b75251 20from .migrations import queue_migrate_nfs_spec
b3b6e05e 21
e306af50
TL
22if TYPE_CHECKING:
23 from .module import CephadmOrchestrator
24
25
26logger = logging.getLogger(__name__)
27
28HOST_CACHE_PREFIX = "host."
29SPEC_STORE_PREFIX = "spec."
20effc67 30AGENT_CACHE_PREFIX = 'agent.'
e306af50
TL
31
32
2a845540
TL
class HostCacheStatus(enum.Enum):
    """Classification of a HostCache record found in the config-key store."""
    stray = 'stray'      # matches no host in the inventory
    host = 'host'        # per-host cache record for a known host
    devices = 'devices'  # split-out device-list record for a known host
38
class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load the persisted inventory from the mon config-key store
        raw = self.mgr.get_store('inventory')
        if raw:
            self._inventory: Dict[str, dict] = json.loads(raw)
            for name, info in self._inventory.items():
                # older clusters persisted HostSpecs without a 'hostname' key
                if 'hostname' not in info:
                    info['hostname'] = name

                # convert a legacy non-IP addr where we safely can
                if is_valid_ip(str(info.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if name == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, info.get('addr')))
                else:
                    # a single-node cluster has no peer to do the lookup
                    # for us; fall back to the IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {info['hostname']} addr '{info['addr']}' -> '{ip}'"
                    )
                    info['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return [h for h in self._inventory]

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError unless ``host`` is in the inventory."""
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add a host; for an existing host, merge in addr and labels."""
        if spec.hostname in self._inventory:
            # update the address if it changed (set_addr saves for us)
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # merge in any new labels (add_label saves for us)
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', list())
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', list())
        if label in labels:
            labels.remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        info = self._inventory.get(host)
        return info is not None and label in info.get('labels', [])

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Rehydrate a HostSpec from a stored inventory record."""
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return [self.spec_from_dict(info) for info in self._inventory.values()]

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory
                if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))
f67539c2
TL
class SpecDescription(NamedTuple):
    """A stored service spec together with its bookkeeping metadata."""
    spec: ServiceSpec
    # rank -> gen -> daemon_id (present only for ranked services)
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]
181
e306af50
TL
class SpecStore():
    """
    Persistent store for ServiceSpecs, keyed by service name.

    Specs, their created/deleted timestamps, and (for ranked services)
    rank maps are persisted in the config-key store under
    SPEC_STORE_PREFIX.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        # specs marked deleted are excluded
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        """Load all persisted specs (and rank maps) from the store."""
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    # nfs spec written before migration 3: queue a conversion
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                # BUGFIX: was `m is None`, which is never true
                                # inside this `isinstance(m, dict)` branch and
                                # silently dropped unfilled (None) ranks on load.
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        """Store a spec (preview-only specs are just cached in memory)."""
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        # persist spec + metadata as one JSON document
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        """Mark a spec deleted (previews are removed outright).

        Returns whether the spec existed.
        """
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        """Drop a spec and all its metadata from memory and the store."""
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())
f91f0fd5 329
b3b6e05e
TL
class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        # defaults: root-owned, owner-only permissions
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        # no validation currently performed
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        payload = data.copy()
        if 'placement' in payload:
            payload['placement'] = PlacementSpec.from_json(payload['placement'])
        spec = cls(**payload)
        spec.validate()
        return spec
374
class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        # NOTE: previously mgr was assigned twice; one annotated
        # assignment is sufficient.
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        """Load tracked keyring specs from the config-key store."""
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        """Persist all tracked keyring specs."""
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        """Add or replace a tracked keyring and persist."""
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        """Stop tracking a keyring; persists only if something was removed."""
        if entity in self.keys:
            del self.keys[entity]
            self.save()
407
2a845540
TL
class TunedProfileStore():
    """
    Store for our tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        # NOTE: previously mgr was assigned twice; once is enough.
        self.mgr: CephadmOrchestrator = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        """Load profiles from the config-key store, stamping them as
        freshly updated so they get re-applied after a mgr restart."""
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        return profile_name in self.profiles

    def save(self) -> None:
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        """Set one setting on an existing profile and persist."""
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        """Remove one setting from an existing profile and persist."""
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                # BUGFIX: message typo 'Attemped' -> 'Attempted'
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        # save() also runs when nothing was removed; persisting the
        # unchanged profile set is a harmless no-op
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        return [p for p in self.profiles.values()]
481
e306af50 482class HostCache():
adb31ebb
TL
483 """
484 HostCache stores different things:
485
486 1. `daemons`: Deployed daemons O(daemons)
487
488 They're part of the configuration nowadays and need to be
489 persistent. The name "daemon cache" is unfortunately a bit misleading.
490 Like for example we really need to know where daemons are deployed on
491 hosts that are offline.
492
493 2. `devices`: ceph-volume inventory cache O(hosts)
494
495 As soon as this is populated, it becomes more or less read-only.
496
497 3. `networks`: network interfaces for each host. O(hosts)
498
499 This is needed in order to deploy MONs. As this is mostly read-only.
500
b3b6e05e 501 4. `last_client_files` O(hosts)
adb31ebb 502
b3b6e05e
TL
503 Stores the last digest and owner/mode for files we've pushed to /etc/ceph
504 (ceph.conf or client keyrings).
adb31ebb
TL
505
506 5. `scheduled_daemon_actions`: O(daemons)
507
508 Used to run daemon actions after deploying a daemon. We need to
509 store it persistently, in order to stay consistent across
f67539c2 510 MGR failovers.
adb31ebb
TL
511 """
512
e306af50
TL
513 def __init__(self, mgr):
514 # type: (CephadmOrchestrator) -> None
515 self.mgr: CephadmOrchestrator = mgr
516 self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
517 self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
518 self.devices = {} # type: Dict[str, List[inventory.Device]]
adb31ebb
TL
519 self.facts = {} # type: Dict[str, Dict[str, Any]]
520 self.last_facts_update = {} # type: Dict[str, datetime.datetime]
b3b6e05e 521 self.last_autotune = {} # type: Dict[str, datetime.datetime]
e306af50 522 self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]]
f67539c2
TL
523 self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]]
524 self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]]
20effc67 525 self.last_network_update = {} # type: Dict[str, datetime.datetime]
e306af50 526 self.last_device_update = {} # type: Dict[str, datetime.datetime]
f67539c2 527 self.last_device_change = {} # type: Dict[str, datetime.datetime]
2a845540 528 self.last_tuned_profile_update = {} # type: Dict[str, datetime.datetime]
f91f0fd5
TL
529 self.daemon_refresh_queue = [] # type: List[str]
530 self.device_refresh_queue = [] # type: List[str]
20effc67 531 self.network_refresh_queue = [] # type: List[str]
f91f0fd5 532 self.osdspec_previews_refresh_queue = [] # type: List[str]
f6b5b4d7
TL
533
534 # host -> daemon name -> dict
e306af50
TL
535 self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
536 self.last_host_check = {} # type: Dict[str, datetime.datetime]
537 self.loading_osdspec_preview = set() # type: Set[str]
b3b6e05e 538 self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
f6b5b4d7 539 self.registry_login_queue: Set[str] = set()
e306af50 540
f91f0fd5
TL
541 self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
542
20effc67
TL
543 self.metadata_up_to_date = {} # type: Dict[str, bool]
544
e306af50
TL
545 def load(self):
546 # type: () -> None
f67539c2 547 for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
e306af50 548 host = k[len(HOST_CACHE_PREFIX):]
2a845540
TL
549 if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
550 if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
551 continue
e306af50
TL
552 self.mgr.log.warning('removing stray HostCache host record %s' % (
553 host))
554 self.mgr.set_store(k, None)
555 try:
556 j = json.loads(v)
557 if 'last_device_update' in j:
f91f0fd5 558 self.last_device_update[host] = str_to_datetime(j['last_device_update'])
e306af50
TL
559 else:
560 self.device_refresh_queue.append(host)
f67539c2
TL
561 if 'last_device_change' in j:
562 self.last_device_change[host] = str_to_datetime(j['last_device_change'])
e306af50
TL
563 # for services, we ignore the persisted last_*_update
564 # and always trigger a new scrape on mgr restart.
565 self.daemon_refresh_queue.append(host)
20effc67 566 self.network_refresh_queue.append(host)
e306af50
TL
567 self.daemons[host] = {}
568 self.osdspec_previews[host] = []
f67539c2 569 self.osdspec_last_applied[host] = {}
e306af50
TL
570 self.networks[host] = {}
571 self.daemon_config_deps[host] = {}
572 for name, d in j.get('daemons', {}).items():
573 self.daemons[host][name] = \
574 orchestrator.DaemonDescription.from_json(d)
2a845540
TL
575 self.devices[host] = []
576 # still want to check old device location for upgrade scenarios
e306af50
TL
577 for d in j.get('devices', []):
578 self.devices[host].append(inventory.Device.from_json(d))
2a845540 579 self.devices[host] += self.load_host_devices(host)
f67539c2 580 self.networks[host] = j.get('networks_and_interfaces', {})
e306af50 581 self.osdspec_previews[host] = j.get('osdspec_previews', {})
b3b6e05e 582 self.last_client_files[host] = j.get('last_client_files', {})
f67539c2
TL
583 for name, ts in j.get('osdspec_last_applied', {}).items():
584 self.osdspec_last_applied[host][name] = str_to_datetime(ts)
e306af50
TL
585
586 for name, d in j.get('daemon_config_deps', {}).items():
587 self.daemon_config_deps[host][name] = {
588 'deps': d.get('deps', []),
f91f0fd5 589 'last_config': str_to_datetime(d['last_config']),
e306af50
TL
590 }
591 if 'last_host_check' in j:
f91f0fd5 592 self.last_host_check[host] = str_to_datetime(j['last_host_check'])
2a845540
TL
593 if 'last_tuned_profile_update' in j:
594 self.last_tuned_profile_update[host] = str_to_datetime(
595 j['last_tuned_profile_update'])
f6b5b4d7 596 self.registry_login_queue.add(host)
f91f0fd5 597 self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
20effc67 598 self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
f91f0fd5 599
e306af50
TL
600 self.mgr.log.debug(
601 'HostCache.load: host %s has %d daemons, '
602 '%d devices, %d networks' % (
603 host, len(self.daemons[host]), len(self.devices[host]),
604 len(self.networks[host])))
605 except Exception as e:
606 self.mgr.log.warning('unable to load cached state for %s: %s' % (
607 host, e))
608 pass
609
2a845540
TL
610 def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
611 # return whether a host cache entry in the config-key
612 # store is for a host, a set of devices or is stray.
613 # for a host, the entry name will match a hostname in our
614 # inventory. For devices, it will be formatted
615 # <hostname>.devices.<integer> where <hostname> is
616 # in out inventory. If neither case applies, it is stray
617 if host in self.mgr.inventory:
618 return HostCacheStatus.host
619 try:
620 # try stripping off the ".devices.<integer>" and see if we get
621 # a host name that matches our inventory
622 actual_host = '.'.join(host.split('.')[:-2])
623 return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
624 except Exception:
625 return HostCacheStatus.stray
626
e306af50
TL
627 def update_host_daemons(self, host, dm):
628 # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
629 self.daemons[host] = dm
adb31ebb
TL
630 self.last_daemon_update[host] = datetime_now()
631
632 def update_host_facts(self, host, facts):
633 # type: (str, Dict[str, Dict[str, Any]]) -> None
634 self.facts[host] = facts
f67539c2
TL
635 self.last_facts_update[host] = datetime_now()
636
b3b6e05e
TL
637 def update_autotune(self, host: str) -> None:
638 self.last_autotune[host] = datetime_now()
639
640 def invalidate_autotune(self, host: str) -> None:
641 if host in self.last_autotune:
642 del self.last_autotune[host]
643
f67539c2 644 def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
39ae355f
TL
645 old_devs = inventory.Devices(self.devices[host])
646 new_devs = inventory.Devices(b)
647 # relying on Devices class __eq__ function here
648 if old_devs != new_devs:
f67539c2
TL
649 self.mgr.log.info("Detected new or changed devices on %s" % host)
650 return True
651 return False
e306af50 652
20effc67 653 def update_host_devices(
f67539c2
TL
654 self,
655 host: str,
656 dls: List[inventory.Device],
f67539c2
TL
657 ) -> None:
658 if (
659 host not in self.devices
660 or host not in self.last_device_change
661 or self.devices_changed(host, dls)
662 ):
663 self.last_device_change[host] = datetime_now()
664 self.last_device_update[host] = datetime_now()
e306af50 665 self.devices[host] = dls
20effc67
TL
666
667 def update_host_networks(
668 self,
669 host: str,
670 nets: Dict[str, Dict[str, List[str]]]
671 ) -> None:
e306af50 672 self.networks[host] = nets
20effc67 673 self.last_network_update[host] = datetime_now()
e306af50 674
adb31ebb 675 def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
e306af50
TL
676 self.daemon_config_deps[host][name] = {
677 'deps': deps,
678 'last_config': stamp,
679 }
680
681 def update_last_host_check(self, host):
682 # type: (str) -> None
adb31ebb 683 self.last_host_check[host] = datetime_now()
e306af50 684
f67539c2
TL
685 def update_osdspec_last_applied(self, host, service_name, ts):
686 # type: (str, str, datetime.datetime) -> None
687 self.osdspec_last_applied[host][service_name] = ts
688
b3b6e05e
TL
689 def update_client_file(self,
690 host: str,
691 path: str,
692 digest: str,
693 mode: int,
694 uid: int,
695 gid: int) -> None:
696 if host not in self.last_client_files:
697 self.last_client_files[host] = {}
698 self.last_client_files[host][path] = (digest, mode, uid, gid)
699
700 def removed_client_file(self, host: str, path: str) -> None:
701 if (
702 host in self.last_client_files
703 and path in self.last_client_files[host]
704 ):
705 del self.last_client_files[host][path]
706
e306af50
TL
707 def prime_empty_host(self, host):
708 # type: (str) -> None
709 """
710 Install an empty entry for a host
711 """
712 self.daemons[host] = {}
713 self.devices[host] = []
714 self.networks[host] = {}
715 self.osdspec_previews[host] = []
f67539c2 716 self.osdspec_last_applied[host] = {}
e306af50
TL
717 self.daemon_config_deps[host] = {}
718 self.daemon_refresh_queue.append(host)
719 self.device_refresh_queue.append(host)
20effc67 720 self.network_refresh_queue.append(host)
e306af50 721 self.osdspec_previews_refresh_queue.append(host)
f6b5b4d7 722 self.registry_login_queue.add(host)
b3b6e05e 723 self.last_client_files[host] = {}
e306af50 724
a4b75251
TL
725 def refresh_all_host_info(self, host):
726 # type: (str) -> None
727
728 self.last_host_check.pop(host, None)
729 self.daemon_refresh_queue.append(host)
730 self.registry_login_queue.add(host)
731 self.device_refresh_queue.append(host)
732 self.last_facts_update.pop(host, None)
733 self.osdspec_previews_refresh_queue.append(host)
734 self.last_autotune.pop(host, None)
735
e306af50
TL
736 def invalidate_host_daemons(self, host):
737 # type: (str) -> None
738 self.daemon_refresh_queue.append(host)
739 if host in self.last_daemon_update:
740 del self.last_daemon_update[host]
741 self.mgr.event.set()
742
743 def invalidate_host_devices(self, host):
744 # type: (str) -> None
745 self.device_refresh_queue.append(host)
746 if host in self.last_device_update:
747 del self.last_device_update[host]
748 self.mgr.event.set()
f91f0fd5 749
20effc67
TL
750 def invalidate_host_networks(self, host):
751 # type: (str) -> None
752 self.network_refresh_queue.append(host)
753 if host in self.last_network_update:
754 del self.last_network_update[host]
755 self.mgr.event.set()
756
adb31ebb 757 def distribute_new_registry_login_info(self) -> None:
f6b5b4d7 758 self.registry_login_queue = set(self.mgr.inventory.keys())
e306af50 759
f91f0fd5
TL
760 def save_host(self, host: str) -> None:
761 j: Dict[str, Any] = {
e306af50
TL
762 'daemons': {},
763 'devices': [],
764 'osdspec_previews': [],
f67539c2 765 'osdspec_last_applied': {},
e306af50
TL
766 'daemon_config_deps': {},
767 }
768 if host in self.last_daemon_update:
f91f0fd5 769 j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
e306af50 770 if host in self.last_device_update:
f91f0fd5 771 j['last_device_update'] = datetime_to_str(self.last_device_update[host])
20effc67
TL
772 if host in self.last_network_update:
773 j['last_network_update'] = datetime_to_str(self.last_network_update[host])
f67539c2
TL
774 if host in self.last_device_change:
775 j['last_device_change'] = datetime_to_str(self.last_device_change[host])
2a845540
TL
776 if host in self.last_tuned_profile_update:
777 j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
adb31ebb
TL
778 if host in self.daemons:
779 for name, dd in self.daemons[host].items():
780 j['daemons'][name] = dd.to_json()
adb31ebb 781 if host in self.networks:
f67539c2 782 j['networks_and_interfaces'] = self.networks[host]
adb31ebb
TL
783 if host in self.daemon_config_deps:
784 for name, depi in self.daemon_config_deps[host].items():
785 j['daemon_config_deps'][name] = {
786 'deps': depi.get('deps', []),
787 'last_config': datetime_to_str(depi['last_config']),
788 }
789 if host in self.osdspec_previews and self.osdspec_previews[host]:
e306af50 790 j['osdspec_previews'] = self.osdspec_previews[host]
f67539c2
TL
791 if host in self.osdspec_last_applied:
792 for name, ts in self.osdspec_last_applied[host].items():
793 j['osdspec_last_applied'][name] = datetime_to_str(ts)
e306af50
TL
794
795 if host in self.last_host_check:
f91f0fd5 796 j['last_host_check'] = datetime_to_str(self.last_host_check[host])
f6b5b4d7 797
b3b6e05e
TL
798 if host in self.last_client_files:
799 j['last_client_files'] = self.last_client_files[host]
adb31ebb 800 if host in self.scheduled_daemon_actions:
f91f0fd5 801 j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
20effc67
TL
802 if host in self.metadata_up_to_date:
803 j['metadata_up_to_date'] = self.metadata_up_to_date[host]
2a845540
TL
804 if host in self.devices:
805 self.save_host_devices(host)
f6b5b4d7 806
e306af50
TL
807 self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
808
2a845540
TL
809 def save_host_devices(self, host: str) -> None:
810 if host not in self.devices or not self.devices[host]:
811 logger.debug(f'Host {host} has no devices to save')
812 return
813
814 devs: List[Dict[str, Any]] = []
815 for d in self.devices[host]:
816 devs.append(d.to_json())
817
818 def byte_len(s: str) -> int:
819 return len(s.encode('utf-8'))
820
821 dev_cache_counter: int = 0
822 cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
823 if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
824 # no guarantee all device entries take up the same amount of space
825 # splitting it up so there's one more entry than we need should be fairly
826 # safe and save a lot of extra logic checking sizes
827 cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
828 dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
829 dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
830 for i in range(0, len(devs), dev_sublist_size)]
831 for dev_list in dev_lists:
832 dev_dict: Dict[str, Any] = {'devices': dev_list}
833 if dev_cache_counter == 0:
834 dev_dict.update({'entries': len(dev_lists)})
835 self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
836 + str(dev_cache_counter), json.dumps(dev_dict))
837 dev_cache_counter += 1
838 else:
839 self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
840 + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
841
842 def load_host_devices(self, host: str) -> List[inventory.Device]:
843 dev_cache_counter: int = 0
844 devs: List[Dict[str, Any]] = []
845 dev_entries: int = 0
846 try:
847 # number of entries for the host's devices should be in
848 # the "entries" field of the first entry
849 dev_entries = json.loads(self.mgr.get_store(
850 HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
851 except Exception:
852 logger.debug(f'No device entries found for host {host}')
853 for i in range(dev_entries):
854 try:
855 new_devs = json.loads(self.mgr.get_store(
856 HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
857 if len(new_devs) > 0:
858 # verify list contains actual device objects by trying to load one from json
859 inventory.Device.from_json(new_devs[0])
860 # if we didn't throw an Exception on above line, we can add the devices
861 devs = devs + new_devs
862 dev_cache_counter += 1
863 except Exception as e:
864 logger.error(('Hit exception trying to load devices from '
865 + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
866 return []
867 return [inventory.Device.from_json(d) for d in devs]
868
e306af50
TL
869 def rm_host(self, host):
870 # type: (str) -> None
871 if host in self.daemons:
872 del self.daemons[host]
873 if host in self.devices:
874 del self.devices[host]
adb31ebb
TL
875 if host in self.facts:
876 del self.facts[host]
877 if host in self.last_facts_update:
878 del self.last_facts_update[host]
b3b6e05e
TL
879 if host in self.last_autotune:
880 del self.last_autotune[host]
e306af50
TL
881 if host in self.osdspec_previews:
882 del self.osdspec_previews[host]
f67539c2
TL
883 if host in self.osdspec_last_applied:
884 del self.osdspec_last_applied[host]
e306af50
TL
885 if host in self.loading_osdspec_preview:
886 self.loading_osdspec_preview.remove(host)
887 if host in self.networks:
888 del self.networks[host]
889 if host in self.last_daemon_update:
890 del self.last_daemon_update[host]
891 if host in self.last_device_update:
892 del self.last_device_update[host]
20effc67
TL
893 if host in self.last_network_update:
894 del self.last_network_update[host]
f67539c2
TL
895 if host in self.last_device_change:
896 del self.last_device_change[host]
2a845540
TL
897 if host in self.last_tuned_profile_update:
898 del self.last_tuned_profile_update[host]
e306af50
TL
899 if host in self.daemon_config_deps:
900 del self.daemon_config_deps[host]
f91f0fd5
TL
901 if host in self.scheduled_daemon_actions:
902 del self.scheduled_daemon_actions[host]
b3b6e05e
TL
903 if host in self.last_client_files:
904 del self.last_client_files[host]
e306af50
TL
905 self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
906
907 def get_hosts(self):
908 # type: () -> List[str]
20effc67
TL
909 return list(self.daemons)
910
911 def get_schedulable_hosts(self) -> List[HostSpec]:
912 """
913 Returns all usable hosts that went through _refresh_host_daemons().
914
915 This mitigates a potential race, where new host was added *after*
916 ``_refresh_host_daemons()`` was called, but *before*
917 ``_apply_all_specs()`` was called. thus we end up with a hosts
918 where daemons might be running, but we have not yet detected them.
919 """
920 return [
921 h for h in self.mgr.inventory.all_specs()
922 if (
923 self.host_had_daemon_refresh(h.hostname)
924 and '_no_schedule' not in h.labels
925 )
926 ]
927
928 def get_non_draining_hosts(self) -> List[HostSpec]:
929 """
930 Returns all hosts that do not have _no_schedule label.
931
932 Useful for the agent who needs this specific list rather than the
933 schedulable_hosts since the agent needs to be deployed on hosts with
934 no daemon refresh
935 """
936 return [
937 h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
938 ]
939
2a845540
TL
940 def get_draining_hosts(self) -> List[HostSpec]:
941 """
942 Returns all hosts that have _no_schedule label and therefore should have
943 no daemons placed on them, but are potentially still reachable
944 """
945 return [
946 h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels
947 ]
948
20effc67
TL
949 def get_unreachable_hosts(self) -> List[HostSpec]:
950 """
951 Return all hosts that are offline or in maintenance mode.
952
953 The idea is we should not touch the daemons on these hosts (since
954 in theory the hosts are inaccessible so we CAN'T touch them) but
955 we still want to count daemons that exist on these hosts toward the
956 placement so daemons on these hosts aren't just moved elsewhere
957 """
958 return [
959 h for h in self.mgr.inventory.all_specs()
960 if (
961 h.status.lower() in ['maintenance', 'offline']
962 or h.hostname in self.mgr.offline_hosts
963 )
964 ]
e306af50 965
b3b6e05e
TL
966 def get_facts(self, host: str) -> Dict[str, Any]:
967 return self.facts.get(host, {})
968
20effc67
TL
969 def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
970 for dm in self.daemons.copy().values():
971 yield from dm.values()
972
e306af50
TL
973 def get_daemons(self):
974 # type: () -> List[orchestrator.DaemonDescription]
20effc67
TL
975 return list(self._get_daemons())
976
977 def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
e306af50 978 r = []
20effc67
TL
979 for dd in self._get_daemons():
980 if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
e306af50
TL
981 r.append(dd)
982 return r
983
b3b6e05e
TL
984 def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
985 return list(self.daemons.get(host, {}).values())
986
20effc67 987 def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
f67539c2 988 assert not daemon_name.startswith('ha-rgw.')
20effc67
TL
989 dds = self.get_daemons_by_host(host) if host else self._get_daemons()
990 for dd in dds:
991 if dd.name() == daemon_name:
992 return dd
993
f6b5b4d7
TL
994 raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
995
20effc67 996 def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
a4b75251 997 try:
20effc67 998 self.get_daemon(daemon_name, host)
a4b75251
TL
999 except orchestrator.OrchestratorError:
1000 return False
1001 return True
1002
e306af50 1003 def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
adb31ebb 1004 def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
f6b5b4d7 1005 dd = copy(dd_orig)
e306af50 1006 if host in self.mgr.offline_hosts:
f67539c2 1007 dd.status = orchestrator.DaemonDescriptionStatus.error
f6b5b4d7 1008 dd.status_desc = 'host is offline'
b3b6e05e
TL
1009 elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
1010 # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
1011 # could be wrong. We must assume maintenance is working and daemons are stopped
1012 dd.status = orchestrator.DaemonDescriptionStatus.stopped
f6b5b4d7
TL
1013 dd.events = self.mgr.events.get_for_daemon(dd.name())
1014 return dd
1015
20effc67 1016 for host, dm in self.daemons.copy().items():
f6b5b4d7 1017 yield host, {name: alter(host, d) for name, d in dm.items()}
e306af50
TL
1018
1019 def get_daemons_by_service(self, service_name):
1020 # type: (str) -> List[orchestrator.DaemonDescription]
f67539c2
TL
1021 assert not service_name.startswith('keepalived.')
1022 assert not service_name.startswith('haproxy.')
1023
20effc67 1024 return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
f6b5b4d7 1025
20effc67 1026 def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
f67539c2
TL
1027 assert service_type not in ['keepalived', 'haproxy']
1028
20effc67 1029 daemons = self.daemons[host].values() if host else self._get_daemons()
e306af50 1030
20effc67
TL
1031 return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
1032
1033 def get_daemon_types(self, hostname: str) -> Set[str]:
f67539c2 1034 """Provide a list of the types of daemons on the host"""
20effc67 1035 return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})
f67539c2 1036
e306af50
TL
1037 def get_daemon_names(self):
1038 # type: () -> List[str]
20effc67 1039 return [d.name() for d in self._get_daemons()]
e306af50 1040
adb31ebb 1041 def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
e306af50
TL
1042 if host in self.daemon_config_deps:
1043 if name in self.daemon_config_deps[host]:
1044 return self.daemon_config_deps[host][name].get('deps', []), \
1045 self.daemon_config_deps[host][name].get('last_config', None)
1046 return None, None
1047
b3b6e05e
TL
1048 def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
1049 return self.last_client_files.get(host, {})
1050
e306af50
TL
1051 def host_needs_daemon_refresh(self, host):
1052 # type: (str) -> bool
1053 if host in self.mgr.offline_hosts:
1054 logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
1055 return False
1056 if host in self.daemon_refresh_queue:
1057 self.daemon_refresh_queue.remove(host)
1058 return True
adb31ebb 1059 cutoff = datetime_now() - datetime.timedelta(
e306af50
TL
1060 seconds=self.mgr.daemon_cache_timeout)
1061 if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
1062 return True
20effc67
TL
1063 if not self.mgr.cache.host_metadata_up_to_date(host):
1064 return True
e306af50
TL
1065 return False
1066
adb31ebb
TL
1067 def host_needs_facts_refresh(self, host):
1068 # type: (str) -> bool
1069 if host in self.mgr.offline_hosts:
1070 logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
1071 return False
f67539c2 1072 cutoff = datetime_now() - datetime.timedelta(
adb31ebb
TL
1073 seconds=self.mgr.facts_cache_timeout)
1074 if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
1075 return True
20effc67
TL
1076 if not self.mgr.cache.host_metadata_up_to_date(host):
1077 return True
adb31ebb
TL
1078 return False
1079
b3b6e05e
TL
1080 def host_needs_autotune_memory(self, host):
1081 # type: (str) -> bool
1082 if host in self.mgr.offline_hosts:
1083 logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
1084 return False
1085 cutoff = datetime_now() - datetime.timedelta(
1086 seconds=self.mgr.autotune_interval)
1087 if host not in self.last_autotune or self.last_autotune[host] < cutoff:
1088 return True
1089 return False
1090
2a845540
TL
    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        """Return True if tuned profile *profile* should be (re)written to *host*.

        NOTE: has a side effect — if the profile has no last-updated timestamp
        yet, it is stamped with "now" before returning True.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        if host not in self.last_tuned_profile_update:
            # nothing was ever applied on this host
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            # profile never stamped: stamp it now and force an update
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        if self.last_tuned_profile_update[host] < last_profile_update:
            # profile changed since we last pushed it to this host
            return True
        return False
1108
f91f0fd5
TL
1109 def host_had_daemon_refresh(self, host: str) -> bool:
1110 """
1111 ... at least once.
1112 """
1113 if host in self.last_daemon_update:
1114 return True
1115 if host not in self.daemons:
1116 return False
1117 return bool(self.daemons[host])
1118
e306af50
TL
1119 def host_needs_device_refresh(self, host):
1120 # type: (str) -> bool
1121 if host in self.mgr.offline_hosts:
1122 logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
1123 return False
1124 if host in self.device_refresh_queue:
1125 self.device_refresh_queue.remove(host)
1126 return True
adb31ebb 1127 cutoff = datetime_now() - datetime.timedelta(
e306af50
TL
1128 seconds=self.mgr.device_cache_timeout)
1129 if host not in self.last_device_update or self.last_device_update[host] < cutoff:
1130 return True
20effc67
TL
1131 if not self.mgr.cache.host_metadata_up_to_date(host):
1132 return True
1133 return False
1134
1135 def host_needs_network_refresh(self, host):
1136 # type: (str) -> bool
1137 if host in self.mgr.offline_hosts:
1138 logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
1139 return False
1140 if host in self.network_refresh_queue:
1141 self.network_refresh_queue.remove(host)
1142 return True
1143 cutoff = datetime_now() - datetime.timedelta(
1144 seconds=self.mgr.device_cache_timeout)
1145 if host not in self.last_network_update or self.last_network_update[host] < cutoff:
1146 return True
1147 if not self.mgr.cache.host_metadata_up_to_date(host):
1148 return True
e306af50
TL
1149 return False
1150
adb31ebb 1151 def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
e306af50
TL
1152 if host in self.mgr.offline_hosts:
1153 logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
1154 return False
1155 if host in self.osdspec_previews_refresh_queue:
1156 self.osdspec_previews_refresh_queue.remove(host)
1157 return True
1158 # Since this is dependent on other factors (device and spec) this does not need
1159 # to be updated periodically.
1160 return False
1161
1162 def host_needs_check(self, host):
1163 # type: (str) -> bool
adb31ebb 1164 cutoff = datetime_now() - datetime.timedelta(
e306af50
TL
1165 seconds=self.mgr.host_check_interval)
1166 return host not in self.last_host_check or self.last_host_check[host] < cutoff
1167
f67539c2
TL
1168 def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
1169 if (
1170 host not in self.devices
1171 or host not in self.last_device_change
1172 or host not in self.last_device_update
1173 or host not in self.osdspec_last_applied
1174 or spec.service_name() not in self.osdspec_last_applied[host]
1175 ):
1176 return True
1177 created = self.mgr.spec_store.get_created(spec)
1178 if not created or created > self.last_device_change[host]:
1179 return True
1180 return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]
1181
f91f0fd5 1182 def host_needs_registry_login(self, host: str) -> bool:
f6b5b4d7
TL
1183 if host in self.mgr.offline_hosts:
1184 return False
1185 if host in self.registry_login_queue:
1186 self.registry_login_queue.remove(host)
1187 return True
1188 return False
1189
20effc67
TL
1190 def host_metadata_up_to_date(self, host: str) -> bool:
1191 if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
1192 return False
1193 return True
1194
1195 def all_host_metadata_up_to_date(self) -> bool:
1196 unreachables = [h.hostname for h in self.get_unreachable_hosts()]
1197 if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
1198 # this function is primarily for telling if it's safe to try and apply a service
1199 # spec. Since offline/maintenance hosts aren't considered in that process anyway
1200 # we don't want to return False if the host without up-to-date metadata is in one
1201 # of those two categories.
1202 return False
1203 return True
1204
e306af50
TL
1205 def add_daemon(self, host, dd):
1206 # type: (str, orchestrator.DaemonDescription) -> None
1207 assert host in self.daemons
1208 self.daemons[host][dd.name()] = dd
1209
adb31ebb 1210 def rm_daemon(self, host: str, name: str) -> None:
f67539c2
TL
1211 assert not name.startswith('ha-rgw.')
1212
e306af50
TL
1213 if host in self.daemons:
1214 if name in self.daemons[host]:
f6b5b4d7
TL
1215 del self.daemons[host][name]
1216
adb31ebb 1217 def daemon_cache_filled(self) -> bool:
f6b5b4d7
TL
1218 """
1219 i.e. we have checked the daemons for each hosts at least once.
1220 excluding offline hosts.
1221
1222 We're not checking for `host_needs_daemon_refresh`, as this might never be
1223 False for all hosts.
1224 """
f91f0fd5 1225 return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
f6b5b4d7
TL
1226 for h in self.get_hosts())
1227
adb31ebb 1228 def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
f67539c2
TL
1229 assert not daemon_name.startswith('ha-rgw.')
1230
f91f0fd5
TL
1231 priorities = {
1232 'start': 1,
1233 'restart': 2,
1234 'reconfig': 3,
1235 'redeploy': 4,
1236 'stop': 5,
39ae355f 1237 'rotate-key': 6,
f91f0fd5
TL
1238 }
1239 existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
1240 if existing_action and priorities[existing_action] > priorities[action]:
1241 logger.debug(
1242 f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
1243 return
1244
1245 if host not in self.scheduled_daemon_actions:
1246 self.scheduled_daemon_actions[host] = {}
1247 self.scheduled_daemon_actions[host][daemon_name] = action
1248
20effc67
TL
1249 def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
1250 found = False
f91f0fd5
TL
1251 if host in self.scheduled_daemon_actions:
1252 if daemon_name in self.scheduled_daemon_actions[host]:
1253 del self.scheduled_daemon_actions[host][daemon_name]
20effc67 1254 found = True
f91f0fd5
TL
1255 if not self.scheduled_daemon_actions[host]:
1256 del self.scheduled_daemon_actions[host]
20effc67 1257 return found
f91f0fd5 1258
adb31ebb 1259 def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
f67539c2
TL
1260 assert not daemon.startswith('ha-rgw.')
1261
f91f0fd5
TL
1262 return self.scheduled_daemon_actions.get(host, {}).get(daemon)
1263
f6b5b4d7 1264
20effc67
TL
class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> {'deps': [...], 'last_config': datetime}
        self.agent_config_deps = {}  # type: Dict[str, Dict[str,Any]]
        # host -> message counter used when talking to the agent
        self.agent_counter = {}  # type: Dict[str, int]
        # host -> time we last heard from / configured the agent
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        # host -> key the agent authenticates with
        self.agent_keys = {}  # type: Dict[str, str]
        # host -> port the agent listens on
        self.agent_ports = {}  # type: Dict[str, int]
        # host -> whether a message to the agent is currently in flight
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        """Reload persisted agent state from the key store (mgr startup/failover)."""
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
                # BUGFIX: previously we fell through and still loaded the
                # stray record into the in-memory caches; skip it instead.
                continue
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                # best-effort: a corrupt record shouldn't break mgr startup
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))

    def save_agent(self, host: str) -> None:
        """Persist this host's agent state to the key store."""
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the deps and config time of the agent on *host* (in memory only)."""
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return ``(deps, last_config_time)`` for *host*, or ``(None, None)``."""
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        """True while a message to the agent on *host* is in flight."""
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        """Mark the agent config for this daemon as delivered and persist it."""
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)
1352
1353
f6b5b4d7
TL
class EventStore():
    """In-memory store of recent OrchestratorEvents, keyed by 'kind:subject'."""

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """Record *event*, deduplicating by message per subject."""
        subject_events = self.events.setdefault(event.kind_subject(), [])
        # an identical message is already recorded for this subject
        for e in subject_events:
            if e.message == event.message:
                return

        subject_events.append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = subject_events[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Record an event against a service."""
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Record an ERROR event from an OrchestratorError, if it names a subject."""
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Record an event against a daemon."""
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an ERROR event for *daemon_name* from an arbitrary exception."""
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        """Drop events whose subject (service or daemon) no longer exists."""
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s in self.events:
            # BUGFIX: split only on the first ':' — the subject itself may
            # contain ':' and a plain split(':') would raise ValueError here
            kind, subject = k_s.split(':', 1)
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded for the named service (newest last)."""
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded for the named daemon (newest last)."""
        return self.events.get('daemon:' + name, [])