]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import datetime |
2 | from copy import copy | |
3 | import json | |
4 | import logging | |
f67539c2 TL |
5 | from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \ |
6 | NamedTuple | |
e306af50 TL |
7 | |
8 | import orchestrator | |
9 | from ceph.deployment import inventory | |
10 | from ceph.deployment.service_spec import ServiceSpec | |
adb31ebb | 11 | from ceph.utils import str_to_datetime, datetime_to_str, datetime_now |
f67539c2 | 12 | from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types |
e306af50 TL |
13 | |
14 | if TYPE_CHECKING: | |
15 | from .module import CephadmOrchestrator | |
16 | ||
17 | ||
18 | logger = logging.getLogger(__name__) | |
19 | ||
20 | HOST_CACHE_PREFIX = "host." | |
21 | SPEC_STORE_PREFIX = "spec." | |
e306af50 TL |
22 | |
23 | ||
class Inventory:
    """
    Persistent store of one HostSpec per known host.

    The data lives in the mgr key/value store under the key 'inventory'
    as a JSON dict mapping hostname -> HostSpec-as-dict.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory
        raw = self.mgr.get_store('inventory')
        if not raw:
            self._inventory = dict()
        else:
            self._inventory: Dict[str, dict] = json.loads(raw)
            # handle old clusters missing 'hostname' key from hostspec
            for hostname, info in self._inventory.items():
                info.setdefault('hostname', hostname)
        logger.debug('Loaded inventory %s' % self._inventory)

    def keys(self) -> List[str]:
        """All known hostnames."""
        return list(self._inventory)

    def __contains__(self, host: str) -> bool:
        """True iff *host* is a known host."""
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError unless *host* is known."""
        if host in self._inventory:
            return
        raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Insert (or overwrite) the record for ``spec.hostname`` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Remove a known host and persist; raises if unknown."""
        self.assert_host(host)
        self._inventory.pop(host)
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Update the stored address of a known host and persist."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Attach *label* to a known host (no-op if already present) and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', [])
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Detach *label* from a known host (no-op if absent) and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', [])
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Address of a known host; falls back to the hostname itself."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """Yield hostnames (or HostSpecs) carrying *label*; every host if *label* is falsy."""
        for hostname, info in self._inventory.items():
            if label and label not in info.get('labels', []):
                continue
            yield self.spec_from_dict(info) if as_hostspec else hostname

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Rehydrate a HostSpec from its stored dict form.

        Hosts currently tracked as offline by the mgr report status 'Offline'
        regardless of the persisted status field.
        """
        hostname = info['hostname']
        offline = hostname in self.mgr.offline_hosts
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if offline else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        """HostSpecs for every known host."""
        return [self.spec_from_dict(info) for info in self._inventory.values()]

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state"""
        return [
            hostname for hostname, info in self._inventory.items()
            if info.get("status", "").lower() == state
        ]

    def save(self) -> None:
        """Serialize the whole inventory back into the mgr key/value store."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
