# ceph/src/pybind/mgr/cephadm/inventory.py (as shipped in the ceph 17.2.3 sources)
import datetime
from copy import copy
import ipaddress
import json
import logging
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'
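# Note: these prefixes namespace the keys used with the mgr key/value store
# (see the set_store()/get_store_prefix() calls throughout this module).  As
# an illustrative sketch only, a small cluster might persist keys such as
# 'inventory', 'spec.rgw.foo', 'host.node1' and 'agent.node1' ('rgw.foo' and
# 'node1' are hypothetical names, not values taken from a real cluster).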


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self._inventory:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))
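    # Minimal usage sketch (illustrative only; 'node1', '10.0.0.1' and
    # 'mylabel' are made-up values and `inv` is assumed to be an Inventory
    # instance):
    #
    #   inv.add_host(HostSpec('node1', addr='10.0.0.1'))
    #   inv.add_label('node1', 'mylabel')
    #   inv.get_addr('node1')      # -> '10.0.0.1'
    #   inv.rm_host('node1')
    #
    # Each of these mutating calls persists the whole inventory via save().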


class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')
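    # _save() persists one JSON object per service under 'spec.<service_name>'.
    # A rough, illustrative layout (field values made up):
    #   {
    #     "spec": {...},                            # ServiceSpec.to_json()
    #     "created": "2022-01-01T00:00:00.000000Z",
    #     "rank_map": {"0": {"0": "daemon_id"}},    # only if ranks are used
    #     "deleted": "..."                          # only once rm() was called
    #   }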

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())


class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls
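    # Round-trip sketch (illustrative only; 'client.foo' and 'mylabel' are
    # made-up names, and the PlacementSpec arguments are an assumption about
    # its constructor, not something defined in this file):
    #   ks = ClientKeyringSpec('client.foo', PlacementSpec(label='mylabel'))
    #   ClientKeyringSpec.from_json(ks.to_json()).entity   # -> 'client.foo'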


class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading:
    for example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        a = self.devices[host]
        if len(a) != len(b):
            return True
        aj = {d.path: d.to_json() for d in a}
        bj = {d.path: d.to_json() for d in b}
        if aj != bj:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
        self,
        host: str,
        dls: List[inventory.Device],
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
        self,
        host: str,
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we could end up with hosts
        where daemons might be running, but which we have not yet detected.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the _no_schedule label.

        Useful for the agent, which needs this specific list rather than the
        schedulable hosts, since the agent must be deployed even on hosts that
        have not yet had a daemon refresh.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible, so we CAN'T touch them), but
        we still want to count daemons that exist on these hosts toward the
        placement, so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}
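    # Note: get_daemons_with_volatile_status() adjusts statuses on copies of
    # the cached DaemonDescriptions (offline host -> error, host in
    # maintenance -> stopped), so callers get a best-effort live view without
    # the cache itself being mutated.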

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, because {existing_action} is already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action
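    # Scheduling sketch (illustrative only; 'node1' and 'mon.node1' are
    # made-up names): a lower-priority action never overwrites a
    # higher-priority one that is already queued, e.g.
    #   cache.schedule_daemon_action('node1', 'mon.node1', 'redeploy')
    #   cache.schedule_daemon_action('node1', 'mon.node1', 'start')   # ignored
    #   cache.get_scheduled_daemon_action('node1', 'mon.node1')       # -> 'redeploy'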

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str,Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
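    # save_agent() persists one JSON object per host under 'agent.<hostname>'
    # with (at most) the keys written above: 'agent_config_deps',
    # 'agent_counter', 'agent_keys', 'agent_ports' and 'agent_timestamp';
    # load() above is its inverse.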

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
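    # Events are keyed by OrchestratorEvent.kind_subject(), i.e. strings of
    # the form 'service:<service_name>' or 'daemon:<daemon_name>' (see
    # get_for_service()/get_for_daemon() below), and each key keeps only the
    # five most recent, de-duplicated messages.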
1177
adb31ebb
TL
1178 def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
1179 e = OrchestratorEvent(datetime_now(), 'service',
f91f0fd5 1180 spec.service_name(), level, message)
f6b5b4d7
TL
1181 self.add(e)
1182
adb31ebb 1183 def from_orch_error(self, e: OrchestratorError) -> None:
f6b5b4d7
TL
1184 if e.event_subject is not None:
1185 self.add(OrchestratorEvent(
adb31ebb 1186 datetime_now(),
f6b5b4d7
TL
1187 e.event_subject[0],
1188 e.event_subject[1],
1189 "ERROR",
1190 str(e)
1191 ))
1192
adb31ebb
TL
1193 def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
1194 e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
f6b5b4d7
TL
1195 self.add(e)
1196
adb31ebb 1197 def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
f6b5b4d7
TL
1198 self.for_daemon(
1199 daemon_name,
1200 "ERROR",
1201 str(e)
1202 )
1203
1204 def cleanup(self) -> None:
1205 # Needs to be properly done, in case events are persistently stored.
1206
1207 unknowns: List[str] = []
1208 daemons = self.mgr.cache.get_daemon_names()
f67539c2 1209 specs = self.mgr.spec_store.all_specs.keys()
f6b5b4d7
TL
1210 for k_s, v in self.events.items():
1211 kind, subject = k_s.split(':')
1212 if kind == 'service':
1213 if subject not in specs:
1214 unknowns.append(k_s)
1215 elif kind == 'daemon':
1216 if subject not in daemons:
1217 unknowns.append(k_s)
1218
1219 for k_s in unknowns:
1220 del self.events[k_s]
1221
adb31ebb 1222 def get_for_service(self, name: str) -> List[OrchestratorEvent]:
f6b5b4d7
TL
1223 return self.events.get('service:' + name, [])
1224
adb31ebb 1225 def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
f6b5b4d7 1226 return self.events.get('daemon:' + name, [])