Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import datetime |
2 | from copy import copy | |
b3b6e05e | 3 | import ipaddress |
e306af50 TL |
4 | import json |
5 | import logging | |
b3b6e05e | 6 | import socket |
f67539c2 | 7 | from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \ |
b3b6e05e | 8 | NamedTuple, Type |
e306af50 TL |
9 | |
10 | import orchestrator | |
11 | from ceph.deployment import inventory | |
b3b6e05e | 12 | from ceph.deployment.service_spec import ServiceSpec, PlacementSpec |
adb31ebb | 13 | from ceph.utils import str_to_datetime, datetime_to_str, datetime_now |
f67539c2 | 14 | from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types |
20effc67 | 15 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec |
e306af50 | 16 | |
b3b6e05e | 17 | from .utils import resolve_ip |
a4b75251 | 18 | from .migrations import queue_migrate_nfs_spec |
b3b6e05e | 19 | |
e306af50 TL |
20 | if TYPE_CHECKING: |
21 | from .module import CephadmOrchestrator | |
22 | ||
23 | ||
24 | logger = logging.getLogger(__name__) | |
25 | ||
26 | HOST_CACHE_PREFIX = "host." | |
27 | SPEC_STORE_PREFIX = "spec." | |
20effc67 | 28 | AGENT_CACHE_PREFIX = 'agent.' |
e306af50 TL |
29 | |
30 | ||
31 | class Inventory: | |
adb31ebb TL |
32 | """ |
33 | The inventory persistently stores a HostSpec for each host. |
34 | """ | |
35 | ||
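    # A hedged sketch of the persisted layout (illustrative values; the exact
    # keys come from HostSpec.to_json()): the mgr store key 'inventory' holds a
    # JSON object mapping hostname to a HostSpec dict, roughly
    #   {"host1": {"hostname": "host1", "addr": "10.0.0.1",
    #              "labels": ["_admin"], "status": ""}}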
e306af50 TL |
36 | def __init__(self, mgr: 'CephadmOrchestrator'): |
37 | self.mgr = mgr | |
b3b6e05e TL |
38 | adjusted_addrs = False |
39 | ||
40 | def is_valid_ip(ip: str) -> bool: | |
41 | try: | |
42 | ipaddress.ip_address(ip) | |
43 | return True | |
44 | except ValueError: | |
45 | return False | |
46 | ||
e306af50 TL |
47 | # load inventory |
48 | i = self.mgr.get_store('inventory') | |
49 | if i: | |
50 | self._inventory: Dict[str, dict] = json.loads(i) | |
adb31ebb TL |
51 | # handle old clusters missing 'hostname' key from hostspec |
52 | for k, v in self._inventory.items(): | |
53 | if 'hostname' not in v: | |
54 | v['hostname'] = k | |
b3b6e05e TL |
55 | |
56 | # convert legacy non-IP addr? | |
57 | if is_valid_ip(str(v.get('addr'))): | |
58 | continue | |
59 | if len(self._inventory) > 1: | |
60 | if k == socket.gethostname(): | |
61 | # Never try to resolve our own host! This is | |
62 | # fraught and can lead to either a loopback | |
63 | # address (due to podman's futzing with | |
64 | # /etc/hosts) or a private IP based on the CNI | |
65 | # configuration. Instead, wait until the mgr | |
66 | # fails over to another host and let them resolve | |
67 | # this host. | |
68 | continue | |
69 | ip = resolve_ip(cast(str, v.get('addr'))) | |
70 | else: | |
71 | # we only have 1 node in the cluster, so we can't | |
72 | # rely on another host doing the lookup. use the | |
73 | # IP the mgr binds to. | |
74 | ip = self.mgr.get_mgr_ip() | |
75 | if is_valid_ip(ip) and not ip.startswith('127.0.'): | |
76 | self.mgr.log.info( | |
77 | f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'" | |
78 | ) | |
79 | v['addr'] = ip | |
80 | adjusted_addrs = True | |
81 | if adjusted_addrs: | |
82 | self.save() | |
e306af50 TL |
83 | else: |
84 | self._inventory = dict() | |
85 | logger.debug('Loaded inventory %s' % self._inventory) | |
86 | ||
87 | def keys(self) -> List[str]: | |
88 | return list(self._inventory.keys()) | |
89 | ||
90 | def __contains__(self, host: str) -> bool: | |
91 | return host in self._inventory | |
92 | ||
adb31ebb | 93 | def assert_host(self, host: str) -> None: |
e306af50 TL |
94 | if host not in self._inventory: |
95 | raise OrchestratorError('host %s does not exist' % host) | |
96 | ||
adb31ebb | 97 | def add_host(self, spec: HostSpec) -> None: |
a4b75251 TL |
98 | if spec.hostname in self._inventory: |
99 | # addr | |
100 | if self.get_addr(spec.hostname) != spec.addr: | |
101 | self.set_addr(spec.hostname, spec.addr) | |
102 | # labels | |
103 | for label in spec.labels: | |
104 | self.add_label(spec.hostname, label) | |
105 | else: | |
106 | self._inventory[spec.hostname] = spec.to_json() | |
107 | self.save() | |
e306af50 | 108 | |
adb31ebb | 109 | def rm_host(self, host: str) -> None: |
e306af50 TL |
110 | self.assert_host(host) |
111 | del self._inventory[host] | |
112 | self.save() | |
113 | ||
adb31ebb | 114 | def set_addr(self, host: str, addr: str) -> None: |
e306af50 TL |
115 | self.assert_host(host) |
116 | self._inventory[host]['addr'] = addr | |
117 | self.save() | |
118 | ||
adb31ebb | 119 | def add_label(self, host: str, label: str) -> None: |
e306af50 TL |
120 | self.assert_host(host) |
121 | ||
122 | if 'labels' not in self._inventory[host]: | |
123 | self._inventory[host]['labels'] = list() | |
124 | if label not in self._inventory[host]['labels']: | |
125 | self._inventory[host]['labels'].append(label) | |
126 | self.save() | |
127 | ||
adb31ebb | 128 | def rm_label(self, host: str, label: str) -> None: |
e306af50 TL |
129 | self.assert_host(host) |
130 | ||
131 | if 'labels' not in self._inventory[host]: | |
132 | self._inventory[host]['labels'] = list() | |
133 | if label in self._inventory[host]['labels']: | |
134 | self._inventory[host]['labels'].remove(label) | |
135 | self.save() | |
136 | ||
b3b6e05e TL |
137 | def has_label(self, host: str, label: str) -> bool: |
138 | return ( | |
139 | host in self._inventory | |
140 | and label in self._inventory[host].get('labels', []) | |
141 | ) | |
142 | ||
adb31ebb | 143 | def get_addr(self, host: str) -> str: |
e306af50 TL |
144 | self.assert_host(host) |
145 | return self._inventory[host].get('addr', host) | |
146 | ||
adb31ebb | 147 | def spec_from_dict(self, info: dict) -> HostSpec: |
e306af50 TL |
148 | hostname = info['hostname'] |
149 | return HostSpec( | |
f91f0fd5 TL |
150 | hostname, |
151 | addr=info.get('addr', hostname), | |
152 | labels=info.get('labels', []), | |
153 | status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''), | |
154 | ) | |
e306af50 | 155 | |
f91f0fd5 TL |
156 | def all_specs(self) -> List[HostSpec]: |
157 | return list(map(self.spec_from_dict, self._inventory.values())) | |
e306af50 | 158 | |
f67539c2 TL |
159 | def get_host_with_state(self, state: str = "") -> List[str]: |
160 | """return a list of host names in a specific state""" | |
161 | return [h for h in self._inventory if self._inventory[h].get("status", "").lower() == state] | |
162 | ||
adb31ebb | 163 | def save(self) -> None: |
e306af50 TL |
164 | self.mgr.set_store('inventory', json.dumps(self._inventory)) |
165 | ||
166 | ||
f67539c2 TL |
167 | class SpecDescription(NamedTuple): |
168 | spec: ServiceSpec | |
b3b6e05e | 169 | rank_map: Optional[Dict[int, Dict[int, Optional[str]]]] |
f67539c2 TL |
170 | created: datetime.datetime |
171 | deleted: Optional[datetime.datetime] | |
172 | ||
173 | ||
e306af50 TL |
174 | class SpecStore(): |
175 | def __init__(self, mgr): | |
176 | # type: (CephadmOrchestrator) -> None | |
177 | self.mgr = mgr | |
f67539c2 | 178 | self._specs = {} # type: Dict[str, ServiceSpec] |
b3b6e05e TL |
179 | # service_name -> rank -> gen -> daemon_id |
180 | self._rank_maps = {} # type: Dict[str, Dict[int, Dict[int, Optional[str]]]] | |
f91f0fd5 | 181 | self.spec_created = {} # type: Dict[str, datetime.datetime] |
f67539c2 | 182 | self.spec_deleted = {} # type: Dict[str, datetime.datetime] |
f91f0fd5 | 183 | self.spec_preview = {} # type: Dict[str, ServiceSpec] |
e306af50 | 184 | |
f67539c2 TL |
185 | @property |
186 | def all_specs(self) -> Mapping[str, ServiceSpec]: | |
187 | """ | |
188 | Returns both active and deleted specs, as a read-only dict. |
189 | """ | |
190 | return self._specs | |
191 | ||
192 | def __contains__(self, name: str) -> bool: | |
193 | return name in self._specs | |
194 | ||
195 | def __getitem__(self, name: str) -> SpecDescription: | |
196 | if name not in self._specs: | |
197 | raise OrchestratorError(f'Service {name} not found.') | |
198 | return SpecDescription(self._specs[name], | |
b3b6e05e | 199 | self._rank_maps.get(name), |
f67539c2 TL |
200 | self.spec_created[name], |
201 | self.spec_deleted.get(name, None)) | |
202 | ||
203 | @property | |
204 | def active_specs(self) -> Mapping[str, ServiceSpec]: | |
205 | return {k: v for k, v in self._specs.items() if k not in self.spec_deleted} | |
206 | ||
e306af50 TL |
207 | def load(self): |
208 | # type: () -> None | |
f67539c2 | 209 | for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items(): |
e306af50 TL |
210 | service_name = k[len(SPEC_STORE_PREFIX):] |
211 | try: | |
f67539c2 | 212 | j = cast(Dict[str, dict], json.loads(v)) |
a4b75251 TL |
213 | if ( |
214 | (self.mgr.migration_current or 0) < 3 | |
215 | and j['spec'].get('service_type') == 'nfs' | |
216 | ): | |
217 | self.mgr.log.debug(f'found legacy nfs spec {j}') | |
218 | queue_migrate_nfs_spec(self.mgr, j) | |
f67539c2 TL |
219 | spec = ServiceSpec.from_json(j['spec']) |
220 | created = str_to_datetime(cast(str, j['created'])) | |
221 | self._specs[service_name] = spec | |
e306af50 | 222 | self.spec_created[service_name] = created |
f67539c2 | 223 | |
b3b6e05e | 224 | if 'deleted' in j: |
f67539c2 TL |
225 | deleted = str_to_datetime(cast(str, j['deleted'])) |
226 | self.spec_deleted[service_name] = deleted | |
227 | ||
b3b6e05e TL |
228 | if 'rank_map' in j and isinstance(j['rank_map'], dict): |
229 | self._rank_maps[service_name] = {} | |
230 | for rank_str, m in j['rank_map'].items(): | |
231 | try: | |
232 | rank = int(rank_str) | |
233 | except ValueError: | |
234 | logger.exception(f"failed to parse rank in {j['rank_map']}") | |
235 | continue | |
236 | if isinstance(m, dict): | |
237 | self._rank_maps[service_name][rank] = {} | |
238 | for gen_str, name in m.items(): | |
239 | try: | |
240 | gen = int(gen_str) | |
241 | except ValueError: | |
242 | logger.exception(f"failed to parse gen in {j['rank_map']}") | |
243 | continue | |
244 | if isinstance(name, str) or name is None: |
245 | self._rank_maps[service_name][rank][gen] = name | |
246 | ||
e306af50 TL |
247 | self.mgr.log.debug('SpecStore: loaded spec for %s' % ( |
248 | service_name)) | |
249 | except Exception as e: | |
250 | self.mgr.log.warning('unable to load spec for %s: %s' % ( | |
251 | service_name, e)) | |
252 | pass | |
253 | ||
b3b6e05e TL |
254 | def save( |
255 | self, | |
256 | spec: ServiceSpec, | |
257 | update_create: bool = True, | |
258 | ) -> None: | |
f67539c2 | 259 | name = spec.service_name() |
f6b5b4d7 | 260 | if spec.preview_only: |
f67539c2 | 261 | self.spec_preview[name] = spec |
f6b5b4d7 | 262 | return None |
f67539c2 TL |
263 | self._specs[name] = spec |
264 | ||
265 | if update_create: | |
266 | self.spec_created[name] = datetime_now() | |
b3b6e05e | 267 | self._save(name) |
f67539c2 | 268 | |
b3b6e05e TL |
269 | def save_rank_map(self, |
270 | name: str, | |
271 | rank_map: Dict[int, Dict[int, Optional[str]]]) -> None: | |
272 | self._rank_maps[name] = rank_map | |
273 | self._save(name) | |
274 | ||
275 | def _save(self, name: str) -> None: | |
276 | data: Dict[str, Any] = { | |
277 | 'spec': self._specs[name].to_json(), | |
f67539c2 TL |
278 | 'created': datetime_to_str(self.spec_created[name]), |
279 | } | |
b3b6e05e TL |
280 | if name in self._rank_maps: |
281 | data['rank_map'] = self._rank_maps[name] | |
f67539c2 TL |
282 | if name in self.spec_deleted: |
283 | data['deleted'] = datetime_to_str(self.spec_deleted[name]) | |
284 | ||
e306af50 | 285 | self.mgr.set_store( |
f67539c2 TL |
286 | SPEC_STORE_PREFIX + name, |
287 | json.dumps(data, sort_keys=True), | |
e306af50 | 288 | ) |
b3b6e05e TL |
289 | self.mgr.events.for_service(self._specs[name], |
290 | OrchestratorEvent.INFO, | |
291 | 'service was created') | |
e306af50 | 292 | |
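    # A hedged sketch of the JSON written by _save() under 'spec.<service_name>'
    # (illustrative values; 'rank_map' and 'deleted' are present only when set,
    # and JSON stringifies the integer rank/gen keys):
    #   {"spec": {"service_type": "mds", "service_id": "a", ...},
    #    "created": "2021-03-01T12:00:00.000000Z",
    #    "rank_map": {"0": {"0": "mds-daemon-id", "1": null}},
    #    "deleted": "2021-03-02T12:00:00.000000Z"}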
f67539c2 TL |
293 | def rm(self, service_name: str) -> bool: |
294 | if service_name not in self._specs: | |
295 | return False | |
296 | ||
297 | if self._specs[service_name].preview_only: | |
298 | self.finally_rm(service_name) | |
299 | return True | |
300 | ||
301 | self.spec_deleted[service_name] = datetime_now() | |
302 | self.save(self._specs[service_name], update_create=False) | |
303 | return True | |
304 | ||
305 | def finally_rm(self, service_name): | |
e306af50 | 306 | # type: (str) -> bool |
f67539c2 | 307 | found = service_name in self._specs |
e306af50 | 308 | if found: |
f67539c2 | 309 | del self._specs[service_name] |
b3b6e05e TL |
310 | if service_name in self._rank_maps: |
311 | del self._rank_maps[service_name] | |
e306af50 | 312 | del self.spec_created[service_name] |
f67539c2 TL |
313 | if service_name in self.spec_deleted: |
314 | del self.spec_deleted[service_name] | |
e306af50 TL |
315 | self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None) |
316 | return found | |
317 | ||
f67539c2 TL |
318 | def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]: |
319 | return self.spec_created.get(spec.service_name()) | |
e306af50 | 320 | |
f91f0fd5 | 321 | |
b3b6e05e TL |
322 | class ClientKeyringSpec(object): |
323 | """ | |
324 | A client keyring file that we should maintain | |
325 | """ | |
326 | ||
327 | def __init__( | |
328 | self, | |
329 | entity: str, | |
330 | placement: PlacementSpec, | |
331 | mode: Optional[int] = None, | |
332 | uid: Optional[int] = None, | |
333 | gid: Optional[int] = None, | |
334 | ) -> None: | |
335 | self.entity = entity | |
336 | self.placement = placement | |
337 | self.mode = mode or 0o600 | |
338 | self.uid = uid or 0 | |
339 | self.gid = gid or 0 | |
340 | ||
341 | def validate(self) -> None: | |
342 | pass | |
343 | ||
344 | def to_json(self) -> Dict[str, Any]: | |
345 | return { | |
346 | 'entity': self.entity, | |
347 | 'placement': self.placement.to_json(), | |
348 | 'mode': self.mode, | |
349 | 'uid': self.uid, | |
350 | 'gid': self.gid, | |
351 | } | |
352 | ||
353 | @property | |
354 | def path(self) -> str: | |
355 | return f'/etc/ceph/ceph.{self.entity}.keyring' | |
356 | ||
357 | @classmethod | |
358 | def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec': | |
359 | c = data.copy() | |
360 | if 'placement' in c: | |
361 | c['placement'] = PlacementSpec.from_json(c['placement']) | |
362 | _cls = cls(**c) | |
363 | _cls.validate() | |
364 | return _cls | |
365 | ||
366 | ||
367 | class ClientKeyringStore(): | |
368 | """ | |
369 | Track client keyring files that we are supposed to maintain | |
370 | """ | |
371 | ||
372 | def __init__(self, mgr): | |
373 | # type: (CephadmOrchestrator) -> None | |
374 | self.mgr: CephadmOrchestrator = mgr | |
376 | self.keys: Dict[str, ClientKeyringSpec] = {} | |
377 | ||
378 | def load(self) -> None: | |
379 | c = self.mgr.get_store('client_keyrings') or b'{}' | |
380 | j = json.loads(c) | |
381 | for e, d in j.items(): | |
382 | self.keys[e] = ClientKeyringSpec.from_json(d) | |
383 | ||
384 | def save(self) -> None: | |
385 | data = { | |
386 | k: v.to_json() for k, v in self.keys.items() | |
387 | } | |
388 | self.mgr.set_store('client_keyrings', json.dumps(data)) | |
389 | ||
390 | def update(self, ks: ClientKeyringSpec) -> None: | |
391 | self.keys[ks.entity] = ks | |
392 | self.save() | |
393 | ||
394 | def rm(self, entity: str) -> None: | |
395 | if entity in self.keys: | |
396 | del self.keys[entity] | |
397 | self.save() | |
398 | ||
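# A hedged usage sketch; 'keyring_store' stands for a ClientKeyringStore
# instance, and the entity/placement values are illustrative:
#   ks = ClientKeyringSpec('client.admin', PlacementSpec(label='_admin'))
#   keyring_store.update(ks)
#   # persisted under the 'client_keyrings' mgr store key; ks.path is
#   # '/etc/ceph/ceph.client.admin.keyring', mode 0o600 and uid/gid 0 by default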
399 | ||
e306af50 | 400 | class HostCache(): |
adb31ebb TL |
401 | """ |
402 | HostCache stores different things: | |
403 | ||
404 | 1. `daemons`: Deployed daemons O(daemons) | |
405 | ||
406 | They're part of the configuration nowadays and need to be | |
407 | persistent. The name "daemon cache" is unfortunately a bit misleading: |
408 | for example, we really need to know where daemons are deployed on |
409 | hosts that are offline. |
410 | ||
411 | 2. `devices`: ceph-volume inventory cache O(hosts) | |
412 | ||
413 | As soon as this is populated, it becomes more or less read-only. | |
414 | ||
415 | 3. `networks`: network interfaces for each host. O(hosts) | |
416 | ||
417 | This is needed in order to deploy MONs. It is mostly read-only. |
418 | ||
b3b6e05e | 419 | 4. `last_client_files` O(hosts) |
adb31ebb | 420 | |
b3b6e05e TL |
421 | Stores the last digest and owner/mode for files we've pushed to /etc/ceph |
422 | (ceph.conf or client keyrings). | |
adb31ebb TL |
423 | |
424 | 5. `scheduled_daemon_actions`: O(daemons) | |
425 | ||
426 | Used to run daemon actions after deploying a daemon. We need to | |
427 | store it persistently, in order to stay consistent across | |
f67539c2 | 428 | MGR failovers. |
adb31ebb TL |
429 | """ |
430 | ||
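    # A hedged sketch of the per-host record written by save_host() under
    # 'host.<hostname>' (illustrative values; timestamp and last_client_files
    # keys appear only once known):
    #   {"daemons": {"mon.a": {...}}, "devices": [{...}],
    #    "networks_and_interfaces": {"10.0.0.0/24": {"eth0": ["10.0.0.1"]}},
    #    "osdspec_previews": [], "osdspec_last_applied": {},
    #    "daemon_config_deps": {"mon.a": {"deps": [], "last_config": "..."}},
    #    "last_daemon_update": "...", "last_host_check": "...",
    #    "last_client_files": {"/etc/ceph/ceph.conf": ["<digest>", 420, 0, 0]},
    #    "scheduled_daemon_actions": {}, "metadata_up_to_date": false}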
e306af50 TL |
431 | def __init__(self, mgr): |
432 | # type: (CephadmOrchestrator) -> None | |
433 | self.mgr: CephadmOrchestrator = mgr | |
434 | self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] | |
435 | self.last_daemon_update = {} # type: Dict[str, datetime.datetime] | |
436 | self.devices = {} # type: Dict[str, List[inventory.Device]] | |
adb31ebb TL |
437 | self.facts = {} # type: Dict[str, Dict[str, Any]] |
438 | self.last_facts_update = {} # type: Dict[str, datetime.datetime] | |
b3b6e05e | 439 | self.last_autotune = {} # type: Dict[str, datetime.datetime] |
e306af50 | 440 | self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]] |
f67539c2 TL |
441 | self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]] |
442 | self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]] | |
20effc67 | 443 | self.last_network_update = {} # type: Dict[str, datetime.datetime] |
e306af50 | 444 | self.last_device_update = {} # type: Dict[str, datetime.datetime] |
f67539c2 | 445 | self.last_device_change = {} # type: Dict[str, datetime.datetime] |
f91f0fd5 TL |
446 | self.daemon_refresh_queue = [] # type: List[str] |
447 | self.device_refresh_queue = [] # type: List[str] | |
20effc67 | 448 | self.network_refresh_queue = [] # type: List[str] |
f91f0fd5 | 449 | self.osdspec_previews_refresh_queue = [] # type: List[str] |
f6b5b4d7 TL |
450 | |
451 | # host -> daemon name -> dict | |
e306af50 TL |
452 | self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] |
453 | self.last_host_check = {} # type: Dict[str, datetime.datetime] | |
454 | self.loading_osdspec_preview = set() # type: Set[str] | |
b3b6e05e | 455 | self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {} |
f6b5b4d7 | 456 | self.registry_login_queue: Set[str] = set() |
e306af50 | 457 | |
f91f0fd5 TL |
458 | self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {} |
459 | ||
20effc67 TL |
460 | self.metadata_up_to_date = {} # type: Dict[str, bool] |
461 | ||
e306af50 TL |
462 | def load(self): |
463 | # type: () -> None | |
f67539c2 | 464 | for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items(): |
e306af50 TL |
465 | host = k[len(HOST_CACHE_PREFIX):] |
466 | if host not in self.mgr.inventory: | |
467 | self.mgr.log.warning('removing stray HostCache host record %s' % ( | |
468 | host)) | |
469 | self.mgr.set_store(k, None) | |
470 | try: | |
471 | j = json.loads(v) | |
472 | if 'last_device_update' in j: | |
f91f0fd5 | 473 | self.last_device_update[host] = str_to_datetime(j['last_device_update']) |
e306af50 TL |
474 | else: |
475 | self.device_refresh_queue.append(host) | |
f67539c2 TL |
476 | if 'last_device_change' in j: |
477 | self.last_device_change[host] = str_to_datetime(j['last_device_change']) | |
e306af50 TL |
478 | # for services, we ignore the persisted last_*_update |
479 | # and always trigger a new scrape on mgr restart. | |
480 | self.daemon_refresh_queue.append(host) | |
20effc67 | 481 | self.network_refresh_queue.append(host) |
e306af50 TL |
482 | self.daemons[host] = {} |
483 | self.osdspec_previews[host] = [] | |
f67539c2 | 484 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
485 | self.devices[host] = [] |
486 | self.networks[host] = {} | |
487 | self.daemon_config_deps[host] = {} | |
488 | for name, d in j.get('daemons', {}).items(): | |
489 | self.daemons[host][name] = \ | |
490 | orchestrator.DaemonDescription.from_json(d) | |
491 | for d in j.get('devices', []): | |
492 | self.devices[host].append(inventory.Device.from_json(d)) | |
f67539c2 | 493 | self.networks[host] = j.get('networks_and_interfaces', {}) |
e306af50 | 494 | self.osdspec_previews[host] = j.get('osdspec_previews', {}) |
b3b6e05e | 495 | self.last_client_files[host] = j.get('last_client_files', {}) |
f67539c2 TL |
496 | for name, ts in j.get('osdspec_last_applied', {}).items(): |
497 | self.osdspec_last_applied[host][name] = str_to_datetime(ts) | |
e306af50 TL |
498 | |
499 | for name, d in j.get('daemon_config_deps', {}).items(): | |
500 | self.daemon_config_deps[host][name] = { | |
501 | 'deps': d.get('deps', []), | |
f91f0fd5 | 502 | 'last_config': str_to_datetime(d['last_config']), |
e306af50 TL |
503 | } |
504 | if 'last_host_check' in j: | |
f91f0fd5 | 505 | self.last_host_check[host] = str_to_datetime(j['last_host_check']) |
f6b5b4d7 | 506 | self.registry_login_queue.add(host) |
f91f0fd5 | 507 | self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) |
20effc67 | 508 | self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False) |
f91f0fd5 | 509 | |
e306af50 TL |
510 | self.mgr.log.debug( |
511 | 'HostCache.load: host %s has %d daemons, ' | |
512 | '%d devices, %d networks' % ( | |
513 | host, len(self.daemons[host]), len(self.devices[host]), | |
514 | len(self.networks[host]))) | |
515 | except Exception as e: | |
516 | self.mgr.log.warning('unable to load cached state for %s: %s' % ( | |
517 | host, e)) | |
518 | pass | |
519 | ||
520 | def update_host_daemons(self, host, dm): | |
521 | # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None | |
522 | self.daemons[host] = dm | |
adb31ebb TL |
523 | self.last_daemon_update[host] = datetime_now() |
524 | ||
525 | def update_host_facts(self, host, facts): | |
526 | # type: (str, Dict[str, Dict[str, Any]]) -> None | |
527 | self.facts[host] = facts | |
f67539c2 TL |
528 | self.last_facts_update[host] = datetime_now() |
529 | ||
b3b6e05e TL |
530 | def update_autotune(self, host: str) -> None: |
531 | self.last_autotune[host] = datetime_now() | |
532 | ||
533 | def invalidate_autotune(self, host: str) -> None: | |
534 | if host in self.last_autotune: | |
535 | del self.last_autotune[host] | |
536 | ||
f67539c2 TL |
537 | def devices_changed(self, host: str, b: List[inventory.Device]) -> bool: |
538 | a = self.devices[host] | |
539 | if len(a) != len(b): | |
540 | return True | |
541 | aj = {d.path: d.to_json() for d in a} | |
542 | bj = {d.path: d.to_json() for d in b} | |
543 | if aj != bj: | |
544 | self.mgr.log.info("Detected new or changed devices on %s" % host) | |
545 | return True | |
546 | return False | |
e306af50 | 547 | |
20effc67 | 548 | def update_host_devices( |
f67539c2 TL |
549 | self, |
550 | host: str, | |
551 | dls: List[inventory.Device], | |
f67539c2 TL |
552 | ) -> None: |
553 | if ( | |
554 | host not in self.devices | |
555 | or host not in self.last_device_change | |
556 | or self.devices_changed(host, dls) | |
557 | ): | |
558 | self.last_device_change[host] = datetime_now() | |
559 | self.last_device_update[host] = datetime_now() | |
e306af50 | 560 | self.devices[host] = dls |
20effc67 TL |
561 | |
562 | def update_host_networks( | |
563 | self, | |
564 | host: str, | |
565 | nets: Dict[str, Dict[str, List[str]]] | |
566 | ) -> None: | |
e306af50 | 567 | self.networks[host] = nets |
20effc67 | 568 | self.last_network_update[host] = datetime_now() |
e306af50 | 569 | |
adb31ebb | 570 | def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None: |
e306af50 TL |
571 | self.daemon_config_deps[host][name] = { |
572 | 'deps': deps, | |
573 | 'last_config': stamp, | |
574 | } | |
575 | ||
576 | def update_last_host_check(self, host): | |
577 | # type: (str) -> None | |
adb31ebb | 578 | self.last_host_check[host] = datetime_now() |
e306af50 | 579 | |
f67539c2 TL |
580 | def update_osdspec_last_applied(self, host, service_name, ts): |
581 | # type: (str, str, datetime.datetime) -> None | |
582 | self.osdspec_last_applied[host][service_name] = ts | |
583 | ||
b3b6e05e TL |
584 | def update_client_file(self, |
585 | host: str, | |
586 | path: str, | |
587 | digest: str, | |
588 | mode: int, | |
589 | uid: int, | |
590 | gid: int) -> None: | |
591 | if host not in self.last_client_files: | |
592 | self.last_client_files[host] = {} | |
593 | self.last_client_files[host][path] = (digest, mode, uid, gid) | |
594 | ||
595 | def removed_client_file(self, host: str, path: str) -> None: | |
596 | if ( | |
597 | host in self.last_client_files | |
598 | and path in self.last_client_files[host] | |
599 | ): | |
600 | del self.last_client_files[host][path] | |
601 | ||
e306af50 TL |
602 | def prime_empty_host(self, host): |
603 | # type: (str) -> None | |
604 | """ | |
605 | Install an empty entry for a host | |
606 | """ | |
607 | self.daemons[host] = {} | |
608 | self.devices[host] = [] | |
609 | self.networks[host] = {} | |
610 | self.osdspec_previews[host] = [] | |
f67539c2 | 611 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
612 | self.daemon_config_deps[host] = {} |
613 | self.daemon_refresh_queue.append(host) | |
614 | self.device_refresh_queue.append(host) | |
20effc67 | 615 | self.network_refresh_queue.append(host) |
e306af50 | 616 | self.osdspec_previews_refresh_queue.append(host) |
f6b5b4d7 | 617 | self.registry_login_queue.add(host) |
b3b6e05e | 618 | self.last_client_files[host] = {} |
e306af50 | 619 | |
a4b75251 TL |
620 | def refresh_all_host_info(self, host): |
621 | # type: (str) -> None | |
622 | ||
623 | self.last_host_check.pop(host, None) | |
624 | self.daemon_refresh_queue.append(host) | |
625 | self.registry_login_queue.add(host) | |
626 | self.device_refresh_queue.append(host) | |
627 | self.last_facts_update.pop(host, None) | |
628 | self.osdspec_previews_refresh_queue.append(host) | |
629 | self.last_autotune.pop(host, None) | |
630 | ||
e306af50 TL |
631 | def invalidate_host_daemons(self, host): |
632 | # type: (str) -> None | |
633 | self.daemon_refresh_queue.append(host) | |
634 | if host in self.last_daemon_update: | |
635 | del self.last_daemon_update[host] | |
636 | self.mgr.event.set() | |
637 | ||
638 | def invalidate_host_devices(self, host): | |
639 | # type: (str) -> None | |
640 | self.device_refresh_queue.append(host) | |
641 | if host in self.last_device_update: | |
642 | del self.last_device_update[host] | |
643 | self.mgr.event.set() | |
f91f0fd5 | 644 | |
20effc67 TL |
645 | def invalidate_host_networks(self, host): |
646 | # type: (str) -> None | |
647 | self.network_refresh_queue.append(host) | |
648 | if host in self.last_network_update: | |
649 | del self.last_network_update[host] | |
650 | self.mgr.event.set() | |
651 | ||
adb31ebb | 652 | def distribute_new_registry_login_info(self) -> None: |
f6b5b4d7 | 653 | self.registry_login_queue = set(self.mgr.inventory.keys()) |
e306af50 | 654 | |
f91f0fd5 TL |
655 | def save_host(self, host: str) -> None: |
656 | j: Dict[str, Any] = { | |
e306af50 TL |
657 | 'daemons': {}, |
658 | 'devices': [], | |
659 | 'osdspec_previews': [], | |
f67539c2 | 660 | 'osdspec_last_applied': {}, |
e306af50 TL |
661 | 'daemon_config_deps': {}, |
662 | } | |
663 | if host in self.last_daemon_update: | |
f91f0fd5 | 664 | j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host]) |
e306af50 | 665 | if host in self.last_device_update: |
f91f0fd5 | 666 | j['last_device_update'] = datetime_to_str(self.last_device_update[host]) |
20effc67 TL |
667 | if host in self.last_network_update: |
668 | j['last_network_update'] = datetime_to_str(self.last_network_update[host]) | |
f67539c2 TL |
669 | if host in self.last_device_change: |
670 | j['last_device_change'] = datetime_to_str(self.last_device_change[host]) | |
adb31ebb TL |
671 | if host in self.daemons: |
672 | for name, dd in self.daemons[host].items(): | |
673 | j['daemons'][name] = dd.to_json() | |
674 | if host in self.devices: | |
675 | for d in self.devices[host]: | |
676 | j['devices'].append(d.to_json()) | |
677 | if host in self.networks: | |
f67539c2 | 678 | j['networks_and_interfaces'] = self.networks[host] |
adb31ebb TL |
679 | if host in self.daemon_config_deps: |
680 | for name, depi in self.daemon_config_deps[host].items(): | |
681 | j['daemon_config_deps'][name] = { | |
682 | 'deps': depi.get('deps', []), | |
683 | 'last_config': datetime_to_str(depi['last_config']), | |
684 | } | |
685 | if host in self.osdspec_previews and self.osdspec_previews[host]: | |
e306af50 | 686 | j['osdspec_previews'] = self.osdspec_previews[host] |
f67539c2 TL |
687 | if host in self.osdspec_last_applied: |
688 | for name, ts in self.osdspec_last_applied[host].items(): | |
689 | j['osdspec_last_applied'][name] = datetime_to_str(ts) | |
e306af50 TL |
690 | |
691 | if host in self.last_host_check: | |
f91f0fd5 | 692 | j['last_host_check'] = datetime_to_str(self.last_host_check[host]) |
f6b5b4d7 | 693 | |
b3b6e05e TL |
694 | if host in self.last_client_files: |
695 | j['last_client_files'] = self.last_client_files[host] | |
adb31ebb | 696 | if host in self.scheduled_daemon_actions: |
f91f0fd5 | 697 | j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host] |
20effc67 TL |
698 | if host in self.metadata_up_to_date: |
699 | j['metadata_up_to_date'] = self.metadata_up_to_date[host] | |
f6b5b4d7 | 700 | |
e306af50 TL |
701 | self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) |
702 | ||
703 | def rm_host(self, host): | |
704 | # type: (str) -> None | |
705 | if host in self.daemons: | |
706 | del self.daemons[host] | |
707 | if host in self.devices: | |
708 | del self.devices[host] | |
adb31ebb TL |
709 | if host in self.facts: |
710 | del self.facts[host] | |
711 | if host in self.last_facts_update: | |
712 | del self.last_facts_update[host] | |
b3b6e05e TL |
713 | if host in self.last_autotune: |
714 | del self.last_autotune[host] | |
e306af50 TL |
715 | if host in self.osdspec_previews: |
716 | del self.osdspec_previews[host] | |
f67539c2 TL |
717 | if host in self.osdspec_last_applied: |
718 | del self.osdspec_last_applied[host] | |
e306af50 TL |
719 | if host in self.loading_osdspec_preview: |
720 | self.loading_osdspec_preview.remove(host) | |
721 | if host in self.networks: | |
722 | del self.networks[host] | |
723 | if host in self.last_daemon_update: | |
724 | del self.last_daemon_update[host] | |
725 | if host in self.last_device_update: | |
726 | del self.last_device_update[host] | |
20effc67 TL |
727 | if host in self.last_network_update: |
728 | del self.last_network_update[host] | |
f67539c2 TL |
729 | if host in self.last_device_change: |
730 | del self.last_device_change[host] | |
e306af50 TL |
731 | if host in self.daemon_config_deps: |
732 | del self.daemon_config_deps[host] | |
f91f0fd5 TL |
733 | if host in self.scheduled_daemon_actions: |
734 | del self.scheduled_daemon_actions[host] | |
b3b6e05e TL |
735 | if host in self.last_client_files: |
736 | del self.last_client_files[host] | |
e306af50 TL |
737 | self.mgr.set_store(HOST_CACHE_PREFIX + host, None) |
738 | ||
739 | def get_hosts(self): | |
740 | # type: () -> List[str] | |
20effc67 TL |
741 | return list(self.daemons) |
742 | ||
743 | def get_schedulable_hosts(self) -> List[HostSpec]: | |
744 | """ | |
745 | Returns all usable hosts that went through _refresh_host_daemons(). | |
746 | ||
747 | This mitigates a potential race, where a new host was added *after* |
748 | ``_refresh_host_daemons()`` was called, but *before* |
749 | ``_apply_all_specs()`` was called. Thus we would end up with hosts |
750 | where daemons might be running, but we have not yet detected them. |
751 | """ | |
752 | return [ | |
753 | h for h in self.mgr.inventory.all_specs() | |
754 | if ( | |
755 | self.host_had_daemon_refresh(h.hostname) | |
756 | and '_no_schedule' not in h.labels | |
757 | ) | |
758 | ] | |
759 | ||
760 | def get_non_draining_hosts(self) -> List[HostSpec]: | |
761 | """ | |
762 | Returns all hosts that do not have the _no_schedule label. |
763 ||
764 | Useful for the agent, which needs this specific list rather than the |
765 | schedulable hosts, since the agent needs to be deployed on hosts that |
766 | have not yet had a daemon refresh. |
767 | """ | |
768 | return [ | |
769 | h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels | |
770 | ] | |
771 | ||
772 | def get_unreachable_hosts(self) -> List[HostSpec]: | |
773 | """ | |
774 | Return all hosts that are offline or in maintenance mode. | |
775 | ||
776 | The idea is that we should not touch the daemons on these hosts (since |
777 | in theory the hosts are inaccessible, so we CAN'T touch them), but |
778 | we still want to count daemons that exist on these hosts toward the |
779 | placement, so daemons on these hosts aren't just moved elsewhere. |
780 | """ | |
781 | return [ | |
782 | h for h in self.mgr.inventory.all_specs() | |
783 | if ( | |
784 | h.status.lower() in ['maintenance', 'offline'] | |
785 | or h.hostname in self.mgr.offline_hosts | |
786 | ) | |
787 | ] | |
e306af50 | 788 | |
b3b6e05e TL |
789 | def get_facts(self, host: str) -> Dict[str, Any]: |
790 | return self.facts.get(host, {}) | |
791 | ||
20effc67 TL |
792 | def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]: |
793 | for dm in self.daemons.copy().values(): | |
794 | yield from dm.values() | |
795 | ||
e306af50 TL |
796 | def get_daemons(self): |
797 | # type: () -> List[orchestrator.DaemonDescription] | |
20effc67 TL |
798 | return list(self._get_daemons()) |
799 | ||
800 | def get_error_daemons(self) -> List[orchestrator.DaemonDescription]: | |
e306af50 | 801 | r = [] |
20effc67 TL |
802 | for dd in self._get_daemons(): |
803 | if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error: | |
e306af50 TL |
804 | r.append(dd) |
805 | return r | |
806 | ||
b3b6e05e TL |
807 | def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]: |
808 | return list(self.daemons.get(host, {}).values()) | |
809 | ||
20effc67 | 810 | def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription: |
f67539c2 | 811 | assert not daemon_name.startswith('ha-rgw.') |
20effc67 TL |
812 | dds = self.get_daemons_by_host(host) if host else self._get_daemons() |
813 | for dd in dds: | |
814 | if dd.name() == daemon_name: | |
815 | return dd | |
816 | ||
f6b5b4d7 TL |
817 | raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') |
818 | ||
20effc67 | 819 | def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool: |
a4b75251 | 820 | try: |
20effc67 | 821 | self.get_daemon(daemon_name, host) |
a4b75251 TL |
822 | except orchestrator.OrchestratorError: |
823 | return False | |
824 | return True | |
825 | ||
e306af50 | 826 | def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: |
adb31ebb | 827 | def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: |
f6b5b4d7 | 828 | dd = copy(dd_orig) |
e306af50 | 829 | if host in self.mgr.offline_hosts: |
f67539c2 | 830 | dd.status = orchestrator.DaemonDescriptionStatus.error |
f6b5b4d7 | 831 | dd.status_desc = 'host is offline' |
b3b6e05e TL |
832 | elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance": |
833 | # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses | |
834 | # could be wrong. We must assume maintenance is working and daemons are stopped | |
835 | dd.status = orchestrator.DaemonDescriptionStatus.stopped | |
f6b5b4d7 TL |
836 | dd.events = self.mgr.events.get_for_daemon(dd.name()) |
837 | return dd | |
838 | ||
20effc67 | 839 | for host, dm in self.daemons.copy().items(): |
f6b5b4d7 | 840 | yield host, {name: alter(host, d) for name, d in dm.items()} |
e306af50 TL |
841 | |
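    # Hedged note: "volatile" above means the returned DaemonDescriptions are
    # copies whose status is overridden at read time (error for offline hosts,
    # stopped for hosts in maintenance); the cached entries are left untouched.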
842 | def get_daemons_by_service(self, service_name): | |
843 | # type: (str) -> List[orchestrator.DaemonDescription] | |
f67539c2 TL |
844 | assert not service_name.startswith('keepalived.') |
845 | assert not service_name.startswith('haproxy.') | |
846 | ||
20effc67 | 847 | return list(dd for dd in self._get_daemons() if dd.service_name() == service_name) |
f6b5b4d7 | 848 | |
20effc67 | 849 | def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]: |
f67539c2 TL |
850 | assert service_type not in ['keepalived', 'haproxy'] |
851 | ||
20effc67 | 852 | daemons = self.daemons[host].values() if host else self._get_daemons() |
e306af50 | 853 | |
20effc67 TL |
854 | return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)] |
855 | ||
856 | def get_daemon_types(self, hostname: str) -> Set[str]: | |
f67539c2 | 857 | """Provide a list of the types of daemons on the host""" |
20effc67 | 858 | return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()}) |
f67539c2 | 859 | |
e306af50 TL |
860 | def get_daemon_names(self): |
861 | # type: () -> List[str] | |
20effc67 | 862 | return [d.name() for d in self._get_daemons()] |
e306af50 | 863 | |
adb31ebb | 864 | def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: |
e306af50 TL |
865 | if host in self.daemon_config_deps: |
866 | if name in self.daemon_config_deps[host]: | |
867 | return self.daemon_config_deps[host][name].get('deps', []), \ | |
868 | self.daemon_config_deps[host][name].get('last_config', None) | |
869 | return None, None | |
870 | ||
b3b6e05e TL |
871 | def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]: |
872 | return self.last_client_files.get(host, {}) | |
873 | ||
e306af50 TL |
874 | def host_needs_daemon_refresh(self, host): |
875 | # type: (str) -> bool | |
876 | if host in self.mgr.offline_hosts: | |
877 | logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') | |
878 | return False | |
879 | if host in self.daemon_refresh_queue: | |
880 | self.daemon_refresh_queue.remove(host) | |
881 | return True | |
adb31ebb | 882 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
883 | seconds=self.mgr.daemon_cache_timeout) |
884 | if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: | |
885 | return True | |
20effc67 TL |
886 | if not self.mgr.cache.host_metadata_up_to_date(host): |
887 | return True | |
e306af50 TL |
888 | return False |
889 | ||
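    # Hedged summary of the policy above: daemon metadata is re-scraped when the
    # host was explicitly queued, when the cached data is older than
    # mgr.daemon_cache_timeout seconds, or when the host's metadata is not
    # marked up to date; offline hosts are always skipped.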
adb31ebb TL |
890 | def host_needs_facts_refresh(self, host): |
891 | # type: (str) -> bool | |
892 | if host in self.mgr.offline_hosts: | |
893 | logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh') | |
894 | return False | |
f67539c2 | 895 | cutoff = datetime_now() - datetime.timedelta( |
adb31ebb TL |
896 | seconds=self.mgr.facts_cache_timeout) |
897 | if host not in self.last_facts_update or self.last_facts_update[host] < cutoff: | |
898 | return True | |
20effc67 TL |
899 | if not self.mgr.cache.host_metadata_up_to_date(host): |
900 | return True | |
adb31ebb TL |
901 | return False |
902 | ||
b3b6e05e TL |
903 | def host_needs_autotune_memory(self, host): |
904 | # type: (str) -> bool | |
905 | if host in self.mgr.offline_hosts: | |
906 | logger.debug(f'Host "{host}" marked as offline. Skipping autotune') | |
907 | return False | |
908 | cutoff = datetime_now() - datetime.timedelta( | |
909 | seconds=self.mgr.autotune_interval) | |
910 | if host not in self.last_autotune or self.last_autotune[host] < cutoff: | |
911 | return True | |
912 | return False | |
913 | ||
f91f0fd5 TL |
914 | def host_had_daemon_refresh(self, host: str) -> bool: |
915 | """ | |
916 | ... at least once. | |
917 | """ | |
918 | if host in self.last_daemon_update: | |
919 | return True | |
920 | if host not in self.daemons: | |
921 | return False | |
922 | return bool(self.daemons[host]) | |
923 | ||
e306af50 TL |
924 | def host_needs_device_refresh(self, host): |
925 | # type: (str) -> bool | |
926 | if host in self.mgr.offline_hosts: | |
927 | logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') | |
928 | return False | |
929 | if host in self.device_refresh_queue: | |
930 | self.device_refresh_queue.remove(host) | |
931 | return True | |
adb31ebb | 932 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
933 | seconds=self.mgr.device_cache_timeout) |
934 | if host not in self.last_device_update or self.last_device_update[host] < cutoff: | |
935 | return True | |
20effc67 TL |
936 | if not self.mgr.cache.host_metadata_up_to_date(host): |
937 | return True | |
938 | return False | |
939 | ||
940 | def host_needs_network_refresh(self, host): | |
941 | # type: (str) -> bool | |
942 | if host in self.mgr.offline_hosts: | |
943 | logger.debug(f'Host "{host}" marked as offline. Skipping network refresh') | |
944 | return False | |
945 | if host in self.network_refresh_queue: | |
946 | self.network_refresh_queue.remove(host) | |
947 | return True | |
948 | cutoff = datetime_now() - datetime.timedelta( | |
949 | seconds=self.mgr.device_cache_timeout) | |
950 | if host not in self.last_network_update or self.last_network_update[host] < cutoff: | |
951 | return True | |
952 | if not self.mgr.cache.host_metadata_up_to_date(host): | |
953 | return True | |
e306af50 TL |
954 | return False |
955 | ||
adb31ebb | 956 | def host_needs_osdspec_preview_refresh(self, host: str) -> bool: |
e306af50 TL |
957 | if host in self.mgr.offline_hosts: |
958 | logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh') | |
959 | return False | |
960 | if host in self.osdspec_previews_refresh_queue: | |
961 | self.osdspec_previews_refresh_queue.remove(host) | |
962 | return True | |
963 | # Since this is dependent on other factors (device and spec) this does not need | |
964 | # to be updated periodically. | |
965 | return False | |
966 | ||
967 | def host_needs_check(self, host): | |
968 | # type: (str) -> bool | |
adb31ebb | 969 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
970 | seconds=self.mgr.host_check_interval) |
971 | return host not in self.last_host_check or self.last_host_check[host] < cutoff | |
972 | ||
f67539c2 TL |
973 | def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool: |
974 | if ( | |
975 | host not in self.devices | |
976 | or host not in self.last_device_change | |
977 | or host not in self.last_device_update | |
978 | or host not in self.osdspec_last_applied | |
979 | or spec.service_name() not in self.osdspec_last_applied[host] | |
980 | ): | |
981 | return True | |
982 | created = self.mgr.spec_store.get_created(spec) | |
983 | if not created or created > self.last_device_change[host]: | |
984 | return True | |
985 | return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host] | |
986 | ||
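    # Hedged summary of osdspec_needs_apply(): the OSD spec is (re)applied when
    # it was never applied to this host (or device data is missing), when the
    # spec was created after the last recorded device change, or when devices
    # changed after the last apply.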
f91f0fd5 | 987 | def host_needs_registry_login(self, host: str) -> bool: |
f6b5b4d7 TL |
988 | if host in self.mgr.offline_hosts: |
989 | return False | |
990 | if host in self.registry_login_queue: | |
991 | self.registry_login_queue.remove(host) | |
992 | return True | |
993 | return False | |
994 | ||
20effc67 TL |
995 | def host_metadata_up_to_date(self, host: str) -> bool: |
996 | if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]: | |
997 | return False | |
998 | return True | |
999 | ||
1000 | def all_host_metadata_up_to_date(self) -> bool: | |
1001 | unreachables = [h.hostname for h in self.get_unreachable_hosts()] | |
1002 | if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]: | |
1003 | # this function is primarily for telling if it's safe to try and apply a service | |
1004 | # spec. Since offline/maintenance hosts aren't considered in that process anyway | |
1005 | # we don't want to return False if the host without up-to-date metadata is in one | |
1006 | # of those two categories. | |
1007 | return False | |
1008 | return True | |
1009 | ||
e306af50 TL |
1010 | def add_daemon(self, host, dd): |
1011 | # type: (str, orchestrator.DaemonDescription) -> None | |
1012 | assert host in self.daemons | |
1013 | self.daemons[host][dd.name()] = dd | |
1014 | ||
adb31ebb | 1015 | def rm_daemon(self, host: str, name: str) -> None: |
f67539c2 TL |
1016 | assert not name.startswith('ha-rgw.') |
1017 | ||
e306af50 TL |
1018 | if host in self.daemons: |
1019 | if name in self.daemons[host]: | |
f6b5b4d7 TL |
1020 | del self.daemons[host][name] |
1021 | ||
adb31ebb | 1022 | def daemon_cache_filled(self) -> bool: |
f6b5b4d7 TL |
1023 | """ |
1024 | i.e. we have checked the daemons for each host at least once, |
1025 | excluding offline hosts. |
1026 | ||
1027 | We're not checking for `host_needs_daemon_refresh`, as this might never be | |
1028 | False for all hosts. | |
1029 | """ | |
f91f0fd5 | 1030 | return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts) |
f6b5b4d7 TL |
1031 | for h in self.get_hosts()) |
1032 | ||
adb31ebb | 1033 | def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None: |
f67539c2 TL |
1034 | assert not daemon_name.startswith('ha-rgw.') |
1035 | ||
f91f0fd5 TL |
1036 | priorities = { |
1037 | 'start': 1, | |
1038 | 'restart': 2, | |
1039 | 'reconfig': 3, | |
1040 | 'redeploy': 4, | |
1041 | 'stop': 5, | |
1042 | } | |
1043 | existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None) | |
1044 | if existing_action and priorities[existing_action] > priorities[action]: | |
1045 | logger.debug( | |
1046 | f'skipping {action} of {daemon_name} because {existing_action} is already scheduled.') |
1047 | return | |
1048 | ||
1049 | if host not in self.scheduled_daemon_actions: | |
1050 | self.scheduled_daemon_actions[host] = {} | |
1051 | self.scheduled_daemon_actions[host][daemon_name] = action | |
1052 | ||
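    # A hedged example of the priority handling above: with 'stop' (5) already
    # scheduled for a daemon, a later schedule_daemon_action(..., 'restart') (2)
    # is ignored; in the reverse order, 'restart' is replaced by 'stop'.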
20effc67 TL |
1053 | def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool: |
1054 | found = False | |
f91f0fd5 TL |
1055 | if host in self.scheduled_daemon_actions: |
1056 | if daemon_name in self.scheduled_daemon_actions[host]: | |
1057 | del self.scheduled_daemon_actions[host][daemon_name] | |
20effc67 | 1058 | found = True |
f91f0fd5 TL |
1059 | if not self.scheduled_daemon_actions[host]: |
1060 | del self.scheduled_daemon_actions[host] | |
20effc67 | 1061 | return found |
f91f0fd5 | 1062 | |
adb31ebb | 1063 | def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]: |
f67539c2 TL |
1064 | assert not daemon.startswith('ha-rgw.') |
1065 | ||
f91f0fd5 TL |
1066 | return self.scheduled_daemon_actions.get(host, {}).get(daemon) |
1067 | ||
f6b5b4d7 | 1068 | |
20effc67 TL |
1069 | class AgentCache(): |
1070 | """ | |
1071 | AgentCache is used for storing metadata about agent daemons that must be kept | |
1072 | through MGR failovers. |
1073 | """ | |
1074 | ||
1075 | def __init__(self, mgr): | |
1076 | # type: (CephadmOrchestrator) -> None | |
1077 | self.mgr: CephadmOrchestrator = mgr | |
1078 | self.agent_config_deps = {} # type: Dict[str, Dict[str,Any]] | |
1079 | self.agent_counter = {} # type: Dict[str, int] | |
1080 | self.agent_timestamp = {} # type: Dict[str, datetime.datetime] | |
1081 | self.agent_keys = {} # type: Dict[str, str] | |
1082 | self.agent_ports = {} # type: Dict[str, int] | |
1083 | self.sending_agent_message = {} # type: Dict[str, bool] | |
1084 | ||
1085 | def load(self): | |
1086 | # type: () -> None | |
1087 | for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items(): | |
1088 | host = k[len(AGENT_CACHE_PREFIX):] | |
1089 | if host not in self.mgr.inventory: | |
1090 | self.mgr.log.warning('removing stray AgentCache record for agent on %s' % ( | |
1091 | host)) | |
1092 | self.mgr.set_store(k, None) | |
1093 | try: | |
1094 | j = json.loads(v) | |
1095 | self.agent_config_deps[host] = {} | |
1096 | conf_deps = j.get('agent_config_deps', {}) | |
1097 | if conf_deps: | |
1098 | conf_deps['last_config'] = str_to_datetime(conf_deps['last_config']) | |
1099 | self.agent_config_deps[host] = conf_deps | |
1100 | self.agent_counter[host] = int(j.get('agent_counter', 1)) | |
1101 | self.agent_timestamp[host] = str_to_datetime( | |
1102 | j.get('agent_timestamp', datetime_to_str(datetime_now()))) | |
1103 | self.agent_keys[host] = str(j.get('agent_keys', '')) | |
1104 | agent_port = int(j.get('agent_ports', 0)) | |
1105 | if agent_port: | |
1106 | self.agent_ports[host] = agent_port | |
1107 | ||
1108 | except Exception as e: | |
1109 | self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % ( | |
1110 | host, e)) | |
1111 | pass | |
1112 | ||
1113 | def save_agent(self, host: str) -> None: | |
1114 | j: Dict[str, Any] = {} | |
1115 | if host in self.agent_config_deps: | |
1116 | j['agent_config_deps'] = { | |
1117 | 'deps': self.agent_config_deps[host].get('deps', []), | |
1118 | 'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']), | |
1119 | } | |
1120 | if host in self.agent_counter: | |
1121 | j['agent_counter'] = self.agent_counter[host] | |
1122 | if host in self.agent_keys: | |
1123 | j['agent_keys'] = self.agent_keys[host] | |
1124 | if host in self.agent_ports: | |
1125 | j['agent_ports'] = self.agent_ports[host] | |
1126 | if host in self.agent_timestamp: | |
1127 | j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host]) | |
1128 | ||
1129 | self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j)) | |
1130 | ||
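    # A hedged sketch of the record written by save_agent() under
    # 'agent.<hostname>' (illustrative values; keys are present only when known):
    #   {"agent_config_deps": {"deps": ["..."], "last_config": "..."},
    #    "agent_counter": 1, "agent_keys": "<key>", "agent_ports": 7150,
    #    "agent_timestamp": "..."}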
1131 | def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None: | |
1132 | self.agent_config_deps[host] = { | |
1133 | 'deps': deps, | |
1134 | 'last_config': stamp, | |
1135 | } | |
1136 | ||
1137 | def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: | |
1138 | if host in self.agent_config_deps: | |
1139 | return self.agent_config_deps[host].get('deps', []), \ | |
1140 | self.agent_config_deps[host].get('last_config', None) | |
1141 | return None, None | |
1142 | ||
1143 | def messaging_agent(self, host: str) -> bool: | |
1144 | if host not in self.sending_agent_message or not self.sending_agent_message[host]: | |
1145 | return False | |
1146 | return True | |
1147 | ||
1148 | def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None: | |
1149 | # agent successfully received new config. Update config/deps | |
1150 | assert daemon_spec.service_name == 'agent' | |
1151 | self.update_agent_config_deps( | |
1152 | daemon_spec.host, daemon_spec.deps, datetime_now()) | |
1153 | self.agent_timestamp[daemon_spec.host] = datetime_now() | |
1154 | self.agent_counter[daemon_spec.host] = 1 | |
1155 | self.save_agent(daemon_spec.host) | |
1156 | ||
1157 | ||
f6b5b4d7 TL |
1158 | class EventStore(): |
1159 | def __init__(self, mgr): | |
1160 | # type: (CephadmOrchestrator) -> None | |
1161 | self.mgr: CephadmOrchestrator = mgr | |
f91f0fd5 | 1162 | self.events = {} # type: Dict[str, List[OrchestratorEvent]] |
f6b5b4d7 TL |
1163 | |
1164 | def add(self, event: OrchestratorEvent) -> None: | |
1165 | ||
1166 | if event.kind_subject() not in self.events: | |
1167 | self.events[event.kind_subject()] = [event] | |
1168 | ||
1169 | for e in self.events[event.kind_subject()]: | |
1170 | if e.message == event.message: | |
1171 | return | |
1172 | ||
1173 | self.events[event.kind_subject()].append(event) | |
1174 | ||
1175 | # limit to five events for now. | |
1176 | self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:] | |
1177 | ||
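    # Hedged note on the layout above: self.events is keyed by
    # OrchestratorEvent.kind_subject(), e.g. 'service:rgw.foo' or 'daemon:mon.a',
    # which is also how cleanup(), get_for_service() and get_for_daemon() below
    # look entries up.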
adb31ebb TL |
1178 | def for_service(self, spec: ServiceSpec, level: str, message: str) -> None: |
1179 | e = OrchestratorEvent(datetime_now(), 'service', | |
f91f0fd5 | 1180 | spec.service_name(), level, message) |
f6b5b4d7 TL |
1181 | self.add(e) |
1182 | ||
adb31ebb | 1183 | def from_orch_error(self, e: OrchestratorError) -> None: |
f6b5b4d7 TL |
1184 | if e.event_subject is not None: |
1185 | self.add(OrchestratorEvent( | |
adb31ebb | 1186 | datetime_now(), |
f6b5b4d7 TL |
1187 | e.event_subject[0], |
1188 | e.event_subject[1], | |
1189 | "ERROR", | |
1190 | str(e) | |
1191 | )) | |
1192 | ||
adb31ebb TL |
1193 | def for_daemon(self, daemon_name: str, level: str, message: str) -> None: |
1194 | e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message) | |
f6b5b4d7 TL |
1195 | self.add(e) |
1196 | ||
adb31ebb | 1197 | def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None: |
f6b5b4d7 TL |
1198 | self.for_daemon( |
1199 | daemon_name, | |
1200 | "ERROR", | |
1201 | str(e) | |
1202 | ) | |
1203 | ||
1204 | def cleanup(self) -> None: | |
1205 | # Needs to be properly done, in case events are persistently stored. | |
1206 | ||
1207 | unknowns: List[str] = [] | |
1208 | daemons = self.mgr.cache.get_daemon_names() | |
f67539c2 | 1209 | specs = self.mgr.spec_store.all_specs.keys() |
f6b5b4d7 TL |
1210 | for k_s, v in self.events.items(): |
1211 | kind, subject = k_s.split(':') | |
1212 | if kind == 'service': | |
1213 | if subject not in specs: | |
1214 | unknowns.append(k_s) | |
1215 | elif kind == 'daemon': | |
1216 | if subject not in daemons: | |
1217 | unknowns.append(k_s) | |
1218 | ||
1219 | for k_s in unknowns: | |
1220 | del self.events[k_s] | |
1221 | ||
adb31ebb | 1222 | def get_for_service(self, name: str) -> List[OrchestratorEvent]: |
f6b5b4d7 TL |
1223 | return self.events.get('service:' + name, []) |
1224 | ||
adb31ebb | 1225 | def get_for_daemon(self, name: str) -> List[OrchestratorEvent]: |
f6b5b4d7 | 1226 | return self.events.get('daemon:' + name, []) |