115 | ||
116 | ||
f67539c2 TL |
117 | class SpecDescription(NamedTuple): |
118 | spec: ServiceSpec | |
119 | created: datetime.datetime | |
120 | deleted: Optional[datetime.datetime] | |
121 | ||
122 | ||
e306af50 TL |
class SpecStore():
    """
    Persistent store for ServiceSpecs.

    Each spec is kept in the mgr key/value store under
    ``spec.<service_name>`` as JSON carrying the spec itself plus its
    creation and (optional) deletion timestamps.  Preview-only specs are
    held in memory only and never persisted.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        """Spec plus timestamps for *name*; raises OrchestratorError if unknown."""
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        """Only the specs that have not been marked for deletion."""
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        """Populate the in-memory dicts from the mgr key/value store.

        A spec that fails to parse is logged and skipped; it stays in the
        store untouched.
        """
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                # BUGFIX: test the parsed dict `j` for the 'deleted' key.
                # This used to be `'deleted' in v`, i.e. a *substring* match
                # on the raw JSON string, which would wrongly fire (and then
                # KeyError out of the whole try block) for any spec merely
                # containing the text "deleted" anywhere in its JSON.
                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))
                pass

    def save(self, spec: ServiceSpec, update_create: bool = True) -> None:
        """Persist *spec* and emit a service event.

        :param spec: the spec to store; preview-only specs go to
                     ``spec_preview`` and are not persisted.
        :param update_create: refresh the creation timestamp (False when
                              re-saving, e.g. to record a deletion mark).
        """
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()

        data = {
            'spec': spec.to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')

    def rm(self, service_name: str) -> bool:
        """Mark a service for deletion (preview-only specs are dropped outright).

        Returns True if the service was known.
        """
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        """Remove every trace of a service, in memory and in the store."""
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        """Creation timestamp for *spec*, or None if unknown."""
        return self.spec_created.get(spec.service_name())
e306af50 | 223 | |
f91f0fd5 | 224 | |
e306af50 | 225 | class HostCache(): |
adb31ebb TL |
226 | """ |
227 | HostCache stores different things: | |
228 | ||
229 | 1. `daemons`: Deployed daemons O(daemons) | |
230 | ||
231 | They're part of the configuration nowadays and need to be | |
232 | persistent. The name "daemon cache" is unfortunately a bit misleading. | |
233 | Like for example we really need to know where daemons are deployed on | |
234 | hosts that are offline. | |
235 | ||
236 | 2. `devices`: ceph-volume inventory cache O(hosts) | |
237 | ||
238 | As soon as this is populated, it becomes more or less read-only. | |
239 | ||
240 | 3. `networks`: network interfaces for each host. O(hosts) | |
241 | ||
242 | This is needed in order to deploy MONs. As this is mostly read-only. | |
243 | ||
244 | 4. `last_etc_ceph_ceph_conf` O(hosts) | |
245 | ||
f67539c2 | 246 | Stores the last refresh time for the /etc/ceph/ceph.conf. Used |
adb31ebb TL |
247 | to avoid deploying new configs when failing over to a new mgr. |
248 | ||
249 | 5. `scheduled_daemon_actions`: O(daemons) | |
250 | ||
251 | Used to run daemon actions after deploying a daemon. We need to | |
252 | store it persistently, in order to stay consistent across | |
f67539c2 | 253 | MGR failovers. |
adb31ebb TL |
254 | """ |
255 | ||
e306af50 TL |
256 | def __init__(self, mgr): |
257 | # type: (CephadmOrchestrator) -> None | |
258 | self.mgr: CephadmOrchestrator = mgr | |
259 | self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] | |
260 | self.last_daemon_update = {} # type: Dict[str, datetime.datetime] | |
261 | self.devices = {} # type: Dict[str, List[inventory.Device]] | |
adb31ebb TL |
262 | self.facts = {} # type: Dict[str, Dict[str, Any]] |
263 | self.last_facts_update = {} # type: Dict[str, datetime.datetime] | |
e306af50 | 264 | self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]] |
f67539c2 TL |
265 | self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]] |
266 | self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]] | |
e306af50 | 267 | self.last_device_update = {} # type: Dict[str, datetime.datetime] |
f67539c2 | 268 | self.last_device_change = {} # type: Dict[str, datetime.datetime] |
f91f0fd5 TL |
269 | self.daemon_refresh_queue = [] # type: List[str] |
270 | self.device_refresh_queue = [] # type: List[str] | |
271 | self.osdspec_previews_refresh_queue = [] # type: List[str] | |
f6b5b4d7 TL |
272 | |
273 | # host -> daemon name -> dict | |
e306af50 TL |
274 | self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] |
275 | self.last_host_check = {} # type: Dict[str, datetime.datetime] | |
276 | self.loading_osdspec_preview = set() # type: Set[str] | |
f6b5b4d7 TL |
277 | self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {} |
278 | self.registry_login_queue: Set[str] = set() | |
e306af50 | 279 | |
f91f0fd5 TL |
280 | self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {} |
281 | ||
e306af50 TL |
282 | def load(self): |
283 | # type: () -> None | |
f67539c2 | 284 | for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items(): |
e306af50 TL |
285 | host = k[len(HOST_CACHE_PREFIX):] |
286 | if host not in self.mgr.inventory: | |
287 | self.mgr.log.warning('removing stray HostCache host record %s' % ( | |
288 | host)) | |
289 | self.mgr.set_store(k, None) | |
290 | try: | |
291 | j = json.loads(v) | |
292 | if 'last_device_update' in j: | |
f91f0fd5 | 293 | self.last_device_update[host] = str_to_datetime(j['last_device_update']) |
e306af50 TL |
294 | else: |
295 | self.device_refresh_queue.append(host) | |
f67539c2 TL |
296 | if 'last_device_change' in j: |
297 | self.last_device_change[host] = str_to_datetime(j['last_device_change']) | |
e306af50 TL |
298 | # for services, we ignore the persisted last_*_update |
299 | # and always trigger a new scrape on mgr restart. | |
300 | self.daemon_refresh_queue.append(host) | |
301 | self.daemons[host] = {} | |
302 | self.osdspec_previews[host] = [] | |
f67539c2 | 303 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
304 | self.devices[host] = [] |
305 | self.networks[host] = {} | |
306 | self.daemon_config_deps[host] = {} | |
307 | for name, d in j.get('daemons', {}).items(): | |
308 | self.daemons[host][name] = \ | |
309 | orchestrator.DaemonDescription.from_json(d) | |
310 | for d in j.get('devices', []): | |
311 | self.devices[host].append(inventory.Device.from_json(d)) | |
f67539c2 | 312 | self.networks[host] = j.get('networks_and_interfaces', {}) |
e306af50 | 313 | self.osdspec_previews[host] = j.get('osdspec_previews', {}) |
f67539c2 TL |
314 | for name, ts in j.get('osdspec_last_applied', {}).items(): |
315 | self.osdspec_last_applied[host][name] = str_to_datetime(ts) | |
e306af50 TL |
316 | |
317 | for name, d in j.get('daemon_config_deps', {}).items(): | |
318 | self.daemon_config_deps[host][name] = { | |
319 | 'deps': d.get('deps', []), | |
f91f0fd5 | 320 | 'last_config': str_to_datetime(d['last_config']), |
e306af50 TL |
321 | } |
322 | if 'last_host_check' in j: | |
f91f0fd5 | 323 | self.last_host_check[host] = str_to_datetime(j['last_host_check']) |
f6b5b4d7 | 324 | if 'last_etc_ceph_ceph_conf' in j: |
f91f0fd5 TL |
325 | self.last_etc_ceph_ceph_conf[host] = str_to_datetime( |
326 | j['last_etc_ceph_ceph_conf']) | |
f6b5b4d7 | 327 | self.registry_login_queue.add(host) |
f91f0fd5 TL |
328 | self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) |
329 | ||
e306af50 TL |
330 | self.mgr.log.debug( |
331 | 'HostCache.load: host %s has %d daemons, ' | |
332 | '%d devices, %d networks' % ( | |
333 | host, len(self.daemons[host]), len(self.devices[host]), | |
334 | len(self.networks[host]))) | |
335 | except Exception as e: | |
336 | self.mgr.log.warning('unable to load cached state for %s: %s' % ( | |
337 | host, e)) | |
338 | pass | |
339 | ||
340 | def update_host_daemons(self, host, dm): | |
341 | # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None | |
342 | self.daemons[host] = dm | |
adb31ebb TL |
343 | self.last_daemon_update[host] = datetime_now() |
344 | ||
345 | def update_host_facts(self, host, facts): | |
346 | # type: (str, Dict[str, Dict[str, Any]]) -> None | |
347 | self.facts[host] = facts | |
f67539c2 TL |
348 | self.last_facts_update[host] = datetime_now() |
349 | ||
350 | def devices_changed(self, host: str, b: List[inventory.Device]) -> bool: | |
351 | a = self.devices[host] | |
352 | if len(a) != len(b): | |
353 | return True | |
354 | aj = {d.path: d.to_json() for d in a} | |
355 | bj = {d.path: d.to_json() for d in b} | |
356 | if aj != bj: | |
357 | self.mgr.log.info("Detected new or changed devices on %s" % host) | |
358 | return True | |
359 | return False | |
e306af50 | 360 | |
f67539c2 TL |
361 | def update_host_devices_networks( |
362 | self, | |
363 | host: str, | |
364 | dls: List[inventory.Device], | |
365 | nets: Dict[str, Dict[str, List[str]]] | |
366 | ) -> None: | |
367 | if ( | |
368 | host not in self.devices | |
369 | or host not in self.last_device_change | |
370 | or self.devices_changed(host, dls) | |
371 | ): | |
372 | self.last_device_change[host] = datetime_now() | |
373 | self.last_device_update[host] = datetime_now() | |
e306af50 TL |
374 | self.devices[host] = dls |
375 | self.networks[host] = nets | |
e306af50 | 376 | |
adb31ebb | 377 | def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None: |
e306af50 TL |
378 | self.daemon_config_deps[host][name] = { |
379 | 'deps': deps, | |
380 | 'last_config': stamp, | |
381 | } | |
382 | ||
383 | def update_last_host_check(self, host): | |
384 | # type: (str) -> None | |
adb31ebb | 385 | self.last_host_check[host] = datetime_now() |
e306af50 | 386 | |
f67539c2 TL |
387 | def update_osdspec_last_applied(self, host, service_name, ts): |
388 | # type: (str, str, datetime.datetime) -> None | |
389 | self.osdspec_last_applied[host][service_name] = ts | |
390 | ||
e306af50 TL |
391 | def prime_empty_host(self, host): |
392 | # type: (str) -> None | |
393 | """ | |
394 | Install an empty entry for a host | |
395 | """ | |
396 | self.daemons[host] = {} | |
397 | self.devices[host] = [] | |
398 | self.networks[host] = {} | |
399 | self.osdspec_previews[host] = [] | |
f67539c2 | 400 | self.osdspec_last_applied[host] = {} |
e306af50 TL |
401 | self.daemon_config_deps[host] = {} |
402 | self.daemon_refresh_queue.append(host) | |
403 | self.device_refresh_queue.append(host) | |
404 | self.osdspec_previews_refresh_queue.append(host) | |
f6b5b4d7 | 405 | self.registry_login_queue.add(host) |
e306af50 TL |
406 | |
407 | def invalidate_host_daemons(self, host): | |
408 | # type: (str) -> None | |
409 | self.daemon_refresh_queue.append(host) | |
410 | if host in self.last_daemon_update: | |
411 | del self.last_daemon_update[host] | |
412 | self.mgr.event.set() | |
413 | ||
414 | def invalidate_host_devices(self, host): | |
415 | # type: (str) -> None | |
416 | self.device_refresh_queue.append(host) | |
417 | if host in self.last_device_update: | |
418 | del self.last_device_update[host] | |
419 | self.mgr.event.set() | |
f91f0fd5 | 420 | |
adb31ebb | 421 | def distribute_new_registry_login_info(self) -> None: |
f6b5b4d7 | 422 | self.registry_login_queue = set(self.mgr.inventory.keys()) |
e306af50 | 423 | |
f91f0fd5 TL |
424 | def save_host(self, host: str) -> None: |
425 | j: Dict[str, Any] = { | |
e306af50 TL |
426 | 'daemons': {}, |
427 | 'devices': [], | |
428 | 'osdspec_previews': [], | |
f67539c2 | 429 | 'osdspec_last_applied': {}, |
e306af50 TL |
430 | 'daemon_config_deps': {}, |
431 | } | |
432 | if host in self.last_daemon_update: | |
f91f0fd5 | 433 | j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host]) |
e306af50 | 434 | if host in self.last_device_update: |
f91f0fd5 | 435 | j['last_device_update'] = datetime_to_str(self.last_device_update[host]) |
f67539c2 TL |
436 | if host in self.last_device_change: |
437 | j['last_device_change'] = datetime_to_str(self.last_device_change[host]) | |
adb31ebb TL |
438 | if host in self.daemons: |
439 | for name, dd in self.daemons[host].items(): | |
440 | j['daemons'][name] = dd.to_json() | |
441 | if host in self.devices: | |
442 | for d in self.devices[host]: | |
443 | j['devices'].append(d.to_json()) | |
444 | if host in self.networks: | |
f67539c2 | 445 | j['networks_and_interfaces'] = self.networks[host] |
adb31ebb TL |
446 | if host in self.daemon_config_deps: |
447 | for name, depi in self.daemon_config_deps[host].items(): | |
448 | j['daemon_config_deps'][name] = { | |
449 | 'deps': depi.get('deps', []), | |
450 | 'last_config': datetime_to_str(depi['last_config']), | |
451 | } | |
452 | if host in self.osdspec_previews and self.osdspec_previews[host]: | |
e306af50 | 453 | j['osdspec_previews'] = self.osdspec_previews[host] |
f67539c2 TL |
454 | if host in self.osdspec_last_applied: |
455 | for name, ts in self.osdspec_last_applied[host].items(): | |
456 | j['osdspec_last_applied'][name] = datetime_to_str(ts) | |
e306af50 TL |
457 | |
458 | if host in self.last_host_check: | |
f91f0fd5 | 459 | j['last_host_check'] = datetime_to_str(self.last_host_check[host]) |
f6b5b4d7 TL |
460 | |
461 | if host in self.last_etc_ceph_ceph_conf: | |
f91f0fd5 | 462 | j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host]) |
adb31ebb | 463 | if host in self.scheduled_daemon_actions: |
f91f0fd5 | 464 | j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host] |
f6b5b4d7 | 465 | |
e306af50 TL |
466 | self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) |
467 | ||
468 | def rm_host(self, host): | |
469 | # type: (str) -> None | |
470 | if host in self.daemons: | |
471 | del self.daemons[host] | |
472 | if host in self.devices: | |
473 | del self.devices[host] | |
adb31ebb TL |
474 | if host in self.facts: |
475 | del self.facts[host] | |
476 | if host in self.last_facts_update: | |
477 | del self.last_facts_update[host] | |
e306af50 TL |
478 | if host in self.osdspec_previews: |
479 | del self.osdspec_previews[host] | |
f67539c2 TL |
480 | if host in self.osdspec_last_applied: |
481 | del self.osdspec_last_applied[host] | |
e306af50 TL |
482 | if host in self.loading_osdspec_preview: |
483 | self.loading_osdspec_preview.remove(host) | |
484 | if host in self.networks: | |
485 | del self.networks[host] | |
486 | if host in self.last_daemon_update: | |
487 | del self.last_daemon_update[host] | |
488 | if host in self.last_device_update: | |
489 | del self.last_device_update[host] | |
f67539c2 TL |
490 | if host in self.last_device_change: |
491 | del self.last_device_change[host] | |
e306af50 TL |
492 | if host in self.daemon_config_deps: |
493 | del self.daemon_config_deps[host] | |
f91f0fd5 TL |
494 | if host in self.scheduled_daemon_actions: |
495 | del self.scheduled_daemon_actions[host] | |
e306af50 TL |
496 | self.mgr.set_store(HOST_CACHE_PREFIX + host, None) |
497 | ||
498 | def get_hosts(self): | |
499 | # type: () -> List[str] | |
500 | r = [] | |
501 | for host, di in self.daemons.items(): | |
502 | r.append(host) | |
503 | return r | |
504 | ||
505 | def get_daemons(self): | |
506 | # type: () -> List[orchestrator.DaemonDescription] | |
507 | r = [] | |
508 | for host, dm in self.daemons.items(): | |
509 | for name, dd in dm.items(): | |
510 | r.append(dd) | |
511 | return r | |
512 | ||
f6b5b4d7 | 513 | def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription: |
f67539c2 | 514 | assert not daemon_name.startswith('ha-rgw.') |
f6b5b4d7 TL |
515 | for _, dm in self.daemons.items(): |
516 | for _, dd in dm.items(): | |
517 | if dd.name() == daemon_name: | |
518 | return dd | |
519 | raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') | |
520 | ||
e306af50 | 521 | def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: |
adb31ebb | 522 | def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: |
f6b5b4d7 | 523 | dd = copy(dd_orig) |
e306af50 | 524 | if host in self.mgr.offline_hosts: |
f67539c2 | 525 | dd.status = orchestrator.DaemonDescriptionStatus.error |
f6b5b4d7 TL |
526 | dd.status_desc = 'host is offline' |
527 | dd.events = self.mgr.events.get_for_daemon(dd.name()) | |
528 | return dd | |
529 | ||
530 | for host, dm in self.daemons.items(): | |
531 | yield host, {name: alter(host, d) for name, d in dm.items()} | |
e306af50 TL |
532 | |
533 | def get_daemons_by_service(self, service_name): | |
534 | # type: (str) -> List[orchestrator.DaemonDescription] | |
f67539c2 TL |
535 | assert not service_name.startswith('keepalived.') |
536 | assert not service_name.startswith('haproxy.') | |
537 | ||
e306af50 TL |
538 | result = [] # type: List[orchestrator.DaemonDescription] |
539 | for host, dm in self.daemons.items(): | |
540 | for name, d in dm.items(): | |
f6b5b4d7 TL |
541 | if d.service_name() == service_name: |
542 | result.append(d) | |
543 | return result | |
544 | ||
545 | def get_daemons_by_type(self, service_type): | |
546 | # type: (str) -> List[orchestrator.DaemonDescription] | |
f67539c2 TL |
547 | assert service_type not in ['keepalived', 'haproxy'] |
548 | ||
f6b5b4d7 TL |
549 | result = [] # type: List[orchestrator.DaemonDescription] |
550 | for host, dm in self.daemons.items(): | |
551 | for name, d in dm.items(): | |
f67539c2 | 552 | if d.daemon_type in service_to_daemon_types(service_type): |
e306af50 TL |
553 | result.append(d) |
554 | return result | |
555 | ||
f67539c2 TL |
556 | def get_daemon_types(self, hostname: str) -> List[str]: |
557 | """Provide a list of the types of daemons on the host""" | |
558 | result = set() | |
559 | for _d, dm in self.daemons[hostname].items(): | |
560 | assert dm.daemon_type is not None, f'no daemon type for {dm!r}' | |
561 | result.add(dm.daemon_type) | |
562 | return list(result) | |
563 | ||
e306af50 TL |
564 | def get_daemon_names(self): |
565 | # type: () -> List[str] | |
566 | r = [] | |
567 | for host, dm in self.daemons.items(): | |
568 | for name, dd in dm.items(): | |
569 | r.append(name) | |
570 | return r | |
571 | ||
adb31ebb | 572 | def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: |
e306af50 TL |
573 | if host in self.daemon_config_deps: |
574 | if name in self.daemon_config_deps[host]: | |
575 | return self.daemon_config_deps[host][name].get('deps', []), \ | |
576 | self.daemon_config_deps[host][name].get('last_config', None) | |
577 | return None, None | |
578 | ||
579 | def host_needs_daemon_refresh(self, host): | |
580 | # type: (str) -> bool | |
581 | if host in self.mgr.offline_hosts: | |
582 | logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') | |
583 | return False | |
584 | if host in self.daemon_refresh_queue: | |
585 | self.daemon_refresh_queue.remove(host) | |
586 | return True | |
adb31ebb | 587 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
588 | seconds=self.mgr.daemon_cache_timeout) |
589 | if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: | |
590 | return True | |
591 | return False | |
592 | ||
adb31ebb TL |
593 | def host_needs_facts_refresh(self, host): |
594 | # type: (str) -> bool | |
595 | if host in self.mgr.offline_hosts: | |
596 | logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh') | |
597 | return False | |
f67539c2 | 598 | cutoff = datetime_now() - datetime.timedelta( |
adb31ebb TL |
599 | seconds=self.mgr.facts_cache_timeout) |
600 | if host not in self.last_facts_update or self.last_facts_update[host] < cutoff: | |
601 | return True | |
602 | return False | |
603 | ||
f91f0fd5 TL |
604 | def host_had_daemon_refresh(self, host: str) -> bool: |
605 | """ | |
606 | ... at least once. | |
607 | """ | |
608 | if host in self.last_daemon_update: | |
609 | return True | |
610 | if host not in self.daemons: | |
611 | return False | |
612 | return bool(self.daemons[host]) | |
613 | ||
e306af50 TL |
614 | def host_needs_device_refresh(self, host): |
615 | # type: (str) -> bool | |
616 | if host in self.mgr.offline_hosts: | |
617 | logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') | |
618 | return False | |
619 | if host in self.device_refresh_queue: | |
620 | self.device_refresh_queue.remove(host) | |
621 | return True | |
adb31ebb | 622 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
623 | seconds=self.mgr.device_cache_timeout) |
624 | if host not in self.last_device_update or self.last_device_update[host] < cutoff: | |
625 | return True | |
626 | return False | |
627 | ||
adb31ebb | 628 | def host_needs_osdspec_preview_refresh(self, host: str) -> bool: |
e306af50 TL |
629 | if host in self.mgr.offline_hosts: |
630 | logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh') | |
631 | return False | |
632 | if host in self.osdspec_previews_refresh_queue: | |
633 | self.osdspec_previews_refresh_queue.remove(host) | |
634 | return True | |
635 | # Since this is dependent on other factors (device and spec) this does not need | |
636 | # to be updated periodically. | |
637 | return False | |
638 | ||
639 | def host_needs_check(self, host): | |
640 | # type: (str) -> bool | |
adb31ebb | 641 | cutoff = datetime_now() - datetime.timedelta( |
e306af50 TL |
642 | seconds=self.mgr.host_check_interval) |
643 | return host not in self.last_host_check or self.last_host_check[host] < cutoff | |
644 | ||
adb31ebb | 645 | def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool: |
f6b5b4d7 TL |
646 | if not self.mgr.manage_etc_ceph_ceph_conf: |
647 | return False | |
648 | if self.mgr.paused: | |
649 | return False | |
650 | if host in self.mgr.offline_hosts: | |
651 | return False | |
652 | if not self.mgr.last_monmap: | |
653 | return False | |
654 | if host not in self.last_etc_ceph_ceph_conf: | |
655 | return True | |
656 | if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]: | |
657 | return True | |
f91f0fd5 TL |
658 | if self.mgr.extra_ceph_conf_is_newer(self.last_etc_ceph_ceph_conf[host]): |
659 | return True | |
f6b5b4d7 TL |
660 | # already up to date: |
661 | return False | |
f91f0fd5 | 662 | |
f67539c2 TL |
663 | def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool: |
664 | if ( | |
665 | host not in self.devices | |
666 | or host not in self.last_device_change | |
667 | or host not in self.last_device_update | |
668 | or host not in self.osdspec_last_applied | |
669 | or spec.service_name() not in self.osdspec_last_applied[host] | |
670 | ): | |
671 | return True | |
672 | created = self.mgr.spec_store.get_created(spec) | |
673 | if not created or created > self.last_device_change[host]: | |
674 | return True | |
675 | return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host] | |
676 | ||
adb31ebb | 677 | def update_last_etc_ceph_ceph_conf(self, host: str) -> None: |
f6b5b4d7 TL |
678 | if not self.mgr.last_monmap: |
679 | return | |
adb31ebb | 680 | self.last_etc_ceph_ceph_conf[host] = datetime_now() |
f6b5b4d7 | 681 | |
f91f0fd5 | 682 | def host_needs_registry_login(self, host: str) -> bool: |
f6b5b4d7 TL |
683 | if host in self.mgr.offline_hosts: |
684 | return False | |
685 | if host in self.registry_login_queue: | |
686 | self.registry_login_queue.remove(host) | |
687 | return True | |
688 | return False | |
689 | ||
e306af50 TL |
690 | def add_daemon(self, host, dd): |
691 | # type: (str, orchestrator.DaemonDescription) -> None | |
692 | assert host in self.daemons | |
693 | self.daemons[host][dd.name()] = dd | |
694 | ||
adb31ebb | 695 | def rm_daemon(self, host: str, name: str) -> None: |
f67539c2 TL |
696 | assert not name.startswith('ha-rgw.') |
697 | ||
e306af50 TL |
698 | if host in self.daemons: |
699 | if name in self.daemons[host]: | |
f6b5b4d7 TL |
700 | del self.daemons[host][name] |
701 | ||
adb31ebb | 702 | def daemon_cache_filled(self) -> bool: |
f6b5b4d7 TL |
703 | """ |
704 | i.e. we have checked the daemons for each hosts at least once. | |
705 | excluding offline hosts. | |
706 | ||
707 | We're not checking for `host_needs_daemon_refresh`, as this might never be | |
708 | False for all hosts. | |
709 | """ | |
f91f0fd5 | 710 | return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts) |
f6b5b4d7 TL |
711 | for h in self.get_hosts()) |
712 | ||
adb31ebb | 713 | def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None: |
f67539c2 TL |
714 | assert not daemon_name.startswith('ha-rgw.') |
715 | ||
f91f0fd5 TL |
716 | priorities = { |
717 | 'start': 1, | |
718 | 'restart': 2, | |
719 | 'reconfig': 3, | |
720 | 'redeploy': 4, | |
721 | 'stop': 5, | |
722 | } | |
723 | existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None) | |
724 | if existing_action and priorities[existing_action] > priorities[action]: | |
725 | logger.debug( | |
726 | f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.') | |
727 | return | |
728 | ||
729 | if host not in self.scheduled_daemon_actions: | |
730 | self.scheduled_daemon_actions[host] = {} | |
731 | self.scheduled_daemon_actions[host][daemon_name] = action | |
732 | ||
adb31ebb | 733 | def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None: |
f91f0fd5 TL |
734 | if host in self.scheduled_daemon_actions: |
735 | if daemon_name in self.scheduled_daemon_actions[host]: | |
736 | del self.scheduled_daemon_actions[host][daemon_name] | |
737 | if not self.scheduled_daemon_actions[host]: | |
738 | del self.scheduled_daemon_actions[host] | |
739 | ||
adb31ebb | 740 | def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]: |
f67539c2 TL |
741 | assert not daemon.startswith('ha-rgw.') |
742 | ||
f91f0fd5 TL |
743 | return self.scheduled_daemon_actions.get(host, {}).get(daemon) |
744 | ||
f6b5b4d7 TL |
745 | |
class EventStore():
    """In-memory store of recent OrchestratorEvents, keyed by 'kind:subject'."""

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """Record *event*, deduplicating by message and keeping only recent ones."""
        key = event.kind_subject()
        if key not in self.events:
            self.events[key] = [event]

        bucket = self.events[key]
        if any(known.message == event.message for known in bucket):
            return

        bucket.append(event)

        # limit to five events for now.
        self.events[key] = bucket[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Attach an event to the service described by *spec*."""
        self.add(OrchestratorEvent(datetime_now(), 'service',
                                   spec.service_name(), level, message))

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Turn an OrchestratorError into an ERROR event, if it names a subject."""
        if e.event_subject is None:
            return
        subject = e.event_subject
        self.add(OrchestratorEvent(
            datetime_now(),
            subject[0],
            subject[1],
            "ERROR",
            str(e)
        ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Attach an event to a daemon."""
        self.add(OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message))

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an exception as an ERROR event against a daemon."""
        self.for_daemon(daemon_name, "ERROR", str(e))

    def cleanup(self) -> None:
        # Needs to be properly done, in case events are persistently stored.
        stale: List[str] = []
        daemons = self.mgr.cache.get_daemon_names()
        specs = self.mgr.spec_store.all_specs.keys()
        for key in self.events:
            kind, subject = key.split(':')
            if kind == 'service':
                if subject not in specs:
                    stale.append(key)
            elif kind == 'daemon':
                if subject not in daemons:
                    stale.append(key)

        for key in stale:
            del self.events[key]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded against service *name* (possibly empty)."""
        return self.events.get(f'service:{name}', [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Events recorded against daemon *name* (possibly empty)."""
        return self.events.get(f'daemon:{name}', [])