]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/inventory.py
04385a4fa81cc8e31b420133fcc487bb7b27ecbe
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
1 import datetime
2 import enum
3 from copy import copy
4 import ipaddress
5 import json
6 import logging
7 import math
8 import socket
9 from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
10 NamedTuple, Type
11
12 import orchestrator
13 from ceph.deployment import inventory
14 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
15 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
16 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
17 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
18
19 from .utils import resolve_ip
20 from .migrations import queue_migrate_nfs_spec
21
22 if TYPE_CHECKING:
23 from .module import CephadmOrchestrator
24
25
26 logger = logging.getLogger(__name__)
27
28 HOST_CACHE_PREFIX = "host."
29 SPEC_STORE_PREFIX = "spec."
30 AGENT_CACHE_PREFIX = 'agent.'
31
32
class HostCacheStatus(enum.Enum):
    """Classification of a HostCache entry in the config-key store.

    ``host``    -- entry name matches a host in the inventory
    ``devices`` -- a ``<hostname>.devices.<n>`` device-cache entry
    ``stray``   -- neither; see HostCache._get_host_cache_entry_status()
    """
    stray = 'stray'
    host = 'host'
    devices = 'devices'
38
class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.

    Backed by the single 'inventory' key in the mgr's config-key store;
    every mutator calls save() so the store is always current.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            # True if ip parses as an IPv4/IPv6 address
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                # only adopt the resolved address if it is real (not loopback)
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        """Return the names of all known hosts."""
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError if *host* is not in the inventory."""
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add a host, or merge addr/labels into an existing entry."""
        if spec.hostname in self._inventory:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        """Remove *host* from the inventory; raises if unknown."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Set the management address for *host* and persist."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Add *label* to *host* (no-op if already present) and persist."""
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Remove *label* from *host* (no-op if absent) and persist."""
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        """True if *host* exists and carries *label*."""
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        """Return the stored addr for *host*, falling back to the hostname."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Build a HostSpec from a stored inventory entry."""
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            # the mgr's live offline view overrides the stored status
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        """Return a HostSpec for every host in the inventory."""
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        """Persist the whole inventory to the mon config-key store."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
173
174
class SpecDescription(NamedTuple):
    """A stored service spec plus its bookkeeping data (see SpecStore)."""
    # the stored service spec
    spec: ServiceSpec
    # rank -> gen -> daemon_id, for services that use ranks
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    # when the spec was (last) created
    created: datetime.datetime
    # when the spec was marked deleted, if it has been
    deleted: Optional[datetime.datetime]
180
181
class SpecStore():
    """Persist ServiceSpecs (plus rank maps and timestamps).

    Each spec is stored under ``spec.<service_name>`` in the mgr
    config-key store, together with its creation time, optional deletion
    time and optional rank map.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        # preview-only specs live here and are never persisted
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        """Return the SpecDescription for *name*; raises OrchestratorError if unknown."""
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        """Specs that have not been marked deleted."""
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        """Load every stored spec, queuing legacy nfs specs for migration."""
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    # JSON object keys are strings; convert ranks and gens
                    # back to ints, skipping anything malformed
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                # a gen may map to a daemon id or to null
                                # (no daemon currently holds that rank/gen);
                                # accept both.  (was: 'or m is None', which
                                # could never be true here since m is a dict
                                # and silently dropped null entries)
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        """Store *spec* and persist it, unless it is preview-only.

        ``update_create=False`` keeps the existing creation timestamp
        (used when re-saving, e.g. to mark a spec deleted).
        """
        name = spec.service_name()
        if spec.preview_only:
            # preview specs are kept in memory only
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        """Store and persist the rank map for service *name*."""
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        # serialize one spec (plus timestamps/rank map) into the store and
        # emit an informational event for the service
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        """Mark *service_name* deleted (preview specs are dropped outright).

        Returns True if the spec existed.
        """
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        """Remove every trace of *service_name*, in memory and in the store."""
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        """Return when *spec* was created, or None if unknown."""
        return self.spec_created.get(spec.service_name())
