]>
Commit | Line | Data |
---|---|---|
e306af50 TL |
1 | import datetime |
2 | from copy import copy | |
3 | import json | |
4 | import logging | |
5 | from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set | |
6 | ||
7 | import six | |
8 | ||
9 | import orchestrator | |
10 | from ceph.deployment import inventory | |
11 | from ceph.deployment.service_spec import ServiceSpec | |
adb31ebb | 12 | from ceph.utils import str_to_datetime, datetime_to_str, datetime_now |
f6b5b4d7 | 13 | from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent |
e306af50 TL |
14 | |
15 | if TYPE_CHECKING: | |
16 | from .module import CephadmOrchestrator | |
17 | ||
18 | ||
logger = logging.getLogger(__name__)

# Key prefixes used with the mgr key/value store (get_store_prefix/set_store):
# one record per host for HostCache, one record per service for SpecStore.
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
e306af50 TL |
23 | |
24 | ||
class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.

    Hosts are kept as plain JSON-compatible dicts keyed by hostname and are
    written back to the mgr store under the 'inventory' key on every change.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory
        raw = self.mgr.get_store('inventory')
        if raw:
            self._inventory: Dict[str, dict] = json.loads(raw)
            # handle old clusters missing 'hostname' key from hostspec
            for name, info in self._inventory.items():
                info.setdefault('hostname', name)
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s', self._inventory)

    def keys(self) -> List[str]:
        """Return the names of all known hosts."""
        return list(self._inventory)

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise OrchestratorError if `host` is not part of the inventory."""
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add (or overwrite) the entry for `spec.hostname` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Remove `host` and persist; raises OrchestratorError if unknown."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Set the address of `host` and persist; raises if unknown."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Attach `label` to `host` (no duplicates) and persist."""
        self.assert_host(host)

        labels = self._inventory[host].setdefault('labels', [])
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Detach `label` from `host` if present and persist."""
        self.assert_host(host)

        labels = self._inventory[host].setdefault('labels', [])
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Return the stored address of `host`, falling back to its name."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """
        Yield all hosts carrying `label`; an empty/None label matches all hosts.

        Yields host names, or HostSpec objects when `as_hostspec` is set.
        """
        for name, info in self._inventory.items():
            if label and label not in info.get('labels', []):
                continue
            yield self.spec_from_dict(info) if as_hostspec else name

    def spec_from_dict(self, info: dict) -> HostSpec:
        """Build a HostSpec from a stored inventory entry."""
        hostname = info['hostname']
        # The live offline state tracked by the mgr wins over the stored status.
        if hostname in self.mgr.offline_hosts:
            status = 'Offline'
        else:
            status = info.get('status', '')
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status=status,
        )

    def all_specs(self) -> List[HostSpec]:
        """Return a HostSpec for every host in the inventory."""
        return [self.spec_from_dict(info) for info in self._inventory.values()]

    def save(self) -> None:
        """Serialize the whole inventory into the mgr store."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
112 | ||
113 | ||
class SpecStore():
    """
    In-memory view of the service specs, persisted per service in the mgr
    store under SPEC_STORE_PREFIX + <service name>.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self.specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        # preview-only specs are kept in memory and never written to the store
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    def load(self):
        # type: () -> None
        """Populate `specs` / `spec_created` from the mgr store.

        Records that fail to parse are logged and skipped.
        """
        for key, raw in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = key[len(SPEC_STORE_PREFIX):]
            try:
                record = json.loads(raw)
                spec = ServiceSpec.from_json(record['spec'])
                created = str_to_datetime(record['created'])
                self.specs[service_name] = spec
                self.spec_created[service_name] = created
                self.mgr.log.debug('SpecStore: loaded spec for %s', service_name)
            except Exception as e:
                # best effort: one broken record must not prevent loading the rest
                self.mgr.log.warning('unable to load spec for %s: %s', service_name, e)

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Store `spec`; preview-only specs are only remembered in memory."""
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self.specs[name] = spec
        self.spec_created[name] = datetime_now()
        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps({
                'spec': spec.to_json(),
                'created': datetime_to_str(self.spec_created[name]),
            }, sort_keys=True),
        )
        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')

    def rm(self, service_name):
        # type: (str) -> bool
        """Remove the spec for `service_name`; return whether it existed."""
        found = service_name in self.specs
        if found:
            del self.specs[service_name]
            # pop() instead of del: do not fail if the timestamp is missing
            self.spec_created.pop(service_name, None)
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """
        Return all specs matching `service_name`.

        A spec matches if its name equals `service_name` or starts with
        `service_name + '.'`. Without a `service_name`, all specs are returned.
        """
        specs = [
            spec for sn, spec in self.specs.items()
            if not service_name
            or sn == service_name
            or sn.startswith(service_name + '.')
        ]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s',
                           service_name, specs)
        return specs
