]>
Commit | Line | Data |
---|---|---|
e306af50 | 1 | import datetime |
2a845540 | 2 | import enum |
e306af50 | 3 | from copy import copy |
b3b6e05e | 4 | import ipaddress |
1e59de90 | 5 | import itertools |
e306af50 TL |
6 | import json |
7 | import logging | |
2a845540 | 8 | import math |
b3b6e05e | 9 | import socket |
f67539c2 | 10 | from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \ |
b3b6e05e | 11 | NamedTuple, Type |
e306af50 TL |
12 | |
13 | import orchestrator | |
14 | from ceph.deployment import inventory | |
1e59de90 | 15 | from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec, IngressSpec |
adb31ebb | 16 | from ceph.utils import str_to_datetime, datetime_to_str, datetime_now |
f67539c2 | 17 | from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types |
20effc67 | 18 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec |
e306af50 | 19 | |
b3b6e05e | 20 | from .utils import resolve_ip |
1e59de90 | 21 | from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec |
b3b6e05e | 22 | |
e306af50 TL |
23 | if TYPE_CHECKING: |
24 | from .module import CephadmOrchestrator | |
25 | ||
26 | ||
27 | logger = logging.getLogger(__name__) | |
28 | ||
29 | HOST_CACHE_PREFIX = "host." | |
30 | SPEC_STORE_PREFIX = "spec." | |
20effc67 | 31 | AGENT_CACHE_PREFIX = 'agent.' |
e306af50 TL |
32 | |
33 | ||
2a845540 TL |
class HostCacheStatus(enum.Enum):
    # Classification of a 'host.*' entry in the mgr config-key store
    # (see HostCache._get_host_cache_entry_status further down this file):
    stray = 'stray'      # matches nothing in the inventory; gets removed on load
    host = 'host'        # a per-host cache record
    devices = 'devices'  # a '<hostname>.devices.<n>' device-list record