328
329
class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        # default to a root-owned keyring readable/writable only by its owner
        self.mode = mode if mode else 0o600
        self.uid = uid if uid else 0
        self.gid = gid if gid else 0

    def validate(self) -> None:
        # nothing to validate (yet); hook invoked by from_json()
        pass

    def to_json(self) -> Dict[str, Any]:
        """Serialize to a JSON-compatible dict (placement included)."""
        return dict(
            entity=self.entity,
            placement=self.placement.to_json(),
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
        )

    @property
    def path(self) -> str:
        """Path of the keyring file on the target host."""
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        """Build a spec from its to_json() representation."""
        kwargs = data.copy()
        if 'placement' in kwargs:
            kwargs['placement'] = PlacementSpec.from_json(kwargs['placement'])
        spec = cls(**kwargs)
        spec.validate()
        return spec
373
374
class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain

    The whole map is persisted as one JSON object under the
    'client_keyrings' config-key.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        # (was assigned twice; the duplicate plain assignment is removed)
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        """Load the keyring map from the config-key store."""
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        """Persist the whole keyring map to the config-key store."""
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        """Add or replace a keyring spec and persist immediately."""
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        """Remove *entity*'s keyring spec if present, then persist."""
        if entity in self.keys:
            del self.keys[entity]
            self.save()
406
407
class TunedProfileStore():
    """
    Store for our tuned profile information

    Profiles are persisted as one JSON object under the 'tuned_profiles'
    config-key; ``_last_updated`` stamps are tracked in memory.
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        # (was assigned twice; the duplicate plain assignment is removed)
        self.mgr: CephadmOrchestrator = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        """Load profiles from the store, stamping each as just updated."""
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        """Method-form equivalent of ``profile_name in self``."""
        return profile_name in self.profiles

    def save(self) -> None:
        """Persist all profiles to the config-key store."""
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        """Set *setting* on an existing profile and persist; log otherwise."""
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        """Remove *setting* from an existing profile and persist; log otherwise."""
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                # typo fix: was "Attemped"
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        """Add or replace a profile and persist."""
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        """Remove *profile* if present (logging otherwise) and persist."""
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        # we persist even when nothing was removed, matching prior behavior
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        """Return the profile's last-update time, or None if unknown."""
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        """Overwrite the last-update stamp for an existing profile (memory only)."""
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        """Return all stored profiles as a list."""
        return [p for p in self.profiles.values()]
480
481
482 class HostCache():
483 """
484 HostCache stores different things:
485
486 1. `daemons`: Deployed daemons O(daemons)
487
488 They're part of the configuration nowadays and need to be
489 persistent. The name "daemon cache" is unfortunately a bit misleading.
490 Like for example we really need to know where daemons are deployed on
491 hosts that are offline.
492
493 2. `devices`: ceph-volume inventory cache O(hosts)
494
495 As soon as this is populated, it becomes more or less read-only.
496
497 3. `networks`: network interfaces for each host. O(hosts)
498
499 This is needed in order to deploy MONs. As this is mostly read-only.
500
501 4. `last_client_files` O(hosts)
502
503 Stores the last digest and owner/mode for files we've pushed to /etc/ceph
504 (ceph.conf or client keyrings).
505
506 5. `scheduled_daemon_actions`: O(daemons)
507
508 Used to run daemon actions after deploying a daemon. We need to
509 store it persistently, in order to stay consistent across
510 MGR failovers.
511 """
512
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # per-host cached state, all keyed by hostname
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        # timestamps of the last refresh of each kind of per-host info
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.last_tuned_profile_update = {}  # type: Dict[str, datetime.datetime]
        # hosts queued for a (re-)scrape of the corresponding info
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        # host -> path -> (digest, mode, uid, gid) of client files we pushed
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        # host -> daemon name -> action; persisted so actions survive failover
        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]
