import datetime
import enum
from copy import copy
import ipaddress
import itertools
import json
import logging
import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'


class HostCacheStatus(enum.Enum):
    stray = 'stray'
    host = 'host'
    devices = 'devices'


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """
    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup.  use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        self._all_known_names: Dict[str, List[str]] = {}
        logger.debug('Loaded inventory %s' % self._inventory)
    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory or host in itertools.chain.from_iterable(self._all_known_names.values())

    def _get_stored_name(self, host: str) -> str:
        self.assert_host(host)
        if host in self._inventory:
            return host
        for stored_name, all_names in self._all_known_names.items():
            if host in all_names:
                return stored_name
        return host

    def update_known_hostnames(self, hostname: str, shortname: str, fqdn: str) -> None:
        for hname in [hostname, shortname, fqdn]:
            # if we know the host by any of the names, store the full set of names
            # in order to be able to check against those names for matching a host
            if hname in self._inventory:
                self._all_known_names[hname] = [hostname, shortname, fqdn]
                return
        logger.debug(f'got hostname set from gather-facts for unknown host: {[hostname, shortname, fqdn]}')

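    # Illustrative: after gather-facts for 'node1' with fqdn 'node1.example.com',
    # self._all_known_names == {'node1': ['node1', 'node1', 'node1.example.com']},
    # so lookups succeed under the short name or the fqdn alike.
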
    def assert_host(self, host: str) -> None:
        if host not in self:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        host = self._get_stored_name(host)
        del self._inventory[host]
        self._all_known_names.pop(host, [])
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        host = self._get_stored_name(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        host = self._get_stored_name(host)
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        host = self._get_stored_name(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        hostname = self._get_stored_name(hostname)
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
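        # e.g. (illustrative): {'nfs.foo': {0: {0: 'foo.0.0.node1.xyzabc'},
        #                                   1: {0: None}}}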
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]
        self._needs_configuration: Dict[str, bool] = {}

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        Returns both active and deleted specs, as a read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)

                if (
                    (self.mgr.migration_current or 0) < 6
                    and j['spec'].get('service_type') == 'rgw'
                ):
                    queue_migrate_rgw_spec(self.mgr, j)

                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'needs_configuration' in j:
                    self._needs_configuration[service_name] = cast(bool, j['needs_configuration'])

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

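    # Illustrative on-disk format for key 'spec.rgw.foo' (see _save() below):
    #   {"spec": {"service_type": "rgw", "service_id": "foo", ...},
    #    "created": "2023-01-01T00:00:00.000000Z",
    #    "needs_configuration": true}
    # with optional "deleted" and "rank_map" fields.
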
    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec
        self._needs_configuration[name] = True

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
        }
        if name in self.spec_created:
            data['created'] = datetime_to_str(self.spec_created[name])
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])
        if name in self._needs_configuration:
            data['needs_configuration'] = self._needs_configuration[name]

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            if service_name in self._needs_configuration:
                del self._needs_configuration[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())

    def set_unmanaged(self, service_name: str, value: bool) -> str:
        if service_name not in self._specs:
            return f'No service of name {service_name} found. Check "ceph orch ls" for all known services'
        if self._specs[service_name].unmanaged == value:
            return f'Service {service_name}{" already " if value else " not "}marked unmanaged. No action taken.'
        self._specs[service_name].unmanaged = value
        self.save(self._specs[service_name])
        return f'Set unmanaged to {str(value)} for service {service_name}'

    def needs_configuration(self, name: str) -> bool:
        return self._needs_configuration.get(name, False)

    def mark_needs_configuration(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = True
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as needing configuration')

    def mark_configured(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = False
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as having been configured')


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'
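    # e.g. entity 'client.admin' -> '/etc/ceph/ceph.client.admin.keyring'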

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or '{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()

class TunedProfileStore():
    """
    Store for our tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: CephadmOrchestrator = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        c = self.mgr.get_store('tuned_profiles') or '{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        return profile_name in self.profiles

    def save(self) -> None:
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        return [p for p in self.profiles.values()]


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

       They're part of the configuration nowadays and need to be
       persistent. The name "daemon cache" is unfortunately a bit misleading:
       for example, we really need to know where daemons are deployed on
       hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

       As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

       This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

       Stores the last digest and owner/mode for files we've pushed to /etc/ceph
       (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

       Used to run daemon actions after deploying a daemon. We need to
       store it persistently in order to stay consistent across
       MGR failovers.
    """
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self._tmp_daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.last_tuned_profile_update = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str, Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
                    continue
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                self.devices[host] = []
                # still want to check old device location for upgrade scenarios
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.devices[host] += self.load_host_devices(host)
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_tuned_profile_update' in j:
                    self.last_tuned_profile_update[host] = str_to_datetime(
                        j['last_tuned_profile_update'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
        # return whether a host cache entry in the config-key
        # store is for a host, a set of devices or is stray.
        # for a host, the entry name will match a hostname in our
        # inventory. For devices, it will be formatted
        # <hostname>.devices.<integer> where <hostname> is
        # in our inventory. If neither case applies, it is stray
        if host in self.mgr.inventory:
            return HostCacheStatus.host
        try:
            # try stripping off the ".devices.<integer>" and see if we get
            # a host name that matches our inventory
            actual_host = '.'.join(host.split('.')[:-2])
            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
        except Exception:
            return HostCacheStatus.stray
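    # Illustrative classification, assuming 'node1' is in the inventory:
    #   'node1'           -> HostCacheStatus.host
    #   'node1.devices.0' -> HostCacheStatus.devices ('.devices.0' stripped)
    #   'ghost'           -> HostCacheStatus.stray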

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self._tmp_daemons.pop(host, {})
        self.last_daemon_update[host] = datetime_now()

    def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None:
        # for storing empty daemon descriptions representing daemons we have
        # just deployed but not yet had the chance to pick up in a daemon refresh.
        # _tmp_daemons is cleared for a host upon receiving a real update of the
        # host's daemons
        if host not in self._tmp_daemons:
            self._tmp_daemons[host] = {}
        self._tmp_daemons[host][dd.name()] = dd

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        hostnames: List[str] = []
        for k in ['hostname', 'shortname', 'fqdn']:
            v = facts.get(k, '')
            hostnames.append(v if isinstance(v, str) else '')
        self.mgr.inventory.update_known_hostnames(hostnames[0], hostnames[1], hostnames[2])
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        old_devs = inventory.Devices(self.devices[host])
        new_devs = inventory.Devices(b)
        # relying on Devices class __eq__ function here
        if old_devs != new_devs:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
        self,
        host: str,
        dls: List[inventory.Device],
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
        self,
        host: str,
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        if host in self.devices:
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
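    # Illustrative persisted record (abridged) for key 'host.node1':
    #   {"daemons": {"mon.node1": {...}}, "daemon_config_deps": {...},
    #    "last_daemon_update": "...", "networks_and_interfaces": {...}, ...}
    # Note that devices are deliberately written to separate
    # 'host.<host>.devices.<n>' keys by save_host_devices() below.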

    def save_host_devices(self, host: str) -> None:
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                if dev_cache_counter == 0:
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
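    # Worked example (illustrative): with mon_config_key_max_entry_size = 65536
    # and a 150000-byte device dump, ceil(150000 / 65536) + 1 == 4 chunks are
    # written to 'host.<host>.devices.0' .. '.devices.3'; the first chunk also
    # carries {'entries': 4} so load_host_devices() knows how many to fetch.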

    def load_host_devices(self, host: str) -> List[inventory.Device]:
        dev_cache_counter: int = 0
        devs: List[Dict[str, Any]] = []
        dev_entries: int = 0
        try:
            # number of entries for the host's devices should be in
            # the "entries" field of the first entry
            dev_entries = json.loads(self.mgr.get_store(
                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
        except Exception:
            logger.debug(f'No device entries found for host {host}')
        for i in range(dev_entries):
            try:
                new_devs = json.loads(self.mgr.get_store(
                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
                if len(new_devs) > 0:
                    # verify list contains actual device objects by trying to load one from json
                    inventory.Device.from_json(new_devs[0])
                    # if we didn't throw an Exception on above line, we can add the devices
                    devs = devs + new_devs
                    dev_cache_counter += 1
            except Exception as e:
                logger.error(('Hit exception trying to load devices from '
                              + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
                return []
        return [inventory.Device.from_json(d) for d in devs]

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.last_tuned_profile_update:
            del self.last_tuned_profile_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Otherwise we could end up with
        hosts where daemons might be running, but which we have not yet
        detected.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the _no_schedule label.

        Useful for the agent, which needs this specific list rather than the
        schedulable hosts, since the agent must be deployed on hosts that
        have not yet had a daemon refresh.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that have the _no_schedule label and therefore
        should have no daemons placed on them, but are potentially still
        reachable.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible, so we CAN'T touch them), but
        we still want to count daemons that exist on these hosts toward the
        placement, so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self._tmp_daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
        if service_spec.service_type == 'ingress':
            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
            return dds
        else:
            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
                if ingress_spec.backend_service == service_spec.service_name():
                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
                    logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
                    return dds
        return None

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

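    # The host_needs_*_refresh checks below share a pattern: skip offline
    # hosts, honor an explicit per-host refresh queue, then fall back to a
    # cache timeout (and, where applicable, stale host metadata).
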
    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        if host not in self.last_tuned_profile_update:
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        if self.last_tuned_profile_update[host] < last_profile_update:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]
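    # i.e. an OSD spec is (re)applied when we have no record of applying it on
    # the host, when the spec itself is newer than the last recorded device
    # change, or when devices changed after the last apply.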

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway,
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
            'rotate-key': 6,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action
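    # Illustrative: with 'redeploy' (priority 4) already scheduled for a daemon,
    # a later 'restart' (2) request is dropped, while a 'stop' (5) request
    # replaces it.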

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str, Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])