174 | ||
f91f0fd5 | 175 | |
class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    Like for example we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. This is mostly read-only.

    4. `last_etc_ceph_ceph_conf` O(hosts)

    Stores the last refresh time for the /etc/ceph/ceph.conf. Used
    to avoid deploying new configs when failing over to a new mgr.

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
        self.devices = {}              # type: Dict[str, List[inventory.Device]]
        self.facts = {}                # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}    # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
        self.networks = {}             # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}      # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

    def load(self):
        # type: () -> None
        """
        Populate the cache from the records persisted in the mgr store.

        Records of hosts that are not in the inventory are deleted from the
        store and skipped. Records that fail to parse are logged and skipped.
        """
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s', host)
                self.mgr.set_store(k, None)
                # the record was just dropped from the store: do not
                # resurrect the host in the in-memory cache
                continue
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                # osdspec_previews is a list per host (see save_host)
                self.osdspec_previews[host] = j.get('osdspec_previews', [])

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_etc_ceph_ceph_conf' in j:
                    self.last_etc_ceph_ceph_conf[host] = str_to_datetime(
                        j['last_etc_ceph_ceph_conf'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks',
                    host, len(self.daemons[host]), len(self.devices[host]),
                    len(self.networks[host]))
            except Exception as e:
                # best effort: one broken record must not prevent loading the rest
                self.mgr.log.warning('unable to load cached state for %s: %s', host, e)

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        self.facts[host] = facts
        # same clock helper as every other timestamp in this class
        self.last_facts_update[host] = datetime_now()

    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime_now()

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        self.daemon_refresh_queue.append(host)
        self.last_daemon_update.pop(host, None)
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        self.device_refresh_queue.append(host)
        self.last_device_update.pop(host, None)
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        """Serialize everything cached for `host` into the mgr store."""
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_etc_ceph_ceph_conf:
            j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """Forget everything about `host`, in memory and in the mgr store."""
        per_host_maps = (
            self.daemons,
            self.devices,
            self.facts,
            self.last_facts_update,
            self.osdspec_previews,
            self.networks,
            self.last_daemon_update,
            self.last_device_update,
            self.daemon_config_deps,
            self.scheduled_daemon_actions,
            # Stale timestamps would otherwise survive a remove/re-add of the
            # same host and suppress the host check / ceph.conf distribution.
            self.last_host_check,
            self.last_etc_ceph_ceph_conf,
        )
        for per_host in per_host_maps:
            per_host.pop(host, None)
        self.loading_osdspec_preview.discard(host)
        self.registry_login_queue.discard(host)
        for queue in (self.daemon_refresh_queue,
                      self.device_refresh_queue,
                      self.osdspec_previews_refresh_queue):
            # queues may hold the same host more than once
            while host in queue:
                queue.remove(host)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        return list(self.daemons)

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return [dd for dm in self.daemons.values() for dd in dm.values()]

    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
        """Return the daemon named `daemon_name`; raise OrchestratorError if unknown."""
        for dm in self.daemons.values():
            for dd in dm.values():
                if dd.name() == daemon_name:
                    return dd
        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """
        Yield (host, {daemon name: description}) with copies of the cached
        descriptions, annotated with the current events and, for offline
        hosts, an offline status. The cached objects are not modified.
        """
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = -1
                dd.status_desc = 'host is offline'
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        return [d for dm in self.daemons.values() for d in dm.values()
                if d.service_name() == service_name]

    def get_daemons_by_type(self, service_type):
        # type: (str) -> List[orchestrator.DaemonDescription]
        return [d for dm in self.daemons.values() for d in dm.values()
                if d.daemon_type == service_type]

    def get_daemon_names(self):
        # type: () -> List[str]
        return [name for dm in self.daemons.values() for name in dm]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return (deps, last_config) for a daemon, or (None, None) if unknown."""
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                entry = self.daemon_config_deps[host][name]
                return entry.get('deps', []), entry.get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        return host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        # must use the same clock helper as update_host_facts()
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        return host not in self.last_facts_update or self.last_facts_update[host] < cutoff

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        return host not in self.last_device_update or self.last_device_update[host] < cutoff

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        #  Since this is dependent on other factors (device and spec) this does not need
        #  to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
        if not self.mgr.manage_etc_ceph_ceph_conf:
            return False
        if self.mgr.paused:
            return False
        if host in self.mgr.offline_hosts:
            return False
        if not self.mgr.last_monmap:
            return False
        if host not in self.last_etc_ceph_ceph_conf:
            return True
        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
            return True
        if self.mgr.extra_ceph_conf_is_newer(self.last_etc_ceph_ceph_conf[host]):
            return True
        # already up to date:
        return False

    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
        if not self.mgr.last_monmap:
            return
        self.last_etc_ceph_ceph_conf[host] = datetime_now()

    def host_needs_registry_login(self, host: str) -> bool:
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        if host in self.daemons:
            self.daemons[host].pop(name, None)

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each hosts at least once.
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        """
        Remember `action` for a daemon, unless an action with a strictly
        higher priority is already scheduled for it.
        """
        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        self.scheduled_daemon_actions.setdefault(host, {})[daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
        if host in self.scheduled_daemon_actions:
            self.scheduled_daemon_actions[host].pop(daemon_name, None)
            # drop the per-host dict once its last action is gone
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        return self.scheduled_daemon_actions.get(host, {}).get(daemon)
618 | ||
f6b5b4d7 TL |
619 | |
class EventStore():
    """
    In-memory store of the most recent events per subject.

    Events are keyed by `event.kind_subject()`; the getters below look them
    up as 'service:<name>' and 'daemon:<name>'.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """
        Record `event` for its subject.

        An event whose message is already recorded for the subject is
        dropped; only the five most recent events per subject are kept.
        """
        key = event.kind_subject()
        known = self.events.setdefault(key, [])

        if any(e.message == event.message for e in known):
            return

        known.append(event)

        # limit to five events for now.
        self.events[key] = known[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Record an ERROR event for the subject attached to `e`, if any."""
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        """Drop events of services and daemons that are no longer known."""
        # Needs to be properly done, in case events are persistently stored.

        unknowns: List[str] = []
        daemons = set(self.mgr.cache.get_daemon_names())
        specs = self.mgr.spec_store.specs.keys()
        for k_s in self.events:
            # split on the first ':' only, the subject itself may contain one
            kind, _, subject = k_s.partition(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        # delete after the loop: the dict must not change while iterating
        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        return self.events.get('daemon:' + name, [])