544
    def load(self):
        # type: () -> None
        """Load all per-host cache entries from the mgr config-key store."""
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
                    # device entries are loaded separately via load_host_devices()
                    continue
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # NOTE(review): stray records are removed from the store but
                # still parsed below — confirm a `continue` isn't wanted here
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    # no stamp -> force a device re-scrape
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                self.devices[host] = []
                # still want to check old device location for upgrade scenarios
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.devices[host] += self.load_host_devices(host)
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_tuned_profile_update' in j:
                    self.last_tuned_profile_update[host] = str_to_datetime(
                        j['last_tuned_profile_update'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                # a corrupt entry for one host must not prevent loading the rest
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass
609
610 def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
611 # return whether a host cache entry in the config-key
612 # store is for a host, a set of devices or is stray.
613 # for a host, the entry name will match a hostname in our
614 # inventory. For devices, it will be formatted
615 # <hostname>.devices.<integer> where <hostname> is
616 # in out inventory. If neither case applies, it is stray
617 if host in self.mgr.inventory:
618 return HostCacheStatus.host
619 try:
620 # try stripping off the ".devices.<integer>" and see if we get
621 # a host name that matches our inventory
622 actual_host = '.'.join(host.split('.')[:-2])
623 return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
624 except Exception:
625 return HostCacheStatus.stray
626
627 def update_host_daemons(self, host, dm):
628 # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
629 self.daemons[host] = dm
630 self.last_daemon_update[host] = datetime_now()
631
632 def update_host_facts(self, host, facts):
633 # type: (str, Dict[str, Dict[str, Any]]) -> None
634 self.facts[host] = facts
635 self.last_facts_update[host] = datetime_now()
636
    def update_autotune(self, host: str) -> None:
        """Stamp the last-autotune time for *host*."""
        self.last_autotune[host] = datetime_now()
639
640 def invalidate_autotune(self, host: str) -> None:
641 if host in self.last_autotune:
642 del self.last_autotune[host]
643
644 def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
645 a = self.devices[host]
646 if len(a) != len(b):
647 return True
648 aj = {d.path: d.to_json() for d in a}
649 bj = {d.path: d.to_json() for d in b}
650 if aj != bj:
651 self.mgr.log.info("Detected new or changed devices on %s" % host)
652 return True
653 return False
654
655 def update_host_devices(
656 self,
657 host: str,
658 dls: List[inventory.Device],
659 ) -> None:
660 if (
661 host not in self.devices
662 or host not in self.last_device_change
663 or self.devices_changed(host, dls)
664 ):
665 self.last_device_change[host] = datetime_now()
666 self.last_device_update[host] = datetime_now()
667 self.devices[host] = dls
668
669 def update_host_networks(
670 self,
671 host: str,
672 nets: Dict[str, Dict[str, List[str]]]
673 ) -> None:
674 self.networks[host] = nets
675 self.last_network_update[host] = datetime_now()
676
677 def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
678 self.daemon_config_deps[host][name] = {
679 'deps': deps,
680 'last_config': stamp,
681 }
682
    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp the time of the last host check for *host*."""
        self.last_host_check[host] = datetime_now()
686
    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        """Record when an OSD spec was last applied on *host*."""
        self.osdspec_last_applied[host][service_name] = ts
690
691 def update_client_file(self,
692 host: str,
693 path: str,
694 digest: str,
695 mode: int,
696 uid: int,
697 gid: int) -> None:
698 if host not in self.last_client_files:
699 self.last_client_files[host] = {}
700 self.last_client_files[host][path] = (digest, mode, uid, gid)
701
702 def removed_client_file(self, host: str, path: str) -> None:
703 if (
704 host in self.last_client_files
705 and path in self.last_client_files[host]
706 ):
707 del self.last_client_files[host][path]
708
709 def prime_empty_host(self, host):
710 # type: (str) -> None
711 """
712 Install an empty entry for a host
713 """
714 self.daemons[host] = {}
715 self.devices[host] = []
716 self.networks[host] = {}
717 self.osdspec_previews[host] = []
718 self.osdspec_last_applied[host] = {}
719 self.daemon_config_deps[host] = {}
720 self.daemon_refresh_queue.append(host)
721 self.device_refresh_queue.append(host)
722 self.network_refresh_queue.append(host)
723 self.osdspec_previews_refresh_queue.append(host)
724 self.registry_login_queue.add(host)
725 self.last_client_files[host] = {}
726
727 def refresh_all_host_info(self, host):
728 # type: (str) -> None
729
730 self.last_host_check.pop(host, None)
731 self.daemon_refresh_queue.append(host)
732 self.registry_login_queue.add(host)
733 self.device_refresh_queue.append(host)
734 self.last_facts_update.pop(host, None)
735 self.osdspec_previews_refresh_queue.append(host)
736 self.last_autotune.pop(host, None)
737
738 def invalidate_host_daemons(self, host):
739 # type: (str) -> None
740 self.daemon_refresh_queue.append(host)
741 if host in self.last_daemon_update:
742 del self.last_daemon_update[host]
743 self.mgr.event.set()
744
745 def invalidate_host_devices(self, host):
746 # type: (str) -> None
747 self.device_refresh_queue.append(host)
748 if host in self.last_device_update:
749 del self.last_device_update[host]
750 self.mgr.event.set()
751
752 def invalidate_host_networks(self, host):
753 # type: (str) -> None
754 self.network_refresh_queue.append(host)
755 if host in self.last_network_update:
756 del self.last_network_update[host]
757 self.mgr.event.set()
758
759 def distribute_new_registry_login_info(self) -> None:
760 self.registry_login_queue = set(self.mgr.inventory.keys())
761
    def save_host(self, host: str) -> None:
        """Serialize all cached state for *host* into its config-key entry.

        Devices are persisted separately via save_host_devices(); the
        'devices' key here stays empty.
        """
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        # timestamps are only written if we actually have them
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        if host in self.devices:
            # devices go into their own '<host>.devices.<n>' entries
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
810
    def save_host_devices(self, host: str) -> None:
        """Persist the device list for *host* to the config-key store.

        If the serialized list approaches the mon's
        ``mon_config_key_max_entry_size`` limit it is split across several
        ``<host>.devices.<n>`` entries; the first entry carries an
        ``entries`` count consumed by load_host_devices().
        """
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        # NOTE(review): assumes the option value comes back as an int — confirm
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                if dev_cache_counter == 0:
                    # total entry count is recorded in the first entry only
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
        # NOTE(review): '.devices.<n>' entries left over from a previous,
        # larger save are not deleted here — confirm stale entries are
        # handled elsewhere
843
844 def load_host_devices(self, host: str) -> List[inventory.Device]:
845 dev_cache_counter: int = 0
846 devs: List[Dict[str, Any]] = []
847 dev_entries: int = 0
848 try:
849 # number of entries for the host's devices should be in
850 # the "entries" field of the first entry
851 dev_entries = json.loads(self.mgr.get_store(
852 HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
853 except Exception:
854 logger.debug(f'No device entries found for host {host}')
855 for i in range(dev_entries):
856 try:
857 new_devs = json.loads(self.mgr.get_store(
858 HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
859 if len(new_devs) > 0:
860 # verify list contains actual device objects by trying to load one from json
861 inventory.Device.from_json(new_devs[0])
862 # if we didn't throw an Exception on above line, we can add the devices
863 devs = devs + new_devs
864 dev_cache_counter += 1
865 except Exception as e:
866 logger.error(('Hit exception trying to load devices from '
867 + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
868 return []
869 return [inventory.Device.from_json(d) for d in devs]
870
871 def rm_host(self, host):
872 # type: (str) -> None
873 if host in self.daemons:
874 del self.daemons[host]
875 if host in self.devices:
876 del self.devices[host]
877 if host in self.facts:
878 del self.facts[host]
879 if host in self.last_facts_update:
880 del self.last_facts_update[host]
881 if host in self.last_autotune:
882 del self.last_autotune[host]
883 if host in self.osdspec_previews:
884 del self.osdspec_previews[host]
885 if host in self.osdspec_last_applied:
886 del self.osdspec_last_applied[host]
887 if host in self.loading_osdspec_preview:
888 self.loading_osdspec_preview.remove(host)
889 if host in self.networks:
890 del self.networks[host]
891 if host in self.last_daemon_update:
892 del self.last_daemon_update[host]
893 if host in self.last_device_update:
894 del self.last_device_update[host]
895 if host in self.last_network_update:
896 del self.last_network_update[host]
897 if host in self.last_device_change:
898 del self.last_device_change[host]
899 if host in self.last_tuned_profile_update:
900 del self.last_tuned_profile_update[host]
901 if host in self.daemon_config_deps:
902 del self.daemon_config_deps[host]
903 if host in self.scheduled_daemon_actions:
904 del self.scheduled_daemon_actions[host]
905 if host in self.last_client_files:
906 del self.last_client_files[host]
907 self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
908
909 def get_hosts(self):
910 # type: () -> List[str]
911 return list(self.daemons)
912
913 def get_schedulable_hosts(self) -> List[HostSpec]:
914 """
915 Returns all usable hosts that went through _refresh_host_daemons().
916
917 This mitigates a potential race, where new host was added *after*
918 ``_refresh_host_daemons()`` was called, but *before*
919 ``_apply_all_specs()`` was called. thus we end up with a hosts
920 where daemons might be running, but we have not yet detected them.
921 """
922 return [
923 h for h in self.mgr.inventory.all_specs()
924 if (
925 self.host_had_daemon_refresh(h.hostname)
926 and '_no_schedule' not in h.labels
927 )
928 ]
929
930 def get_non_draining_hosts(self) -> List[HostSpec]:
931 """
932 Returns all hosts that do not have _no_schedule label.
933
934 Useful for the agent who needs this specific list rather than the
935 schedulable_hosts since the agent needs to be deployed on hosts with
936 no daemon refresh
937 """
938 return [
939 h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
940 ]
941
942 def get_draining_hosts(self) -> List[HostSpec]:
943 """
944 Returns all hosts that have _no_schedule label and therefore should have
945 no daemons placed on them, but are potentially still reachable
946 """
947 return [
948 h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels
949 ]
950
951 def get_unreachable_hosts(self) -> List[HostSpec]:
952 """
953 Return all hosts that are offline or in maintenance mode.
954
955 The idea is we should not touch the daemons on these hosts (since
956 in theory the hosts are inaccessible so we CAN'T touch them) but
957 we still want to count daemons that exist on these hosts toward the
958 placement so daemons on these hosts aren't just moved elsewhere
959 """
960 return [
961 h for h in self.mgr.inventory.all_specs()
962 if (
963 h.status.lower() in ['maintenance', 'offline']
964 or h.hostname in self.mgr.offline_hosts
965 )
966 ]
967
968 def get_facts(self, host: str) -> Dict[str, Any]:
969 return self.facts.get(host, {})
970
971 def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
972 for dm in self.daemons.copy().values():
973 yield from dm.values()
974
975 def get_daemons(self):
976 # type: () -> List[orchestrator.DaemonDescription]
977 return list(self._get_daemons())
978
979 def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
980 r = []
981 for dd in self._get_daemons():
982 if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
983 r.append(dd)
984 return r
985
986 def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
987 return list(self.daemons.get(host, {}).values())
988
989 def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
990 assert not daemon_name.startswith('ha-rgw.')
991 dds = self.get_daemons_by_host(host) if host else self._get_daemons()
992 for dd in dds:
993 if dd.name() == daemon_name:
994 return dd
995
996 raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
997
998 def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
999 try:
1000 self.get_daemon(daemon_name, host)
1001 except orchestrator.OrchestratorError:
1002 return False
1003 return True
1004
    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """Yield ``(host, {daemon_name: DaemonDescription})`` pairs, with statuses
        adjusted for host reachability.

        Each description is copied before adjustment so the cached entries are
        never mutated; per-daemon events are attached to every copy.
        """
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            # work on a copy: the cached DaemonDescription must stay untouched
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        # iterate a snapshot so the cache may change while the caller consumes us
        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}
1020
1021 def get_daemons_by_service(self, service_name):
1022 # type: (str) -> List[orchestrator.DaemonDescription]
1023 assert not service_name.startswith('keepalived.')
1024 assert not service_name.startswith('haproxy.')
1025
1026 return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
1027
1028 def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
1029 assert service_type not in ['keepalived', 'haproxy']
1030
1031 daemons = self.daemons[host].values() if host else self._get_daemons()
1032
1033 return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
1034
1035 def get_daemon_types(self, hostname: str) -> Set[str]:
1036 """Provide a list of the types of daemons on the host"""
1037 return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})
1038
1039 def get_daemon_names(self):
1040 # type: () -> List[str]
1041 return [d.name() for d in self._get_daemons()]
1042
1043 def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
1044 if host in self.daemon_config_deps:
1045 if name in self.daemon_config_deps[host]:
1046 return self.daemon_config_deps[host][name].get('deps', []), \
1047 self.daemon_config_deps[host][name].get('last_config', None)
1048 return None, None
1049
1050 def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
1051 return self.last_client_files.get(host, {})
1052
1053 def host_needs_daemon_refresh(self, host):
1054 # type: (str) -> bool
1055 if host in self.mgr.offline_hosts:
1056 logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
1057 return False
1058 if host in self.daemon_refresh_queue:
1059 self.daemon_refresh_queue.remove(host)
1060 return True
1061 cutoff = datetime_now() - datetime.timedelta(
1062 seconds=self.mgr.daemon_cache_timeout)
1063 if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
1064 return True
1065 if not self.mgr.cache.host_metadata_up_to_date(host):
1066 return True
1067 return False
1068
1069 def host_needs_facts_refresh(self, host):
1070 # type: (str) -> bool
1071 if host in self.mgr.offline_hosts:
1072 logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
1073 return False
1074 cutoff = datetime_now() - datetime.timedelta(
1075 seconds=self.mgr.facts_cache_timeout)
1076 if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
1077 return True
1078 if not self.mgr.cache.host_metadata_up_to_date(host):
1079 return True
1080 return False
1081
1082 def host_needs_autotune_memory(self, host):
1083 # type: (str) -> bool
1084 if host in self.mgr.offline_hosts:
1085 logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
1086 return False
1087 cutoff = datetime_now() - datetime.timedelta(
1088 seconds=self.mgr.autotune_interval)
1089 if host not in self.last_autotune or self.last_autotune[host] < cutoff:
1090 return True
1091 return False
1092
    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        """Return True if tuned *profile* should be (re)applied on *host*."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        # never applied anything on this host yet
        if host not in self.last_tuned_profile_update:
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            # no recorded update time for the profile: stamp it now so future
            # comparisons work, and force one application round
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        # profile changed after the host last saw it
        if self.last_tuned_profile_update[host] < last_profile_update:
            return True
        return False
1110
1111 def host_had_daemon_refresh(self, host: str) -> bool:
1112 """
1113 ... at least once.
1114 """
1115 if host in self.last_daemon_update:
1116 return True
1117 if host not in self.daemons:
1118 return False
1119 return bool(self.daemons[host])
1120
1121 def host_needs_device_refresh(self, host):
1122 # type: (str) -> bool
1123 if host in self.mgr.offline_hosts:
1124 logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
1125 return False
1126 if host in self.device_refresh_queue:
1127 self.device_refresh_queue.remove(host)
1128 return True
1129 cutoff = datetime_now() - datetime.timedelta(
1130 seconds=self.mgr.device_cache_timeout)
1131 if host not in self.last_device_update or self.last_device_update[host] < cutoff:
1132 return True
1133 if not self.mgr.cache.host_metadata_up_to_date(host):
1134 return True
1135 return False
1136
1137 def host_needs_network_refresh(self, host):
1138 # type: (str) -> bool
1139 if host in self.mgr.offline_hosts:
1140 logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
1141 return False
1142 if host in self.network_refresh_queue:
1143 self.network_refresh_queue.remove(host)
1144 return True
1145 cutoff = datetime_now() - datetime.timedelta(
1146 seconds=self.mgr.device_cache_timeout)
1147 if host not in self.last_network_update or self.last_network_update[host] < cutoff:
1148 return True
1149 if not self.mgr.cache.host_metadata_up_to_date(host):
1150 return True
1151 return False
1152
1153 def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
1154 if host in self.mgr.offline_hosts:
1155 logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
1156 return False
1157 if host in self.osdspec_previews_refresh_queue:
1158 self.osdspec_previews_refresh_queue.remove(host)
1159 return True
1160 # Since this is dependent on other factors (device and spec) this does not need
1161 # to be updated periodically.
1162 return False
1163
1164 def host_needs_check(self, host):
1165 # type: (str) -> bool
1166 cutoff = datetime_now() - datetime.timedelta(
1167 seconds=self.mgr.host_check_interval)
1168 return host not in self.last_host_check or self.last_host_check[host] < cutoff
1169
    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        """Return True if the OSD spec should be (re)applied on *host*.

        Apply when any required record is missing (devices, device-change or
        device-update times, or a previous application of this spec), when the
        spec is new or newer than the last observed device change, or when the
        devices changed after the spec was last applied.
        """
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            # missing any of the inputs -> play it safe and apply
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            # spec has no creation record, or is newer than the last device change
            return True
        # devices changed since the spec was last applied on this host
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]
1183
1184 def host_needs_registry_login(self, host: str) -> bool:
1185 if host in self.mgr.offline_hosts:
1186 return False
1187 if host in self.registry_login_queue:
1188 self.registry_login_queue.remove(host)
1189 return True
1190 return False
1191
1192 def host_metadata_up_to_date(self, host: str) -> bool:
1193 if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
1194 return False
1195 return True
1196
1197 def all_host_metadata_up_to_date(self) -> bool:
1198 unreachables = [h.hostname for h in self.get_unreachable_hosts()]
1199 if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
1200 # this function is primarily for telling if it's safe to try and apply a service
1201 # spec. Since offline/maintenance hosts aren't considered in that process anyway
1202 # we don't want to return False if the host without up-to-date metadata is in one
1203 # of those two categories.
1204 return False
1205 return True
1206
    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Insert (or replace) daemon description *dd* in the per-host cache.

        The host must already be present in the cache.
        """
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd
1211
1212 def rm_daemon(self, host: str, name: str) -> None:
1213 assert not name.startswith('ha-rgw.')
1214
1215 if host in self.daemons:
1216 if name in self.daemons[host]:
1217 del self.daemons[host][name]
1218
1219 def daemon_cache_filled(self) -> bool:
1220 """
1221 i.e. we have checked the daemons for each hosts at least once.
1222 excluding offline hosts.
1223
1224 We're not checking for `host_needs_daemon_refresh`, as this might never be
1225 False for all hosts.
1226 """
1227 return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
1228 for h in self.get_hosts())
1229
1230 def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
1231 assert not daemon_name.startswith('ha-rgw.')
1232
1233 priorities = {
1234 'start': 1,
1235 'restart': 2,
1236 'reconfig': 3,
1237 'redeploy': 4,
1238 'stop': 5,
1239 }
1240 existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
1241 if existing_action and priorities[existing_action] > priorities[action]:
1242 logger.debug(
1243 f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
1244 return
1245
1246 if host not in self.scheduled_daemon_actions:
1247 self.scheduled_daemon_actions[host] = {}
1248 self.scheduled_daemon_actions[host][daemon_name] = action
1249
1250 def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
1251 found = False
1252 if host in self.scheduled_daemon_actions:
1253 if daemon_name in self.scheduled_daemon_actions[host]:
1254 del self.scheduled_daemon_actions[host][daemon_name]
1255 found = True
1256 if not self.scheduled_daemon_actions[host]:
1257 del self.scheduled_daemon_actions[host]
1258 return found
1259
1260 def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
1261 assert not daemon.startswith('ha-rgw.')
1262
1263 return self.scheduled_daemon_actions.get(host, {}).get(daemon)
1264
1265
class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # per-host deps + last-config time recorded for the agent daemon
        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
        # per-host counter (reset to 1 on successful config delivery)
        self.agent_counter = {}  # type: Dict[str, int]
        # per-host timestamp (refreshed when agent config is delivered)
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        # per-host agent keys
        self.agent_keys = {}  # type: Dict[str, str]
        # port each agent listens on (only stored when non-zero)
        self.agent_ports = {}  # type: Dict[str, int]
        # True while a message to this host's agent is in flight
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        """Repopulate the in-memory agent caches from the mon KV store."""
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
                # fix: skip the stray record entirely. Previously we fell
                # through and still loaded the just-deleted record into the
                # in-memory caches.
                continue
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                # best-effort load: a corrupt record for one host must not
                # prevent loading the others
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        """Persist this host's agent metadata to the mon KV store."""
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the deps and config time for the agent on *host* (in memory only)."""
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return ``(deps, last_config_time)`` for *host*, or ``(None, None)``."""
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        """Return True while a message to the agent on *host* is in flight."""
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        """Record that the agent received its new config: update deps/timestamp,
        reset the counter, and persist."""
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)
1353
1354
1355 class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        """In-memory store of recent orchestrator events, keyed by 'kind:subject'."""
        self.mgr: CephadmOrchestrator = mgr
        # 'kind:subject' -> most recent events for that subject
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]
1360
1361 def add(self, event: OrchestratorEvent) -> None:
1362
1363 if event.kind_subject() not in self.events:
1364 self.events[event.kind_subject()] = [event]
1365
1366 for e in self.events[event.kind_subject()]:
1367 if e.message == event.message:
1368 return
1369
1370 self.events[event.kind_subject()].append(event)
1371
1372 # limit to five events for now.
1373 self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
1374
1375 def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
1376 e = OrchestratorEvent(datetime_now(), 'service',
1377 spec.service_name(), level, message)
1378 self.add(e)
1379
1380 def from_orch_error(self, e: OrchestratorError) -> None:
1381 if e.event_subject is not None:
1382 self.add(OrchestratorEvent(
1383 datetime_now(),
1384 e.event_subject[0],
1385 e.event_subject[1],
1386 "ERROR",
1387 str(e)
1388 ))
1389
1390 def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
1391 e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
1392 self.add(e)
1393
1394 def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
1395 self.for_daemon(
1396 daemon_name,
1397 "ERROR",
1398 str(e)
1399 )
1400
1401 def cleanup(self) -> None:
1402 # Needs to be properly done, in case events are persistently stored.
1403
1404 unknowns: List[str] = []
1405 daemons = self.mgr.cache.get_daemon_names()
1406 specs = self.mgr.spec_store.all_specs.keys()
1407 for k_s, v in self.events.items():
1408 kind, subject = k_s.split(':')
1409 if kind == 'service':
1410 if subject not in specs:
1411 unknowns.append(k_s)
1412 elif kind == 'daemon':
1413 if subject not in daemons:
1414 unknowns.append(k_s)
1415
1416 for k_s in unknowns:
1417 del self.events[k_s]
1418
1419 def get_for_service(self, name: str) -> List[OrchestratorEvent]:
1420 return self.events.get('service:' + name, [])
1421
1422 def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
1423 return self.events.get('daemon:' + name, [])