import datetime
import enum
from copy import copy
import ipaddress
import json
import logging
import math
import socket
from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
    NamedTuple, Type

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, TunedProfileSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec

if TYPE_CHECKING:
    from .module import CephadmOrchestrator


logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'


class HostCacheStatus(enum.Enum):
    stray = 'stray'
    host = 'host'
    devices = 'devices'


class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        adjusted_addrs = False

        def is_valid_ip(ip: str) -> bool:
            try:
                ipaddress.ip_address(ip)
                return True
            except ValueError:
                return False

        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host!  This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration.  Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup.  use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        if spec.hostname in self._inventory:
            # addr
            if self.get_addr(spec.hostname) != spec.addr:
                self.set_addr(spec.hostname, spec.addr)
            # labels
            for label in spec.labels:
                self.add_label(spec.hostname, label)
        else:
            self._inventory[spec.hostname] = spec.to_json()
            self.save()

    def rm_host(self, host: str) -> None:
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def has_label(self, host: str, label: str) -> bool:
        return (
            host in self._inventory
            and label in self._inventory[host].get('labels', [])
        )

    def get_addr(self, host: str) -> str:
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def spec_from_dict(self, info: dict) -> HostSpec:
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state]

    def save(self) -> None:
        self.mgr.set_store('inventory', json.dumps(self._inventory))


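# Illustrative sketch (not part of the module): the Inventory above is persisted
# as a single JSON blob under the 'inventory' config-key, mapping hostname to the
# HostSpec.to_json() dict written by add_host()/save(). The hostnames and
# addresses below are made up:
#
#   {
#       "node1": {"hostname": "node1", "addr": "10.0.0.1", "labels": ["_admin"], "status": ""},
#       "node2": {"hostname": "node2", "addr": "10.0.0.2", "labels": [], "status": ""}
#   }

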
class SpecDescription(NamedTuple):
    spec: ServiceSpec
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]


class SpecStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> rank -> gen -> daemon_id
        self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self._rank_maps.get(name),
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                if (
                    (self.mgr.migration_current or 0) < 3
                    and j['spec'].get('service_type') == 'nfs'
                ):
                    self.mgr.log.debug(f'found legacy nfs spec {j}')
                    queue_migrate_nfs_spec(self.mgr, j)
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                if 'rank_map' in j and isinstance(j['rank_map'], dict):
                    self._rank_maps[service_name] = {}
                    for rank_str, m in j['rank_map'].items():
                        try:
                            rank = int(rank_str)
                        except ValueError:
                            logger.exception(f"failed to parse rank in {j['rank_map']}")
                            continue
                        if isinstance(m, dict):
                            self._rank_maps[service_name][rank] = {}
                            for gen_str, name in m.items():
                                try:
                                    gen = int(gen_str)
                                except ValueError:
                                    logger.exception(f"failed to parse gen in {j['rank_map']}")
                                    continue
                                # a (rank, gen) slot may legitimately have no daemon assigned yet
                                if isinstance(name, str) or name is None:
                                    self._rank_maps[service_name][rank][gen] = name

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(
        self,
        spec: ServiceSpec,
        update_create: bool = True,
    ) -> None:
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()
        self._save(name)

    def save_rank_map(self,
                      name: str,
                      rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
        self._rank_maps[name] = rank_map
        self._save(name)

    def _save(self, name: str) -> None:
        data: Dict[str, Any] = {
            'spec': self._specs[name].to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self._rank_maps:
            data['rank_map'] = self._rank_maps[name]
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(self._specs[name],
                                    OrchestratorEvent.INFO,
                                    'service was created')

    def rm(self, service_name: str) -> bool:
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            if service_name in self._rank_maps:
                del self._rank_maps[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        return self.spec_created.get(spec.service_name())


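# Illustrative sketch (not part of the module): a persisted spec entry for a
# ranked service carries a 'rank_map' keyed as service_name -> rank -> gen ->
# daemon_id; JSON forces the rank/gen keys to strings, and SpecStore.load()
# converts them back to ints, accepting either a daemon name or None per slot.
# The service and daemon names below are made up:
#
#   "rank_map": {
#       "0": {"0": "nfs.foo.0.0.node1.abcdef", "1": null},
#       "1": {"0": "nfs.foo.1.0.node2.ghijkl"}
#   }

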
class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain
    """

    def __init__(
        self,
        entity: str,
        placement: PlacementSpec,
        mode: Optional[int] = None,
        uid: Optional[int] = None,
        gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        if 'placement' in c:
            c['placement'] = PlacementSpec.from_json(c['placement'])
        _cls = cls(**c)
        _cls.validate()
        return _cls


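# Illustrative sketch (not part of the module): a ClientKeyringSpec round-trips
# through the JSON shape used by ClientKeyringStore below; the entity and label
# here are made up and the PlacementSpec arguments are an assumption:
#
#   spec = ClientKeyringSpec('client.admin', PlacementSpec(label='_admin'))
#   spec2 = ClientKeyringSpec.from_json(spec.to_json())
#   spec2.path  # -> '/etc/ceph/ceph.client.admin.keyring', mode 0o600, uid/gid 0

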
class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.mgr = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: ClientKeyringSpec) -> None:
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        if entity in self.keys:
            del self.keys[entity]
            self.save()


class TunedProfileStore():
    """
    Store for our tuned profile information
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: CephadmOrchestrator = mgr
        self.mgr = mgr
        self.profiles: Dict[str, TunedProfileSpec] = {}

    def __contains__(self, profile: str) -> bool:
        return profile in self.profiles

    def load(self) -> None:
        c = self.mgr.get_store('tuned_profiles') or b'{}'
        j = json.loads(c)
        for k, v in j.items():
            self.profiles[k] = TunedProfileSpec.from_json(v)
            self.profiles[k]._last_updated = datetime_to_str(datetime_now())

    def exists(self, profile_name: str) -> bool:
        return profile_name in self.profiles

    def save(self) -> None:
        profiles_json = {k: v.to_json() for k, v in self.profiles.items()}
        self.mgr.set_store('tuned_profiles', json.dumps(profiles_json))

    def add_setting(self, profile: str, setting: str, value: str) -> None:
        if profile in self.profiles:
            self.profiles[profile].settings[setting] = value
            self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
            self.save()
        else:
            logger.error(
                f'Attempted to set setting "{setting}" for nonexistent os tuning profile "{profile}"')

    def rm_setting(self, profile: str, setting: str) -> None:
        if profile in self.profiles:
            if setting in self.profiles[profile].settings:
                self.profiles[profile].settings.pop(setting, '')
                self.profiles[profile]._last_updated = datetime_to_str(datetime_now())
                self.save()
            else:
                logger.error(
                    f'Attempted to remove nonexistent setting "{setting}" from os tuning profile "{profile}"')
        else:
            logger.error(
                f'Attempted to remove setting "{setting}" from nonexistent os tuning profile "{profile}"')

    def add_profile(self, spec: TunedProfileSpec) -> None:
        spec._last_updated = datetime_to_str(datetime_now())
        self.profiles[spec.profile_name] = spec
        self.save()

    def rm_profile(self, profile: str) -> None:
        if profile in self.profiles:
            self.profiles.pop(profile, TunedProfileSpec(''))
        else:
            logger.error(f'Attempted to remove nonexistent os tuning profile "{profile}"')
        self.save()

    def last_updated(self, profile: str) -> Optional[datetime.datetime]:
        if profile not in self.profiles or not self.profiles[profile]._last_updated:
            return None
        return str_to_datetime(self.profiles[profile]._last_updated)

    def set_last_updated(self, profile: str, new_datetime: datetime.datetime) -> None:
        if profile in self.profiles:
            self.profiles[profile]._last_updated = datetime_to_str(new_datetime)

    def list_profiles(self) -> List[TunedProfileSpec]:
        return [p for p in self.profiles.values()]


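# Illustrative sketch (not part of the module): a tuned profile is added and then
# refined setting-by-setting; every mutation bumps _last_updated and re-persists
# the whole store under the 'tuned_profiles' config-key. The profile name, label
# and sysctl keys are made up, and the TunedProfileSpec arguments are an
# assumption about its constructor:
#
#   spec = TunedProfileSpec('my-latency-profile',
#                           placement=PlacementSpec(label='latency'),
#                           settings={'vm.swappiness': '10'})
#   mgr.tuned_profiles.add_profile(spec)
#   mgr.tuned_profiles.add_setting('my-latency-profile', 'kernel.pid_max', '4194304')

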
class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    For example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_client_files` O(hosts)

    Stores the last digest and owner/mode for files we've pushed to /etc/ceph
    (ceph.conf or client keyrings).

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.last_autotune = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_network_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.last_tuned_profile_update = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.network_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

        self.metadata_up_to_date = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if self._get_host_cache_entry_status(host) != HostCacheStatus.host:
                if self._get_host_cache_entry_status(host) == HostCacheStatus.devices:
                    continue
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.network_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                self.devices[host] = []
                # still want to check old device location for upgrade scenarios
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.devices[host] += self.load_host_devices(host)
                self.networks[host] = j.get('networks_and_interfaces', {})
                self.osdspec_previews[host] = j.get('osdspec_previews', {})
                self.last_client_files[host] = j.get('last_client_files', {})
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_tuned_profile_update' in j:
                    self.last_tuned_profile_update[host] = str_to_datetime(
                        j['last_tuned_profile_update'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def _get_host_cache_entry_status(self, host: str) -> HostCacheStatus:
        # return whether a host cache entry in the config-key
        # store is for a host, a set of devices or is stray.
        # for a host, the entry name will match a hostname in our
        # inventory. For devices, it will be formatted
        # <hostname>.devices.<integer> where <hostname> is
        # in our inventory. If neither case applies, it is stray
        if host in self.mgr.inventory:
            return HostCacheStatus.host
        try:
            # try stripping off the ".devices.<integer>" and see if we get
            # a host name that matches our inventory
            actual_host = '.'.join(host.split('.')[:-2])
            return HostCacheStatus.devices if actual_host in self.mgr.inventory else HostCacheStatus.stray
        except Exception:
            return HostCacheStatus.stray

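    # Illustrative sketch (not part of the module): after the HOST_CACHE_PREFIX
    # is stripped, entry names classified by _get_host_cache_entry_status() above
    # take three shapes (hostnames made up):
    #
    #   'node1'              -> HostCacheStatus.host     (node1 is in the inventory)
    #   'node1.devices.0'    -> HostCacheStatus.devices  (chunked device cache entry)
    #   'old-removed-node'   -> HostCacheStatus.stray    (removed by load())
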
    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def update_autotune(self, host: str) -> None:
        self.last_autotune[host] = datetime_now()

    def invalidate_autotune(self, host: str) -> None:
        if host in self.last_autotune:
            del self.last_autotune[host]

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        old_devs = inventory.Devices(self.devices[host])
        new_devs = inventory.Devices(b)
        # relying on Devices class __eq__ function here
        if old_devs != new_devs:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices(
        self,
        host: str,
        dls: List[inventory.Device],
    ) -> None:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls

    def update_host_networks(
        self,
        host: str,
        nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        self.networks[host] = nets
        self.last_network_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def update_client_file(self,
                           host: str,
                           path: str,
                           digest: str,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        if host not in self.last_client_files:
            self.last_client_files[host] = {}
        self.last_client_files[host][path] = (digest, mode, uid, gid)

    def removed_client_file(self, host: str, path: str) -> None:
        if (
            host in self.last_client_files
            and path in self.last_client_files[host]
        ):
            del self.last_client_files[host][path]

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.network_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.last_client_files[host] = {}

    def refresh_all_host_info(self, host):
        # type: (str) -> None

        self.last_host_check.pop(host, None)
        self.daemon_refresh_queue.append(host)
        self.registry_login_queue.add(host)
        self.device_refresh_queue.append(host)
        self.last_facts_update.pop(host, None)
        self.osdspec_previews_refresh_queue.append(host)
        self.last_autotune.pop(host, None)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def invalidate_host_networks(self, host):
        # type: (str) -> None
        self.network_refresh_queue.append(host)
        if host in self.last_network_update:
            del self.last_network_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_network_update:
            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.last_tuned_profile_update:
            j['last_tuned_profile_update'] = datetime_to_str(self.last_tuned_profile_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_client_files:
            j['last_client_files'] = self.last_client_files[host]
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
        if host in self.metadata_up_to_date:
            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
        if host in self.devices:
            self.save_host_devices(host)

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def save_host_devices(self, host: str) -> None:
        if host not in self.devices or not self.devices[host]:
            logger.debug(f'Host {host} has no devices to save')
            return

        devs: List[Dict[str, Any]] = []
        for d in self.devices[host]:
            devs.append(d.to_json())

        def byte_len(s: str) -> int:
            return len(s.encode('utf-8'))

        dev_cache_counter: int = 0
        cache_size: int = self.mgr.get_foreign_ceph_option('mon', 'mon_config_key_max_entry_size')
        if cache_size is not None and cache_size != 0 and byte_len(json.dumps(devs)) > cache_size - 1024:
            # no guarantee all device entries take up the same amount of space
            # splitting it up so there's one more entry than we need should be fairly
            # safe and save a lot of extra logic checking sizes
            cache_entries_needed = math.ceil(byte_len(json.dumps(devs)) / cache_size) + 1
            dev_sublist_size = math.ceil(len(devs) / cache_entries_needed)
            dev_lists: List[List[Dict[str, Any]]] = [devs[i:i + dev_sublist_size]
                                                     for i in range(0, len(devs), dev_sublist_size)]
            for dev_list in dev_lists:
                dev_dict: Dict[str, Any] = {'devices': dev_list}
                if dev_cache_counter == 0:
                    dev_dict.update({'entries': len(dev_lists)})
                self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                                   + str(dev_cache_counter), json.dumps(dev_dict))
                dev_cache_counter += 1
        else:
            self.mgr.set_store(HOST_CACHE_PREFIX + host + '.devices.'
                               + str(dev_cache_counter), json.dumps({'devices': devs, 'entries': 1}))

    def load_host_devices(self, host: str) -> List[inventory.Device]:
        dev_cache_counter: int = 0
        devs: List[Dict[str, Any]] = []
        dev_entries: int = 0
        try:
            # number of entries for the host's devices should be in
            # the "entries" field of the first entry
            dev_entries = json.loads(self.mgr.get_store(
                HOST_CACHE_PREFIX + host + '.devices.0')).get('entries')
        except Exception:
            logger.debug(f'No device entries found for host {host}')
        for i in range(dev_entries):
            try:
                new_devs = json.loads(self.mgr.get_store(
                    HOST_CACHE_PREFIX + host + '.devices.' + str(i))).get('devices', [])
                if len(new_devs) > 0:
                    # verify list contains actual device objects by trying to load one from json
                    inventory.Device.from_json(new_devs[0])
                    # if we didn't throw an Exception on above line, we can add the devices
                    devs = devs + new_devs
                    dev_cache_counter += 1
            except Exception as e:
                logger.error(('Hit exception trying to load devices from '
                              + f'{HOST_CACHE_PREFIX + host + ".devices." + str(dev_cache_counter)} in key store: {e}'))
                return []
        return [inventory.Device.from_json(d) for d in devs]

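    # Illustrative sketch (not part of the module): with a (made up) 64KiB
    # mon_config_key_max_entry_size and a large ceph-volume inventory,
    # save_host_devices() above spreads one host's devices over several
    # config-key entries; only the first chunk records the chunk count:
    #
    #   host.node1.devices.0  -> {"devices": [...], "entries": 3}
    #   host.node1.devices.1  -> {"devices": [...]}
    #   host.node1.devices.2  -> {"devices": [...]}
    #
    # load_host_devices() reads ".devices.0" first to learn how many chunks to
    # fetch and then concatenates the per-chunk device lists.
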
    def rm_host(self, host):
        # type: (str) -> None
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.facts:
            del self.facts[host]
        if host in self.last_facts_update:
            del self.last_facts_update[host]
        if host in self.last_autotune:
            del self.last_autotune[host]
        if host in self.osdspec_previews:
            del self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            del self.osdspec_last_applied[host]
        if host in self.loading_osdspec_preview:
            self.loading_osdspec_preview.remove(host)
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.last_network_update:
            del self.last_network_update[host]
        if host in self.last_device_change:
            del self.last_device_change[host]
        if host in self.last_tuned_profile_update:
            del self.last_tuned_profile_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        if host in self.scheduled_daemon_actions:
            del self.scheduled_daemon_actions[host]
        if host in self.last_client_files:
            del self.last_client_files[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we can end up with hosts
        where daemons might be running, but we have not yet detected them.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                self.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def get_non_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that do not have the _no_schedule label.

        Useful for the agent, which needs this specific list rather than
        schedulable_hosts, since the agent must be deployed on hosts that have
        had no daemon refresh yet.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
        ]

    def get_draining_hosts(self) -> List[HostSpec]:
        """
        Returns all hosts that have the _no_schedule label and therefore should have
        no daemons placed on them, but are potentially still reachable.
        """
        return [
            h for h in self.mgr.inventory.all_specs() if '_no_schedule' in h.labels
        ]

    def get_unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible so we CAN'T touch them) but
        we still want to count daemons that exist on these hosts toward the
        placement so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.mgr.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.mgr.offline_hosts
            )
        ]

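    # Illustrative sketch (not part of the module): how the host views above
    # relate. A host labelled '_no_schedule' is "draining"; a host in
    # maintenance or known to be offline is "unreachable" but its daemons still
    # count toward placement. The variable names are made up:
    #
    #   schedulable = {h.hostname for h in cache.get_schedulable_hosts()}
    #   unreachable = {h.hostname for h in cache.get_unreachable_hosts()}
    #   safe_to_touch = schedulable - unreachable
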
    def get_facts(self, host: str) -> Dict[str, Any]:
        return self.facts.get(host, {})

    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
        for dm in self.daemons.copy().values():
            yield from dm.values()

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return list(self._get_daemons())

    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
        r = []
        for dd in self._get_daemons():
            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                r.append(dd)
        return r

    def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
        return list(self.daemons.get(host, {}).values())

    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
        assert not daemon_name.startswith('ha-rgw.')
        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
        for dd in dds:
            if dd.name() == daemon_name:
                return dd

        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
        try:
            self.get_daemon(daemon_name, host)
        except orchestrator.OrchestratorError:
            return False
        return True

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                # could be wrong. We must assume maintenance is working and daemons are stopped
                dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.copy().items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)

    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
        assert service_type not in ['keepalived', 'haproxy']

        daemons = self.daemons[host].values() if host else self._get_daemons()

        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]

    def get_daemon_types(self, hostname: str) -> Set[str]:
        """Provide a list of the types of daemons on the host"""
        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})

    def get_daemon_names(self):
        # type: () -> List[str]
        return [d.name() for d in self._get_daemons()]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
        return self.last_client_files.get(host, {})

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_autotune_memory(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.autotune_interval)
        if host not in self.last_autotune or self.last_autotune[host] < cutoff:
            return True
        return False

    def host_needs_tuned_profile_update(self, host: str, profile: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Cannot apply tuned profile')
            return False
        if profile not in self.mgr.tuned_profiles:
            logger.debug(
                f'Cannot apply tuned profile {profile} on host {host}. Profile does not exist')
            return False
        if host not in self.last_tuned_profile_update:
            return True
        last_profile_update = self.mgr.tuned_profiles.last_updated(profile)
        if last_profile_update is None:
            self.mgr.tuned_profiles.set_last_updated(profile, datetime_now())
            return True
        if self.last_tuned_profile_update[host] < last_profile_update:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_network_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
            return False
        if host in self.network_refresh_queue:
            self.network_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
            return True
        if not self.mgr.cache.host_metadata_up_to_date(host):
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def host_metadata_up_to_date(self, host: str) -> bool:
        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
            return False
        return True

    def all_host_metadata_up_to_date(self) -> bool:
        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
            # this function is primarily for telling if it's safe to try and apply a service
            # spec. Since offline/maintenance hosts aren't considered in that process anyway
            # we don't want to return False if the host without up-to-date metadata is in one
            # of those two categories.
            return False
        return True

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
            'rotate-key': 6,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        if host not in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host] = {}
        self.scheduled_daemon_actions[host][daemon_name] = action

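    # Illustrative sketch (not part of the module): scheduled actions are kept
    # per daemon, and a lower-priority request never downgrades a higher-priority
    # one already queued. The host and daemon names are made up:
    #
    #   cache.schedule_daemon_action('node1', 'osd.3', 'redeploy')  # queued
    #   cache.schedule_daemon_action('node1', 'osd.3', 'restart')   # ignored: redeploy outranks restart
    #   cache.get_scheduled_daemon_action('node1', 'osd.3')         # -> 'redeploy'
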
    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
        found = False
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
                found = True
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]
        return found

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)


class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.agent_config_deps = {}  # type: Dict[str, Dict[str,Any]]
        self.agent_counter = {}  # type: Dict[str, int]
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        self.agent_keys = {}  # type: Dict[str, str]
        self.agent_ports = {}  # type: Dict[str, int]
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))
                pass

    def save_agent(self, host: str) -> None:
        j: Dict[str, Any] = {}
        if host in self.agent_config_deps:
            j['agent_config_deps'] = {
                'deps': self.agent_config_deps[host].get('deps', []),
                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
            return False
        return True

    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)


class EventStore():
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:

        if event.kind_subject() not in self.events:
            self.events[event.kind_subject()] = [event]

        for e in self.events[event.kind_subject()]:
            if e.message == event.message:
                return

        self.events[event.kind_subject()].append(event)

        # limit to five events for now.
        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for k_s, v in self.events.items():
            kind, subject = k_s.split(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])
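

# Illustrative sketch (not part of the module): events are bucketed by
# OrchestratorEvent.kind_subject(), i.e. '<kind>:<subject>', deduplicated by
# message, and capped at the five most recent entries per subject. The service
# and daemon names are made up:
#
#   events.for_service(rgw_spec, OrchestratorEvent.INFO, 'service was created')
#   events.for_daemon('rgw.foo.node1.abcdef', 'ERROR', 'failed to start')
#   events.get_for_service('rgw.foo')              # looks up key 'service:rgw.foo'
#   events.get_for_daemon('rgw.foo.node1.abcdef')  # looks up key 'daemon:rgw.foo.node1.abcdef'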