38 | ||
39 | ||
class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.

    A host may be known under several names (stored hostname, shortname,
    fqdn); ``_all_known_names`` maps the stored name to that full set so
    lookups succeed under any of them.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            # accepts both IPv4 and IPv6 literals
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                # FIX: detect *any* loopback address (all of 127.0.0.0/8
                # and IPv6 ::1) via ipaddress, instead of the original
                # prefix test `ip.startswith('127.0.')`, which let e.g.
                # 127.1.2.3 or ::1 through as a "real" address.
                if is_valid_ip(ip) and not ipaddress.ip_address(ip).is_loopback:
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        # stored hostname -> [hostname, shortname, fqdn] as reported by
        # gather-facts (see update_known_hostnames)
        self._all_known_names: Dict[str, List[str]] = {}
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        """Return the list of stored host names."""
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        # a host matches if it is a stored name OR any alternate name
        # (shortname/fqdn) we learned from gather-facts
        return host in self._inventory or host in itertools.chain.from_iterable(self._all_known_names.values())

    def _get_stored_name(self, host: str) -> str:
        """Map any known name for a host to the name it is stored under."""
        self.assert_host(host)
        if host in self._inventory:
            return host
        for stored_name, all_names in self._all_known_names.items():
            if host in all_names:
                return stored_name
        return host

    def update_known_hostnames(self, hostname: str, shortname: str, fqdn: str) -> None:
        """Record the full (hostname, shortname, fqdn) set for a known host."""
        for hname in [hostname, shortname, fqdn]:
            # if we know the host by any of the names, store the full set of names
            # in order to be able to check against those names for matching a host
            if hname in self._inventory:
                self._all_known_names[hname] = [hostname, shortname, fqdn]
                return
        logger.debug(f'got hostname set from gather-facts for unknown host: {[hostname, shortname, fqdn]}')

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError if *host* is not known by any name."""
        if host not in self:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add a new host, or update addr/labels of an existing one."""
        if spec.hostname in self:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        """Remove a host (and its known-names entry) and persist."""
        host = self._get_stored_name(host)
        del self._inventory[host]
        self._all_known_names.pop(host, [])
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Set the address of a host and persist."""
        host = self._get_stored_name(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Add a label to a host (idempotent) and persist."""
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Remove a label from a host (idempotent) and persist."""
        host = self._get_stored_name(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        """Return whether *host* carries *label*."""
        host = self._get_stored_name(host)
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        """Return the stored address of a host (falls back to the name)."""
        host = self._get_stored_name(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Build a HostSpec from a stored inventory dict."""
        hostname = info['hostname']
        hostname = self._get_stored_name(hostname)
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        """Return HostSpecs for every stored host."""
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        """Persist the inventory to the mgr config-key store."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
196 | ||
197 | ||
f67539c2 TL |
class SpecDescription(NamedTuple):
    """Everything SpecStore knows about a single service spec."""
    # the service spec itself
    spec: ServiceSpec
    # service rank -> generation -> daemon_id (or None); None when no
    # rank map is stored for this service
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    # when the spec was applied
    created: datetime.datetime
    # set once the spec has been marked deleted (see SpecStore.rm)
    deleted: Optional[datetime.datetime]
203 | ||
204 | ||
e306af50 TL |
class SpecStore():
    """
    Persistent store of ServiceSpecs plus their metadata (rank maps,
    created/deleted timestamps, needs-configuration flags), backed by
    the mgr config-key store under the 'spec.' prefix.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]
        # service_name -> whether the service still needs its initial
        # (re)configuration after the spec was (re)saved
        self._needs_configuration: Dict[str, bool] = {}

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        """Return the full SpecDescription for *name*; raise if unknown."""
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        # specs that have not been marked deleted
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        """Load all specs from the config-key store, queueing legacy
        nfs/rgw spec migrations when needed."""
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)

                if (
                    (self.mgr.migration_current or 0) < 6
                    and j['spec'].get('service_type') == 'rgw'
                ):
                    queue_migrate_rgw_spec(self.mgr, j)

                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'needs_configuration' in j:
                    self._needs_configuration[service_name] = cast(bool, j['needs_configuration'])

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                # FIX: the original tested `m is None` here,
                                # which can never be true inside this
                                # `isinstance(m, dict)` branch, so rank/gen
                                # entries whose daemon_id was None were
                                # silently dropped on load.  The value being
                                # validated is `name` (Optional[str]).
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        """Store a spec (or stash it as a preview) and persist it."""
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec
        # a freshly (re)saved spec always needs (re)configuration
        self._needs_configuration[name] = True

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        """Replace the rank map for a service and persist it."""
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        # persist one spec (and any metadata present) under 'spec.<name>'
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
        }
        if name in self.spec_created:
            data['created'] = datetime_to_str(self.spec_created[name])
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])
        if name in self._needs_configuration:
            data['needs_configuration'] = self._needs_configuration[name]

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        """Mark a spec deleted (preview-only specs are dropped outright).

        Returns True if the service was known."""
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        """Completely remove a spec and all its metadata from the store."""
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            if service_name in self._needs_configuration:
                del self._needs_configuration[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        """Return when the given spec was applied, if known."""
        return self.spec_created.get(spec.service_name())

    def set_unmanaged(self, service_name: str, value: bool) -> str:
        """Toggle the unmanaged flag on a spec; returns a status message."""
        if service_name not in self._specs:
            return f'No service of name {service_name} found. Check "ceph orch ls" for all known services'
        if self._specs[service_name].unmanaged == value:
            return f'Service {service_name}{" already " if value else " not "}marked unmanaged. No action taken.'
        self._specs[service_name].unmanaged = value
        self.save(self._specs[service_name])
        return f'Set unmanaged to {str(value)} for service {service_name}'

    def needs_configuration(self, name: str) -> bool:
        """Whether the named service still needs (re)configuration."""
        return self._needs_configuration.get(name, False)

    def mark_needs_configuration(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = True
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as needing configuration')

    def mark_configured(self, name: str) -> None:
        if name in self._specs:
            self._needs_configuration[name] = False
            self._save(name)
        else:
            self.mgr.log.warning(f'Attempted to mark unknown service "{name}" as having been configured')
394 | ||
f91f0fd5 | 395 | |
b3b6e05e TL |
class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        # fall back to root-owned, owner-read/write-only defaults
        self.mode = mode if mode else 0o600
        self.uid = uid if uid else 0
        self.gid = gid if gid else 0

    def validate(self) -> None:
        # nothing to validate currently; kept as a hook for from_json()
        pass

    def to_json(self) -> Dict[str, Any]:
        """Serialize to a plain dict suitable for json.dumps."""
        return dict(
            entity=self.entity,
            placement=self.placement.to_json(),
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
        )

    @property
    def path(self) -> str:
        # where this keyring lives on the target host(s)
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        """Deserialize from a dict produced by to_json()."""
        fields = data.copy()
        if 'placement' in fields:
            fields['placement'] = PlacementSpec.from_json(fields['placement'])
        spec = cls(**fields)
        spec.validate()
        return spec
439 | ||
440 | ||
class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        # FIX: the original assigned self.mgr twice (annotated then
        # plain); a single annotated assignment is sufficient.
        self.mgr: CephadmOrchestrator = mgr
        # entity name (e.g. 'client.admin') -> keyring spec
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        """Load tracked keyrings from the mgr config-key store."""
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        """Persist all tracked keyrings to the mgr config-key store."""
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        """Add or replace a keyring spec and persist immediately."""
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        """Stop tracking *entity*'s keyring, persisting if it was present."""
        if entity in self.keys:
            del self.keys[entity]
            self.save()
472 | ||
473 | ||
2a845540 TL |
class TunedProfileStore():
    """
    Store for our tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        # FIX: the original assigned self.mgr twice (annotated then
        # plain); a single annotated assignment is sufficient.
        self.mgr: CephadmOrchestrator = mgr
        # profile_name -> profile spec
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        """Load profiles from the config-key store, stamping each with a
        fresh last-updated time."""
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        """Whether a profile of the given name is tracked."""
        return profile_name in self.profiles

    def save(self) -> None:
        """Persist all profiles to the mgr config-key store."""
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        """Set one setting on an existing profile and persist."""
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        """Remove one setting from an existing profile and persist."""
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                # guarded by the membership test above, so a plain del is
                # clearer than the original pop() with a dummy default
                del self.profiles[profile].settings[setting]
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                # FIX: corrected 'Attemped' typo in the log message
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        """Add (or replace) a profile and persist."""
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        """Remove a profile (logging if unknown) and persist."""
        if profile in self.profiles:
            # guarded by the membership test, so no need for the original
            # pop() that eagerly constructed a throwaway TunedProfileSpec('')
            del self.profiles[profile]
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        """Return when the profile was last updated, or None if unknown."""
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        """Overwrite the last-updated stamp of a known profile."""
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        """Return all tracked profiles as a list."""
        return list(self.profiles.values())
546 | ||
547 | ||
e306af50 | 548 | class HostCache(): |
adb31ebb TL |
549 | """ |
550 | HostCache stores different things: | |
551 | ||
552 | 1. `daemons`: Deployed daemons O(daemons) | |
553 | ||
554 | They're part of the configuration nowadays and need to be | |
555 | persistent. The name "daemon cache" is unfortunately a bit misleading. | |
556 | Like for example we really need to know where daemons are deployed on | |
557 | hosts that are offline. | |
558 | ||
559 | 2. `devices`: ceph-volume inventory cache O(hosts) | |
560 | ||
561 | As soon as this is populated, it becomes more or less read-only. | |
562 | ||
563 | 3. `networks`: network interfaces for each host. O(hosts) | |
564 | ||
565 | This is needed in order to deploy MONs. As this is mostly read-only. | |
566 | ||
b3b6e05e | 567 | 4. `last_client_files` O(hosts) |
adb31ebb | 568 | |
b3b6e05e TL |
569 | Stores the last digest and owner/mode for files we've pushed to /etc/ceph |
570 | (ceph.conf or client keyrings). | |
adb31ebb TL |
571 | |
572 | 5. `scheduled_daemon_actions`: O(daemons) | |
573 | ||
574 | Used to run daemon actions after deploying a daemon. We need to | |
575 | store it persistently, in order to stay consistent across | |
f67539c2 | 576 | MGR failovers. |
adb31ebb TL |
577 | """ |
578 | ||
e306af50 TL |
579 | def __init__(self, mgr): |
580 | # type: (CephadmOrchestrator) -> None | |
581 | self.mgr: CephadmOrchestrator = mgr | |
582 | self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] | |
1e59de90 | 583 | self._tmp_daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] |
e306af50 TL |
584 | self.last_daemon_update = {} # type: Dict[str, datetime.datetime] |
585 | self.devices = {} # type: Dict[str, List[inventory.Device]] | |
adb31ebb TL |
586 | self.facts = {} # type: Dict[str, Dict[str, Any]] |
587 | self.last_facts_update = {} # type: Dict[str, datetime.datetime] | |
b3b6e05e | 588 | self.last_autotune = {} # type: Dict[str, datetime.datetime] |
e306af50 | 589 | self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]] |
f67539c2 TL |
590 | self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]] |
591 | self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]] | |
20effc67 | 592 | self.last_network_update = {} # type: Dict[str, datetime.datetime] |
e306af50 | 593 | self.last_device_update = {} # type: Dict[str, datetime.datetime] |
f67539c2 | 594 | self.last_device_change = {} # type: Dict[str, datetime.datetime] |
2a845540 | 595 | self.last_tuned_profile_update = {} # type: Dict[str, datetime.datetime] |
f91f0fd5 TL |
596 | self.daemon_refresh_queue = [] # type: List[str] |
597 | self.device_refresh_queue = [] # type: List[str] | |
20effc67 | 598 | self.network_refresh_queue = [] # type: List[str] |
f91f0fd5 | 599 | self.osdspec_previews_refresh_queue = [] # type: List[str] |
f6b5b4d7 TL |
600 | |
601 | # host -> daemon name -> dict | |
e306af50 TL |
602 | self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] |
603 | self.last_host_check = {} # type: Dict[str, datetime.datetime] | |
604 | self.loading_osdspec_preview = set() # type: Set[str] | |
b3b6e05e | 605 | self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {} |
f6b5b4d7 | 606 | self.registry_login_queue: Set[str] = set() |
e306af50 | 607 | |
f91f0fd5 TL |
608 | self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {} |
609 | ||
20effc67 TL |
610 | self.metadata_up_to_date = {} # type: Dict[str, bool] |
611 | ||
e306af50 TL |
612 | def load(self): |
613 | # type: () -> None | |
f67539c2 | 614 | for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items(): |
e306af50 | 615 | host = k[len(HOST_CACHE_PREFIX):] |
2a845540 TL |
616 | if self._get_host_cache_entry_status(host) != HostCacheStatus.host: |
617 | if self._get_host_cache_entry_status(host) == HostCacheStatus.devices: | |
618 | continue | |
e306af50 TL |
619 | self.mgr.log.warning('removing stray HostCache host record %s' % ( |
620 | host)) | |
621 | self.mgr.set_store(k, None) | |
622 | try: | |
623 | j = json.loads(v) | |
624 | if 'last_device_update' in j: | |
f91f0fd5 | 625 | self.last_device_update[host] = str_to_datetime(j['last_device_update']) |
e306af50 TL |
626 | else: |
627 | self.device_refresh_queue.append(host) | |
f67539c2 TL |
628 | if 'last_device_change' in j: |
629 | self.last_device_change[host] = str_to_datetime(j['last_device_change']) | |
e306af50 TL |
630 | # for services, we ignore the persisted last_*_update |
631 | # and always trigger a new scrape on mgr restart. | |
632 | self.daemon_refresh_queue.append(host) | |
20effc67 | 633 | self.network_refresh_queue.append(host) |
e306af50 TL |
634 | self.daemons[host] = {} |
635 | self.osdspec_previews[host] = [] | |
f67539c2 | 636 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
637 | self.networks[host] = {} |
638 | self.daemon_config_deps[host] = {} | |
639 | for name, d in j.get('daemons', {}).items(): | |
640 | self.daemons[host][name] = \ | |
641 | orchestrator.DaemonDescription.from_json(d) | |
2a845540 TL |
642 | self.devices[host] = [] |
643 | # still want to check old device location for upgrade scenarios | |
e306af50 TL |
644 | for d in j.get('devices', []): |
645 | self.devices[host].append(inventory.Device.from_json(d)) | |
2a845540 | 646 | self.devices[host] += self.load_host_devices(host) |
f67539c2 | 647 | self.networks[host] = j.get('networks_and_interfaces', {}) |
e306af50 | 648 | self.osdspec_previews[host] = j.get('osdspec_previews', {}) |
b3b6e05e | 649 | self.last_client_files[host] = j.get('last_client_files', {}) |
f67539c2 TL |
650 | for name, ts in j.get('osdspec_last_applied', {}).items(): |
651 | self.osdspec_last_applied[host][name] = str_to_datetime(ts) | |
e306af50 TL |
652 | |
653 | for name, d in j.get('daemon_config_deps', {}).items(): | |
654 | self.daemon_config_deps[host][name] = { | |
655 | 'deps': d.get('deps', []), | |
f91f0fd5 | 656 | 'last_config': str_to_datetime(d['last_config']), |
e306af50 TL |
657 | } |
658 | if 'last_host_check' in j: | |
f91f0fd5 | 659 | self.last_host_check[host] = str_to_datetime(j['last_host_check']) |
2a845540 TL |
660 | if 'last_tuned_profile_update' in j: |
661 | self.last_tuned_profile_update[host] = str_to_datetime( | |
662 | j['last_tuned_profile_update']) | |
f6b5b4d7 | 663 | self.registry_login_queue.add(host) |
f91f0fd5 | 664 | self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) |
20effc67 | 665 | self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False) |
f91f0fd5 | 666 | |
e306af50 TL |
667 | self.mgr.log.debug( |
668 | 'HostCache.load: host %s has %d daemons, ' | |
669 | '%d devices, %d networks' % ( | |
670 | host, len(self.daemons[host]), len(self.devices[host]), | |
671 | len(self.networks[host]))) | |
672 | except Exception as e: | |
673 | self.mgr.log.warning('unable to load cached state for %s: %s' % ( | |
674 | host, e)) | |
675 | pass | |
676 | ||
2a845540 TL |
677 | def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus: |
678 | # return whether a host cache entry in the config-key | |
679 | # store is for a host, a set of devices or is stray. | |
680 | # for a host, the entry name will match a hostname in our | |
681 | # inventory. For devices, it will be formatted | |
682 | # <hostname>.devices.<integer> where <hostname> is | |
683 | # in out inventory. If neither case applies, it is stray | |
684 | if host in self.mgr.inventory: | |
685 | return HostCacheStatus.host | |
686 | try: | |
687 | # try stripping off the ".devices.<integer>" and see if we get | |
688 | # a host name that matches our inventory | |
689 | actual_host = '.'.join(host.split('.')[:-2]) | |
690 | return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray | |
691 | except Exception: | |
692 | return HostCacheStatus.stray | |
693 | ||
e306af50 TL |
694 | def update_host_daemons(self, host, dm): |
695 | # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None | |
696 | self.daemons[host] = dm | |
1e59de90 | 697 | self._tmp_daemons.pop(host, {}) |
adb31ebb TL |
698 | self.last_daemon_update[host] = datetime_now() |
699 | ||
1e59de90 TL |
700 | def append_tmp_daemon(self, host: str, dd: orchestrator.DaemonDescription) -> None: |
701 | # for storing empty daemon descriptions representing daemons we have | |
702 | # just deployed but not yet had the chance to pick up in a daemon refresh | |
703 | # _tmp_daemons is cleared for a host upon receiving a real update of the | |
704 | # host's dameons | |
705 | if host not in self._tmp_daemons: | |
706 | self._tmp_daemons[host] = {} | |
707 | self._tmp_daemons[host][dd.name()] = dd | |
708 | ||
adb31ebb TL |
709 | def update_host_facts(self, host, facts): |
710 | # type: (str, Dict[str, Dict[str, Any]]) -> None | |
711 | self.facts[host] = facts | |
1e59de90 TL |
712 | hostnames: List[str] = [] |
713 | for k in ['hostname', 'shortname', 'fqdn']: | |
714 | v = facts.get(k, '') | |
715 | hostnames.append(v if isinstance(v, str) else '') | |
716 | self.mgr.inventory.update_known_hostnames(hostnames[0], hostnames[1], hostnames[2]) | |
f67539c2 TL |
717 | self.last_facts_update[host] = datetime_now() |
718 | ||
b3b6e05e TL |
719 | def update_autotune(self, host: str) -> None: |
720 | self.last_autotune[host] = datetime_now() | |
721 | ||
722 | def invalidate_autotune(self, host: str) -> None: | |
723 | if host in self.last_autotune: | |
724 | del self.last_autotune[host] | |
725 | ||
f67539c2 | 726 | def devices_changed(self, host: str, b: List[inventory.Device]) -> bool: |
39ae355f TL |
727 | old_devs = inventory.Devices(self.devices[host]) |
728 | new_devs = inventory.Devices(b) | |
729 | # relying on Devices class __eq__ function here | |
730 | if old_devs != new_devs: | |
f67539c2 TL |
731 | self.mgr.log.info("Detected new or changed devices on %s" % host) |
732 | return True | |
733 | return False | |
e306af50 | 734 | |
20effc67 | 735 | def update_host_devices( |
f67539c2 TL |
736 | self, |
737 | host: str, | |
738 | dls: List[inventory.Device], | |
f67539c2 TL |
739 | ) -> None: |
740 | if ( | |
741 | host not in self.devices | |
742 | or host not in self.last_device_change | |
743 | or self.devices_changed(host, dls) | |
744 | ): | |
745 | self.last_device_change[host] = datetime_now() | |
746 | self.last_device_update[host] = datetime_now() | |
e306af50 | 747 | self.devices[host] = dls |
20effc67 TL |
748 | |
749 | def update_host_networks( | |
750 | self, | |
751 | host: str, | |
752 | nets: Dict[str, Dict[str, List[str]]] | |
753 | ) -> None: | |
e306af50 | 754 | self.networks[host] = nets |
20effc67 | 755 | self.last_network_update[host] = datetime_now() |
e306af50 | 756 | |
adb31ebb | 757 | def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None: |
e306af50 TL |
758 | self.daemon_config_deps[host][name] = { |
759 | 'deps': deps, | |
760 | 'last_config': stamp, | |
761 | } | |
762 | ||
763 | def update_last_host_check(self, host): | |
764 | # type: (str) -> None | |
adb31ebb | 765 | self.last_host_check[host] = datetime_now() |
e306af50 | 766 | |
f67539c2 TL |
767 | def update_osdspec_last_applied(self, host, service_name, ts): |
768 | # type: (str, str, datetime.datetime) -> None | |
769 | self.osdspec_last_applied[host][service_name] = ts | |
770 | ||
b3b6e05e TL |
771 | def update_client_file(self, |
772 | host: str, | |
773 | path: str, | |
774 | digest: str, | |
775 | mode: int, | |
776 | uid: int, | |
777 | gid: int) -> None: | |
778 | if host not in self.last_client_files: | |
779 | self.last_client_files[host] = {} | |
780 | self.last_client_files[host][path] = (digest, mode, uid, gid) | |
781 | ||
782 | def removed_client_file(self, host: str, path: str) -> None: | |
783 | if ( | |
784 | host in self.last_client_files | |
785 | and path in self.last_client_files[host] | |
786 | ): | |
787 | del self.last_client_files[host][path] | |
788 | ||
e306af50 TL |
789 | def prime_empty_host(self, host): |
790 | # type: (str) -> None | |
791 | """ | |
792 | Install an empty entry for a host | |
793 | """ | |
794 | self.daemons[host] = {} | |
795 | self.devices[host] = [] | |
796 | self.networks[host] = {} | |
797 | self.osdspec_previews[host] = [] | |
f67539c2 | 798 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
799 | self.daemon_config_deps[host] = {} |
800 | self.daemon_refresh_queue.append(host) | |
801 | self.device_refresh_queue.append(host) | |
20effc67 | 802 | self.network_refresh_queue.append(host) |
e306af50 | 803 | self.osdspec_previews_refresh_queue.append(host) |
f6b5b4d7 | 804 | self.registry_login_queue.add(host) |
b3b6e05e | 805 | self.last_client_files[host] = {} |
e306af50 | 806 | |
a4b75251 TL |
807 | def refresh_all_host_info(self, host): |
808 | # type: (str) -> None | |
809 | ||
810 | self.last_host_check.pop(host, None) | |
811 | self.daemon_refresh_queue.append(host) | |
812 | self.registry_login_queue.add(host) | |
813 | self.device_refresh_queue.append(host) | |
814 | self.last_facts_update.pop(host, None) | |
815 | self.osdspec_previews_refresh_queue.append(host) | |
816 | self.last_autotune.pop(host, None) | |
817 | ||
e306af50 TL |
818 | def invalidate_host_daemons(self, host): |
819 | # type: (str) -> None | |
820 | self.daemon_refresh_queue.append(host) | |
821 | if host in self.last_daemon_update: | |
822 | del self.last_daemon_update[host] | |
823 | self.mgr.event.set() | |
824 | ||
825 | def invalidate_host_devices(self, host): | |
826 | # type: (str) -> None | |
827 | self.device_refresh_queue.append(host) | |
828 | if host in self.last_device_update: | |
829 | del self.last_device_update[host] | |
830 | self.mgr.event.set() | |
f91f0fd5 | 831 | |
20effc67 TL |
832 | def invalidate_host_networks(self, host): |
833 | # type: (str) -> None | |
834 | self.network_refresh_queue.append(host) | |
835 | if host in self.last_network_update: | |
836 | del self.last_network_update[host] | |
837 | self.mgr.event.set() | |
838 | ||
adb31ebb | 839 | def distribute_new_registry_login_info(self) -> None: |
f6b5b4d7 | 840 | self.registry_login_queue = set(self.mgr.inventory.keys()) |
e306af50 | 841 | |
f91f0fd5 TL |
    def save_host(self, host: str) -> None:
        """Serialize the per-host cache state to the mon KV store.

        Only the fields we actually have for ``host`` are written; timestamps
        are stored as strings via ``datetime_to_str``.  The device list is NOT
        part of this JSON blob: it is persisted separately (and possibly split
        over several keys) by ``save_host_devices``.
        """
        # base skeleton; 'devices' stays empty here (see save_host_devices)
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        # per-host refresh timestamps, persisted only when present
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        # devices go to their own (possibly split) KV entries
        if host in self.devices:
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
890 | ||
2a845540 TL |
    def save_host_devices(self, host: str) -> None:
        """Persist the device list for ``host`` to the mon KV store.

        Large listings are split over several ``<host>.devices.N`` keys so a
        single value stays under ``mon_config_key_max_entry_size``; the first
        entry ('devices.0') records the total number of entries in its
        'entries' field (consumed by ``load_host_devices``).
        """
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            # sizes must be measured in encoded bytes, not characters
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        # the -1024 leaves headroom for the JSON wrapper around the device list
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                # only the first entry carries the total entry count
                if dev_cache_counter == 0:
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            # everything fits into a single entry
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))
923 | ||
924 | def load_host_devices(self, host: str) -> List[inventory.Device]: | |
925 | dev_cache_counter: int = 0 | |
926 | devs: List[Dict[str, Any]] = [] | |
927 | dev_entries: int = 0 | |
928 | try: | |
929 | # number of entries for the host's devices should be in | |
930 | # the "entries" field of the first entry | |
931 | dev_entries = json.loads(self.mgr.get_store( | |
932 | HOST_CACHE_PREFIX + host + '.devices.0')).get('entries') | |
933 | except Exception: | |
934 | logger.debug(f'No device entries found for host {host}') | |
935 | for i in range(dev_entries): | |
936 | try: | |
937 | new_devs = json.loads(self.mgr.get_store( | |
938 | HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', []) | |
939 | if len(new_devs) > 0: | |
940 | # verify list contains actual device objects by trying to load one from json | |
941 | inventory.Device.from_json(new_devs[0]) | |
942 | # if we didn't throw an Exception on above line, we can add the devices | |
943 | devs = devs + new_devs | |
944 | dev_cache_counter += 1 | |
945 | except Exception as e: | |
946 | logger.error(('Hit exception trying to load devices from ' | |
947 | + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}')) | |
948 | return [] | |
949 | return [inventory.Device.from_json(d) for d in devs] | |
950 | ||
e306af50 TL |
951 | def rm_host(self, host): |
952 | # type: (str) -> None | |
953 | if host in self.daemons: | |
954 | del self.daemons[host] | |
955 | if host in self.devices: | |
956 | del self.devices[host] | |
adb31ebb TL |
957 | if host in self.facts: |
958 | del self.facts[host] | |
959 | if host in self.last_facts_update: | |
960 | del self.last_facts_update[host] | |
b3b6e05e TL |
961 | if host in self.last_autotune: |
962 | del self.last_autotune[host] | |
e306af50 TL |
963 | if host in self.osdspec_previews: |
964 | del self.osdspec_previews[host] | |
f67539c2 TL |
965 | if host in self.osdspec_last_applied: |
966 | del self.osdspec_last_applied[host] | |
e306af50 TL |
967 | if host in self.loading_osdspec_preview: |
968 | self.loading_osdspec_preview.remove(host) | |
969 | if host in self.networks: | |
970 | del self.networks[host] | |
971 | if host in self.last_daemon_update: | |
972 | del self.last_daemon_update[host] | |
973 | if host in self.last_device_update: | |
974 | del self.last_device_update[host] | |
20effc67 TL |
975 | if host in self.last_network_update: |
976 | del self.last_network_update[host] | |
f67539c2 TL |
977 | if host in self.last_device_change: |
978 | del self.last_device_change[host] | |
2a845540 TL |
979 | if host in self.last_tuned_profile_update: |
980 | del self.last_tuned_profile_update[host] | |
e306af50 TL |
981 | if host in self.daemon_config_deps: |
982 | del self.daemon_config_deps[host] | |
f91f0fd5 TL |
983 | if host in self.scheduled_daemon_actions: |
984 | del self.scheduled_daemon_actions[host] | |
b3b6e05e TL |
985 | if host in self.last_client_files: |
986 | del self.last_client_files[host] | |
e306af50 TL |
987 | self.mgr.set_store(HOST_CACHE_PREFIX + host, None) |
988 | ||
989 | def get_hosts(self): | |
990 | # type: () -> List[str] | |
20effc67 TL |
991 | return list(self.daemons) |
992 | ||
993 | def get_schedulable_hosts(self) -> List[HostSpec]: | |
994 | """ | |
995 | Returns all usable hosts that went through _refresh_host_daemons(). | |
996 | ||
997 | This mitigates a potential race, where new host was added *after* | |
998 | ``_refresh_host_daemons()`` was called, but *before* | |
999 | ``_apply_all_specs()`` was called. thus we end up with a hosts | |
1000 | where daemons might be running, but we have not yet detected them. | |
1001 | """ | |
1002 | return [ | |
1003 | h for h in self.mgr.inventory.all_specs() | |
1004 | if ( | |
1005 | self.host_had_daemon_refresh(h.hostname) | |
1006 | and '_no_schedule' not in h.labels | |
1007 | ) | |
1008 | ] | |
1009 | ||
1010 | def get_non_draining_hosts(self) -> List[HostSpec]: | |
1011 | """ | |
1012 | Returns all hosts that do not have _no_schedule label. | |
1013 | ||
1014 | Useful for the agent who needs this specific list rather than the | |
1015 | schedulable_hosts since the agent needs to be deployed on hosts with | |
1016 | no daemon refresh | |
1017 | """ | |
1018 | return [ | |
1019 | h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels | |
1020 | ] | |
1021 | ||
2a845540 TL |
1022 | def get_draining_hosts(self) -> List[HostSpec]: |
1023 | """ | |
1024 | Returns all hosts that have _no_schedule label and therefore should have | |
1025 | no daemons placed on them, but are potentially still reachable | |
1026 | """ | |
1027 | return [ | |
1028 | h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels | |
1029 | ] | |
1030 | ||
20effc67 TL |
1031 | def get_unreachable_hosts(self) -> List[HostSpec]: |
1032 | """ | |
1033 | Return all hosts that are offline or in maintenance mode. | |
1034 | ||
1035 | The idea is we should not touch the daemons on these hosts (since | |
1036 | in theory the hosts are inaccessible so we CAN'T touch them) but | |
1037 | we still want to count daemons that exist on these hosts toward the | |
1038 | placement so daemons on these hosts aren't just moved elsewhere | |
1039 | """ | |
1040 | return [ | |
1041 | h for h in self.mgr.inventory.all_specs() | |
1042 | if ( | |
1043 | h.status.lower() in ['maintenance', 'offline'] | |
1044 | or h.hostname in self.mgr.offline_hosts | |
1045 | ) | |
1046 | ] | |
e306af50 | 1047 | |
b3b6e05e TL |
1048 | def get_facts(self, host: str) -> Dict[str, Any]: |
1049 | return self.facts.get(host, {}) | |
1050 | ||
20effc67 TL |
1051 | def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]: |
1052 | for dm in self.daemons.copy().values(): | |
1053 | yield from dm.values() | |
1054 | ||
1e59de90 TL |
1055 | def _get_tmp_daemons(self) -> Iterator[orchestrator.DaemonDescription]: |
1056 | for dm in self._tmp_daemons.copy().values(): | |
1057 | yield from dm.values() | |
1058 | ||
e306af50 TL |
1059 | def get_daemons(self): |
1060 | # type: () -> List[orchestrator.DaemonDescription] | |
20effc67 TL |
1061 | return list(self._get_daemons()) |
1062 | ||
1063 | def get_error_daemons(self) -> List[orchestrator.DaemonDescription]: | |
e306af50 | 1064 | r = [] |
20effc67 TL |
1065 | for dd in self._get_daemons(): |
1066 | if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error: | |
e306af50 TL |
1067 | r.append(dd) |
1068 | return r | |
1069 | ||
b3b6e05e TL |
1070 | def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]: |
1071 | return list(self.daemons.get(host, {}).values()) | |
1072 | ||
20effc67 | 1073 | def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription: |
f67539c2 | 1074 | assert not daemon_name.startswith('ha-rgw.') |
20effc67 TL |
1075 | dds = self.get_daemons_by_host(host) if host else self._get_daemons() |
1076 | for dd in dds: | |
1077 | if dd.name() == daemon_name: | |
1078 | return dd | |
1079 | ||
f6b5b4d7 TL |
1080 | raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') |
1081 | ||
20effc67 | 1082 | def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool: |
a4b75251 | 1083 | try: |
20effc67 | 1084 | self.get_daemon(daemon_name, host) |
a4b75251 TL |
1085 | except orchestrator.OrchestratorError: |
1086 | return False | |
1087 | return True | |
1088 | ||
e306af50 | 1089 | def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: |
adb31ebb | 1090 | def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: |
f6b5b4d7 | 1091 | dd = copy(dd_orig) |
e306af50 | 1092 | if host in self.mgr.offline_hosts: |
f67539c2 | 1093 | dd.status = orchestrator.DaemonDescriptionStatus.error |
f6b5b4d7 | 1094 | dd.status_desc = 'host is offline' |
b3b6e05e TL |
1095 | elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance": |
1096 | # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses | |
1097 | # could be wrong. We must assume maintenance is working and daemons are stopped | |
1098 | dd.status = orchestrator.DaemonDescriptionStatus.stopped | |
f6b5b4d7 TL |
1099 | dd.events = self.mgr.events.get_for_daemon(dd.name()) |
1100 | return dd | |
1101 | ||
20effc67 | 1102 | for host, dm in self.daemons.copy().items(): |
f6b5b4d7 | 1103 | yield host, {name: alter(host, d) for name, d in dm.items()} |
e306af50 TL |
1104 | |
1105 | def get_daemons_by_service(self, service_name): | |
1106 | # type: (str) -> List[orchestrator.DaemonDescription] | |
f67539c2 TL |
1107 | assert not service_name.startswith('keepalived.') |
1108 | assert not service_name.startswith('haproxy.') | |
1109 | ||
20effc67 | 1110 | return list(dd for dd in self._get_daemons() if dd.service_name() == service_name) |
f6b5b4d7 | 1111 | |
1e59de90 TL |
    def get_related_service_daemons(self, service_spec: ServiceSpec) -> Optional[List[orchestrator.DaemonDescription]]:
        """Return daemons related to ``service_spec`` via an ingress backend
        relationship, or None when there is no such relationship.

        For an ingress spec: the daemons of its backend service.  For any
        other spec: the daemons of an ingress service whose backend is this
        spec (first match wins).  Placeholder (just-deployed) daemons from
        the tmp cache are included in both cases.
        """
        if service_spec.service_type == 'ingress':
            dds = list(dd for dd in self._get_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == cast(IngressSpec, service_spec).backend_service)
            logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
            return dds
        else:
            for ingress_spec in [cast(IngressSpec, s) for s in self.mgr.spec_store.active_specs.values() if s.service_type == 'ingress']:
                if ingress_spec.backend_service == service_spec.service_name():
                    dds = list(dd for dd in self._get_daemons() if dd.service_name() == ingress_spec.service_name())
                    dds += list(dd for dd in self._get_tmp_daemons() if dd.service_name() == ingress_spec.service_name())
                    logger.debug(f'Found related daemons {dds} for service {service_spec.service_name()}')
                    return dds
            return None
1126 | ||
20effc67 | 1127 | def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]: |
f67539c2 TL |
1128 | assert service_type not in ['keepalived', 'haproxy'] |
1129 | ||
20effc67 | 1130 | daemons = self.daemons[host].values() if host else self._get_daemons() |
e306af50 | 1131 | |
20effc67 TL |
1132 | return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)] |
1133 | ||
1134 | def get_daemon_types(self, hostname: str) -> Set[str]: | |
f67539c2 | 1135 | """Provide a list of the types of daemons on the host""" |
20effc67 | 1136 | return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()}) |
f67539c2 | 1137 | |
e306af50 TL |
1138 | def get_daemon_names(self): |
1139 | # type: () -> List[str] | |
20effc67 | 1140 | return [d.name() for d in self._get_daemons()] |
e306af50 | 1141 | |
adb31ebb | 1142 | def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: |
e306af50 TL |
1143 | if host in self.daemon_config_deps: |
1144 | if name in self.daemon_config_deps[host]: | |
1145 | return self.daemon_config_deps[host][name].get('deps', []), \ | |
1146 | self.daemon_config_deps[host][name].get('last_config', None) | |
1147 | return None, None | |
1148 | ||
b3b6e05e TL |
1149 | def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]: |
1150 | return self.last_client_files.get(host, {}) | |
1151 | ||
e306af50 TL |
1152 | def host_needs_daemon_refresh(self, host): |
1153 | # type: (str) -> bool | |
1154 | if host in self.mgr.offline_hosts: | |
1155 | logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') | |
1156 | return False | |
1157 | if host in self.daemon_refresh_queue: | |
1158 | self.daemon_refresh_queue.remove(host) | |
1159 | return True | |
adb31ebb | 1160 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
1161 | seconds=self.mgr.daemon_cache_timeout) |
1162 | if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: | |
1163 | return True | |
20effc67 TL |
1164 | if not self.mgr.cache.host_metadata_up_to_date(host): |
1165 | return True | |
e306af50 TL |
1166 | return False |
1167 | ||
adb31ebb TL |
1168 | def host_needs_facts_refresh(self, host): |
1169 | # type: (str) -> bool | |
1170 | if host in self.mgr.offline_hosts: | |
1171 | logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh') | |
1172 | return False | |
f67539c2 | 1173 | cutoff = datetime_now() - datetime.timedelta( |
adb31ebb TL |
1174 | seconds=self.mgr.facts_cache_timeout) |
1175 | if host not in self.last_facts_update or self.last_facts_update[host] < cutoff: | |
1176 | return True | |
20effc67 TL |
1177 | if not self.mgr.cache.host_metadata_up_to_date(host): |
1178 | return True | |
adb31ebb TL |
1179 | return False |
1180 | ||
b3b6e05e TL |
1181 | def host_needs_autotune_memory(self, host): |
1182 | # type: (str) -> bool | |
1183 | if host in self.mgr.offline_hosts: | |
1184 | logger.debug(f'Host "{host}" marked as offline. Skipping autotune') | |
1185 | return False | |
1186 | cutoff = datetime_now() - datetime.timedelta( | |
1187 | seconds=self.mgr.autotune_interval) | |
1188 | if host not in self.last_autotune or self.last_autotune[host] < cutoff: | |
1189 | return True | |
1190 | return False | |
1191 | ||
2a845540 TL |
1192 | def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool: |
1193 | if host in self.mgr.offline_hosts: | |
1194 | logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile') | |
1195 | return False | |
1196 | if profile not in self.mgr.tuned_profiles: | |
1197 | logger.debug( | |
1198 | f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist') | |
1199 | return False | |
1200 | if host not in self.last_tuned_profile_update: | |
1201 | return True | |
1202 | last_profile_update = self.mgr.tuned_profiles.last_updated(profile) | |
1203 | if last_profile_update is None: | |
1204 | self.mgr.tuned_profiles.set_last_updated(profile, datetime_now()) | |
1205 | return True | |
1206 | if self.last_tuned_profile_update[host] < last_profile_update: | |
1207 | return True | |
1208 | return False | |
1209 | ||
f91f0fd5 TL |
1210 | def host_had_daemon_refresh(self, host: str) -> bool: |
1211 | """ | |
1212 | ... at least once. | |
1213 | """ | |
1214 | if host in self.last_daemon_update: | |
1215 | return True | |
1216 | if host not in self.daemons: | |
1217 | return False | |
1218 | return bool(self.daemons[host]) | |
1219 | ||
e306af50 TL |
1220 | def host_needs_device_refresh(self, host): |
1221 | # type: (str) -> bool | |
1222 | if host in self.mgr.offline_hosts: | |
1223 | logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') | |
1224 | return False | |
1225 | if host in self.device_refresh_queue: | |
1226 | self.device_refresh_queue.remove(host) | |
1227 | return True | |
adb31ebb | 1228 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
1229 | seconds=self.mgr.device_cache_timeout) |
1230 | if host not in self.last_device_update or self.last_device_update[host] < cutoff: | |
1231 | return True | |
20effc67 TL |
1232 | if not self.mgr.cache.host_metadata_up_to_date(host): |
1233 | return True | |
1234 | return False | |
1235 | ||
1236 | def host_needs_network_refresh(self, host): | |
1237 | # type: (str) -> bool | |
1238 | if host in self.mgr.offline_hosts: | |
1239 | logger.debug(f'Host "{host}" marked as offline. Skipping network refresh') | |
1240 | return False | |
1241 | if host in self.network_refresh_queue: | |
1242 | self.network_refresh_queue.remove(host) | |
1243 | return True | |
1244 | cutoff = datetime_now() - datetime.timedelta( | |
1245 | seconds=self.mgr.device_cache_timeout) | |
1246 | if host not in self.last_network_update or self.last_network_update[host] < cutoff: | |
1247 | return True | |
1248 | if not self.mgr.cache.host_metadata_up_to_date(host): | |
1249 | return True | |
e306af50 TL |
1250 | return False |
1251 | ||
adb31ebb | 1252 | def host_needs_osdspec_preview_refresh(self, host: str) -> bool: |
e306af50 TL |
1253 | if host in self.mgr.offline_hosts: |
1254 | logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh') | |
1255 | return False | |
1256 | if host in self.osdspec_previews_refresh_queue: | |
1257 | self.osdspec_previews_refresh_queue.remove(host) | |
1258 | return True | |
1259 | # Since this is dependent on other factors (device and spec) this does not need | |
1260 | # to be updated periodically. | |
1261 | return False | |
1262 | ||
1263 | def host_needs_check(self, host): | |
1264 | # type: (str) -> bool | |
adb31ebb | 1265 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
1266 | seconds=self.mgr.host_check_interval) |
1267 | return host not in self.last_host_check or self.last_host_check[host] < cutoff | |
1268 | ||
f67539c2 TL |
1269 | def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool: |
1270 | if ( | |
1271 | host not in self.devices | |
1272 | or host not in self.last_device_change | |
1273 | or host not in self.last_device_update | |
1274 | or host not in self.osdspec_last_applied | |
1275 | or spec.service_name() not in self.osdspec_last_applied[host] | |
1276 | ): | |
1277 | return True | |
1278 | created = self.mgr.spec_store.get_created(spec) | |
1279 | if not created or created > self.last_device_change[host]: | |
1280 | return True | |
1281 | return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host] | |
1282 | ||
f91f0fd5 | 1283 | def host_needs_registry_login(self, host: str) -> bool: |
f6b5b4d7 TL |
1284 | if host in self.mgr.offline_hosts: |
1285 | return False | |
1286 | if host in self.registry_login_queue: | |
1287 | self.registry_login_queue.remove(host) | |
1288 | return True | |
1289 | return False | |
1290 | ||
20effc67 TL |
1291 | def host_metadata_up_to_date(self, host: str) -> bool: |
1292 | if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]: | |
1293 | return False | |
1294 | return True | |
1295 | ||
1296 | def all_host_metadata_up_to_date(self) -> bool: | |
1297 | unreachables = [h.hostname for h in self.get_unreachable_hosts()] | |
1298 | if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]: | |
1299 | # this function is primarily for telling if it's safe to try and apply a service | |
1300 | # spec. Since offline/maintenance hosts aren't considered in that process anyway | |
1301 | # we don't want to return False if the host without up-to-date metadata is in one | |
1302 | # of those two categories. | |
1303 | return False | |
1304 | return True | |
1305 | ||
e306af50 TL |
1306 | def add_daemon(self, host, dd): |
1307 | # type: (str, orchestrator.DaemonDescription) -> None | |
1308 | assert host in self.daemons | |
1309 | self.daemons[host][dd.name()] = dd | |
1310 | ||
adb31ebb | 1311 | def rm_daemon(self, host: str, name: str) -> None: |
f67539c2 TL |
1312 | assert not name.startswith('ha-rgw.') |
1313 | ||
e306af50 TL |
1314 | if host in self.daemons: |
1315 | if name in self.daemons[host]: | |
f6b5b4d7 TL |
1316 | del self.daemons[host][name] |
1317 | ||
adb31ebb | 1318 | def daemon_cache_filled(self) -> bool: |
f6b5b4d7 TL |
1319 | """ |
1320 | i.e. we have checked the daemons for each hosts at least once. | |
1321 | excluding offline hosts. | |
1322 | ||
1323 | We're not checking for `host_needs_daemon_refresh`, as this might never be | |
1324 | False for all hosts. | |
1325 | """ | |
f91f0fd5 | 1326 | return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts) |
f6b5b4d7 TL |
1327 | for h in self.get_hosts()) |
1328 | ||
adb31ebb | 1329 | def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None: |
f67539c2 TL |
1330 | assert not daemon_name.startswith('ha-rgw.') |
1331 | ||
f91f0fd5 TL |
1332 | priorities = { |
1333 | 'start': 1, | |
1334 | 'restart': 2, | |
1335 | 'reconfig': 3, | |
1336 | 'redeploy': 4, | |
1337 | 'stop': 5, | |
39ae355f | 1338 | 'rotate-key': 6, |
f91f0fd5 TL |
1339 | } |
1340 | existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None) | |
1341 | if existing_action and priorities[existing_action] > priorities[action]: | |
1342 | logger.debug( | |
1343 | f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.') | |
1344 | return | |
1345 | ||
1346 | if host not in self.scheduled_daemon_actions: | |
1347 | self.scheduled_daemon_actions[host] = {} | |
1348 | self.scheduled_daemon_actions[host][daemon_name] = action | |
1349 | ||
20effc67 TL |
1350 | def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool: |
1351 | found = False | |
f91f0fd5 TL |
1352 | if host in self.scheduled_daemon_actions: |
1353 | if daemon_name in self.scheduled_daemon_actions[host]: | |
1354 | del self.scheduled_daemon_actions[host][daemon_name] | |
20effc67 | 1355 | found = True |
f91f0fd5 TL |
1356 | if not self.scheduled_daemon_actions[host]: |
1357 | del self.scheduled_daemon_actions[host] | |
20effc67 | 1358 | return found |
f91f0fd5 | 1359 | |
adb31ebb | 1360 | def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]: |
f67539c2 TL |
1361 | assert not daemon.startswith('ha-rgw.') |
1362 | ||
f91f0fd5 TL |
1363 | return self.scheduled_daemon_actions.get(host, {}).get(daemon) |
1364 | ||
f6b5b4d7 | 1365 | |
20effc67 TL |
1366 | class AgentCache(): |
1367 | """ | |
1368 | AgentCache is used for storing metadata about agent daemons that must be kept | |
1369 | through MGR failovers | |
1370 | """ | |
1371 | ||
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        """Set up empty per-host agent bookkeeping (all keyed by hostname)."""
        self.mgr: CephadmOrchestrator = mgr
        # deps + timestamp of the last agent (re)configuration
        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
        # monotonically increasing message counter per agent
        self.agent_counter = {}  # type: Dict[str, int]
        # last time we heard from / updated each agent
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        # per-agent key material
        self.agent_keys = {}  # type: Dict[str, str]
        # port each agent listens on
        self.agent_ports = {}  # type: Dict[str, int]
        # whether a message to the agent is currently in flight
        self.sending_agent_message = {}  # type: Dict[str, bool]
1381 | ||
    def load(self):
        # type: () -> None
        """Restore agent state from the mon KV store (one key per host).

        Records for hosts no longer in the inventory are deleted from the
        store.  A record that fails to parse is logged and skipped.
        """
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
                # NOTE(review): no `continue` here — the stray record is still
                # parsed into the in-memory caches below; confirm intended.
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    # timestamps are persisted as strings
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass
1409 | ||
1410 | def save_agent(self, host: str) -> None: | |
1411 | j: Dict[str, Any] = {} | |
1412 | if host in self.agent_config_deps: | |
1413 | j['agent_config_deps'] = { | |
1414 | 'deps': self.agent_config_deps[host].get('deps', []), | |
1415 | 'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']), | |
1416 | } | |
1417 | if host in self.agent_counter: | |
1418 | j['agent_counter'] = self.agent_counter[host] | |
1419 | if host in self.agent_keys: | |
1420 | j['agent_keys'] = self.agent_keys[host] | |
1421 | if host in self.agent_ports: | |
1422 | j['agent_ports'] = self.agent_ports[host] | |
1423 | if host in self.agent_timestamp: | |
1424 | j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host]) | |
1425 | ||
1426 | self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j)) | |
1427 | ||
1428 | def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None: | |
1429 | self.agent_config_deps[host] = { | |
1430 | 'deps': deps, | |
1431 | 'last_config': stamp, | |
1432 | } | |
1433 | ||
1434 | def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: | |
1435 | if host in self.agent_config_deps: | |
1436 | return self.agent_config_deps[host].get('deps', []), \ | |
1437 | self.agent_config_deps[host].get('last_config', None) | |
1438 | return None, None | |
1439 | ||
1440 | def messaging_agent(self, host: str) -> bool: | |
1441 | if host not in self.sending_agent_message or not self.sending_agent_message[host]: | |
1442 | return False | |
1443 | return True | |
1444 | ||
1445 | def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None: | |
1446 | # agent successfully received new config. Update config/deps | |
1447 | assert daemon_spec.service_name == 'agent' | |
1448 | self.update_agent_config_deps( | |
1449 | daemon_spec.host, daemon_spec.deps, datetime_now()) | |
1450 | self.agent_timestamp[daemon_spec.host] = datetime_now() | |
1451 | self.agent_counter[daemon_spec.host] = 1 | |
1452 | self.save_agent(daemon_spec.host) | |
1453 | ||
1454 | ||
f6b5b4d7 TL |
class EventStore():
    """In-memory store of recent OrchestratorEvents, keyed by 'kind:subject'.

    At most five events are retained per subject, and events whose message
    duplicates an already-stored one are dropped.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """Record *event*, deduplicating by message and keeping the last five.

        (The original implementation inserted a new subject's event and then
        re-scanned the list, matching the event against itself; the net effect
        is the same, done here without the redundant self-comparison.)
        """
        subject = event.kind_subject()
        existing = self.events.setdefault(subject, [])
        # skip exact-duplicate messages to avoid flooding the store
        if any(e.message == event.message for e in existing):
            return
        existing.append(event)

        # limit to five events for now.
        self.events[subject] = existing[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Record a service-scoped event for *spec*."""
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Record an ERROR event from an OrchestratorError, if it names a subject."""
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Record a daemon-scoped event for *daemon_name*."""
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an ERROR event for *daemon_name* from an arbitrary exception."""
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        """Drop events whose service or daemon no longer exists."""
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s in self.events:
            # split only on the first ':' so subjects containing ':' don't raise
            kind, subject = k_s.split(':', 1)
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Return stored events for service *name* (empty list if none)."""
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Return stored events for daemon *name* (empty list if none)."""
        return self.events.get('daemon:' + name, [])