]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import json |
2 | import errno | |
3 | import logging | |
4 | import time | |
1911f103 | 5 | from copy import copy |
9f95a23c TL |
6 | from threading import Event |
7 | from functools import wraps | |
8 | ||
9 | from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException | |
10 | ||
11 | import string | |
12 | try: | |
13 | from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, \ | |
14 | Any, NamedTuple, Iterator, Set, Sequence | |
801d1391 | 15 | from typing import TYPE_CHECKING, cast |
9f95a23c TL |
16 | except ImportError: |
17 | TYPE_CHECKING = False # just for type checking | |
18 | ||
19 | ||
20 | import datetime | |
21 | import six | |
22 | import os | |
23 | import random | |
24 | import tempfile | |
25 | import multiprocessing.pool | |
26 | import re | |
27 | import shutil | |
28 | import subprocess | |
29 | import uuid | |
30 | ||
31 | from ceph.deployment import inventory, translate | |
32 | from ceph.deployment.drive_group import DriveGroupSpec | |
1911f103 | 33 | from ceph.deployment.drive_selection.selector import DriveSelection |
801d1391 TL |
34 | from ceph.deployment.service_spec import \ |
35 | HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host | |
9f95a23c | 36 | |
801d1391 | 37 | from mgr_module import MgrModule, HandleCommandResult |
9f95a23c TL |
38 | import orchestrator |
39 | from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ | |
40 | CLICommandMeta | |
41 | ||
42 | from . import remotes | |
801d1391 TL |
43 | from . import utils |
44 | from .nfs import NFSGanesha | |
9f95a23c TL |
45 | from .osd import RemoveUtil, OSDRemoval |
46 | ||
47 | ||
48 | try: | |
49 | import remoto | |
50 | import remoto.process | |
51 | import execnet.gateway_bootstrap | |
52 | except ImportError as e: | |
53 | remoto = None | |
54 | remoto_import_error = str(e) | |
55 | ||
56 | try: | |
57 | from typing import List | |
58 | except ImportError: | |
59 | pass | |
60 | ||
# module-level logger (used by helpers/classes below that have no mgr handle)
logger = logging.getLogger(__name__)

# SSH client config written out when the admin has not supplied one via
# the 'ssh_config_file' module option
DEFAULT_SSH_CONFIG = """
Host *
User root
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ConnectTimeout=30
"""

# timestamp format used for values persisted in the mgr key/value store
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
# same format with a literal 'Z' suffix
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

# key prefixes used in the mgr key/value store (see HostCache / SpecStore)
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."

# ceph daemon types that use the ceph container image.
# NOTE: listed in upgrade order!
CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
81 | ||
82 | ||
# for py2 compat
try:
    from tempfile import TemporaryDirectory  # py3
except ImportError:
    # define a minimal (but sufficient) equivalent for <= py 3.2
    class TemporaryDirectory(object):  # type: ignore
        """Tiny stand-in for tempfile.TemporaryDirectory.

        Creates the backing directory eagerly on construction and removes
        it (recursively) on cleanup() / context-manager exit.
        """

        def __init__(self):
            self.name = tempfile.mkdtemp()

        def __enter__(self):
            # defensively re-create the directory if name was cleared
            self.name = self.name or tempfile.mkdtemp()
            return self.name

        def cleanup(self):
            shutil.rmtree(self.name)

        def __exit__(self, exc_type, exc_value, traceback):
            self.cleanup()
102 | ||
103 | ||
9f95a23c TL |
class SpecStore():
    """In-memory map of ServiceSpecs, mirrored to the mgr key/value store.

    Each spec is persisted under SPEC_STORE_PREFIX + service_name together
    with its creation timestamp.
    """
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self.specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]

    def load(self):
        # type: () -> None
        """Re-populate the in-memory maps from the mgr key/value store."""
        for key, raw in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
            service_name = key[len(SPEC_STORE_PREFIX):]
            try:
                blob = json.loads(raw)
                # parse both fields before mutating state so a corrupt
                # record leaves the maps untouched
                spec = ServiceSpec.from_json(blob['spec'])
                created = datetime.datetime.strptime(blob['created'], DATEFMT)
                self.specs[service_name] = spec
                self.spec_created[service_name] = created
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                # one bad record must not prevent loading the rest
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Record *spec* in memory and persist it with a creation stamp."""
        self.specs[spec.service_name()] = spec
        self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
        payload = json.dumps({
            'spec': spec.to_json(),
            'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
        }, sort_keys=True)
        self.mgr.set_store(SPEC_STORE_PREFIX + spec.service_name(), payload)

    def rm(self, service_name):
        # type: (str) -> bool
        """Forget *service_name*; returns True if it was known."""
        if service_name not in self.specs:
            return False
        del self.specs[service_name]
        del self.spec_created[service_name]
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return True

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """Return specs matching *service_name* exactly or as a
        '<service_name>.' prefix; all specs when service_name is falsy."""
        matches = [
            spec for sn, spec in self.specs.items()
            if not service_name
            or sn == service_name
            or sn.startswith(service_name + '.')
        ]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, matches))
        return matches
159 | ||
class HostCache():
    """Per-host cache of daemon, device and network state, persisted to the
    mgr key/value store under HOST_CACHE_PREFIX + <hostname>.

    The *_refresh_queue lists hold hostnames whose daemon/device inventory
    must be re-scraped; host_needs_*_refresh() drains them.
    """
    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> daemon name -> description
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        # host -> detected devices
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        # host -> network -> list of strings (presumably addresses; confirm
        # against the scrape code that fills this in)
        self.networks = {}  # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        # hosts queued for a forced daemon / device re-scrape
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        # host -> daemon name -> {'deps': [...], 'last_config': datetime}
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]

    def load(self):
        # type: () -> None
        """Rebuild the cache from persisted per-host records."""
        for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # NOTE(review): no 'continue' here, so the stray record is
                # still parsed into the in-memory cache below — confirm
                # whether that is intentional
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = datetime.datetime.strptime(
                        j['last_device_update'], DATEFMT)
                else:
                    # no stamp recorded: force a device re-scrape
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': datetime.datetime.strptime(
                            d['last_config'], DATEFMT),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = datetime.datetime.strptime(
                        j['last_host_check'], DATEFMT)
                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                # a corrupt record for one host must not abort loading the rest
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))
                pass

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the cached daemon map for *host* and stamp the update."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime.datetime.utcnow()

    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        """Replace the cached device/network state for *host*."""
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime.datetime.utcnow()

    def update_daemon_config_deps(self, host, name, deps, stamp):
        # record the config dependencies *deps* that daemon *name* on *host*
        # was last configured with, at time *stamp*
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp a successful host check for *host* (now, UTC)."""
        self.last_host_check[host] = datetime.datetime.utcnow()

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.daemon_config_deps[host] = {}
        # schedule an initial scrape of the new host
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Force a daemon re-scrape of *host* and wake the serve loop."""
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Force a device re-scrape of *host* and wake the serve loop."""
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def save_host(self, host):
        # type: (str) -> None
        """Serialize *host*'s cached state to the mgr key/value store."""
        j = {  # type: ignore
            'daemons': {},
            'devices': [],
            'daemon_config_deps': {},
        }
        # timestamps are optional: only persisted when present
        if host in self.last_daemon_update:
            j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT)  # type: ignore
        if host in self.last_device_update:
            j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT)  # type: ignore
        for name, dd in self.daemons[host].items():
            j['daemons'][name] = dd.to_json()  # type: ignore
        for d in self.devices[host]:
            j['devices'].append(d.to_json())  # type: ignore
        j['networks'] = self.networks[host]
        for name, depi in self.daemon_config_deps[host].items():
            j['daemon_config_deps'][name] = {  # type: ignore
                'deps': depi.get('deps', []),
                'last_config': depi['last_config'].strftime(DATEFMT),
            }
        if host in self.last_host_check:
            j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """Drop all cached state for *host*, in memory and on disk."""
        if host in self.daemons:
            del self.daemons[host]
        if host in self.devices:
            del self.devices[host]
        if host in self.networks:
            del self.networks[host]
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        if host in self.last_device_update:
            del self.last_device_update[host]
        if host in self.daemon_config_deps:
            del self.daemon_config_deps[host]
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        """Return the hostnames that have a (possibly empty) daemon map."""
        r = []
        for host, di in self.daemons.items():
            r.append(host)
        return r

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return all cached daemon descriptions, across all hosts."""
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(dd)
        return r

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """Yield (host, daemon map); daemons of offline hosts are yielded as
        copies with status -1 / 'host is offline' (cache left untouched)."""
        for host, dm in self.daemons.items():
            if host in self.mgr.offline_hosts:
                def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
                    ret = copy(dd)
                    ret.status = -1
                    ret.status_desc = 'host is offline'
                    return ret
                yield host, {name: set_offline(d) for name, d in dm.items()}
            else:
                yield host, dm

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """Return daemons whose name starts with '<service_name>.'."""
        result = []   # type: List[orchestrator.DaemonDescription]
        for host, dm in self.daemons.items():
            for name, d in dm.items():
                if name.startswith(service_name + '.'):
                    result.append(d)
        return result

    def get_daemon_names(self):
        # type: () -> List[str]
        """Return all cached daemon names, across all hosts."""
        r = []
        for host, dm in self.daemons.items():
            for name, dd in dm.items():
                r.append(name)
        return r

    def get_daemon_last_config_deps(self, host, name):
        # returns (deps, last_config) for the daemon, or (None, None)
        # when nothing is recorded
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """True if *host*'s daemon inventory is queued or stale.

        A queued host is consumed (removed from the queue) by this call.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        return False

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """True if *host*'s device inventory is queued or stale.

        A queued host is consumed (removed from the queue) by this call.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        """True if the periodic host check for *host* is due."""
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Add/replace one daemon description; *host* must be primed."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host, name):
        # silently ignores unknown host/daemon
        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]
403 | ||
404 | ||
class AsyncCompletion(orchestrator.Completion):
    """A Completion whose on_complete callable runs on the module's worker
    thread pool (CephadmOrchestrator.instance._worker_pool) instead of
    synchronously.

    With ``many=True`` the callable is mapped over the elements of the
    incoming value via ``map_async``; otherwise it is applied once via
    ``apply_async``.
    """
    def __init__(self,
                 _first_promise=None,  # type: Optional[orchestrator.Completion]
                 value=orchestrator._Promise.NO_RESULT,  # type: Any
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 many=False,  # type: bool
                 update_progress=False,  # type: bool
                 ):

        # the worker pool lives on the singleton module instance
        assert CephadmOrchestrator.instance is not None
        self.many = many
        self.update_progress = update_progress
        # default the completion's name to the callable's name
        if name is None and on_complete is not None:
            name = getattr(on_complete, '__name__', None)
        super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self):
        # type: () -> Optional[orchestrator.ProgressReference]
        # a ProgressReference is recognized by its 'progress_id' attribute
        if hasattr(self._on_complete_, 'progress_id'):  # type: ignore
            return self._on_complete_  # type: ignore
        return None

    @property
    def _on_complete(self):
        # type: () -> Optional[Callable]
        """Wrap the stored callable so the base class, when it invokes it,
        actually dispatches the work to the thread pool."""
        if self._on_complete_ is None:
            return None

        def callback(result):
            # runs on a pool thread once the async work finished
            try:
                if self.update_progress:
                    assert self.progress_reference
                    self.progress_reference.progress = 1.0
                # clear the callable so the completion cannot re-run
                self._on_complete_ = None
                self._finalize(result)
            except Exception as e:
                try:
                    self.fail(e)
                except Exception:
                    logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
                    if 'UNITTEST' in os.environ:
                        assert False

        def error_callback(e):
            # errors are already propagated via do_work's self.fail(e)
            pass

        def run(value):
            def do_work(*args, **kwargs):
                assert self._on_complete_ is not None
                try:
                    res = self._on_complete_(*args, **kwargs)
                    if self.update_progress and self.many:
                        assert self.progress_reference
                        # advance progress by one element's share
                        self.progress_reference.progress += 1.0 / len(value)
                    return res
                except Exception as e:
                    self.fail(e)
                    raise

            assert CephadmOrchestrator.instance
            if self.many:
                if not value:
                    # NOTE(review): callback([]) fires here, yet execution
                    # still falls through to map_async below — confirm
                    # whether that double dispatch is intended
                    logger.info('calling map_async without values')
                    callback([])
                if six.PY3:
                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
                                                                        callback=callback,
                                                                        error_callback=error_callback)
                else:
                    # py2 Pool.map_async has no error_callback parameter
                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
                                                                        callback=callback)
            else:
                if six.PY3:
                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
                                                                          callback=callback, error_callback=error_callback)
                else:
                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
                                                                          callback=callback)
            # tell the promise machinery the result arrives asynchronously
            return self.ASYNC_RESULT

        return run

    @_on_complete.setter
    def _on_complete(self, inner):
        # type: (Callable) -> None
        self._on_complete_ = inner
493 | ||
494 | ||
def ssh_completion(cls=AsyncCompletion, **c_kwargs):
    # type: (Type[orchestrator.Completion], Any) -> Callable
    """
    Decorator factory: turn a function into one returning a *cls* completion
    that runs the function asynchronously.

    See ./HACKING.rst for a how-to
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args):

            name = f.__name__
            many = c_kwargs.get('many', False)

            # Some weird logic to make calling functions with multiple arguments work.
            if len(args) == 1:
                # single argument: it is the completion's input value
                [value] = args
                if many and value and isinstance(value[0], tuple):
                    # each element is an argument tuple: unpack per element
                    return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
                else:
                    return cls(on_complete=f, value=value, name=name, **c_kwargs)
            else:
                if many:
                    # bound method + value (assumes exactly two positional
                    # args here — TODO confirm no caller passes more)
                    self, value = args

                    def call_self(inner_args):
                        # normalize a bare element into a 1-tuple of args
                        if not isinstance(inner_args, tuple):
                            inner_args = (inner_args, )
                        return f(self, *inner_args)

                    return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
                else:
                    # multiple args: pass them through as one tuple value
                    return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)

        return wrapper
    return decorator
529 | ||
530 | ||
def async_completion(f):
    # type: (Callable) -> Callable[..., AsyncCompletion]
    """Decorate *f* so calling it returns an AsyncCompletion.

    Shorthand for applying ``ssh_completion()`` with default options;
    see ./HACKING.rst for a how-to.

    :param f: wrapped function
    """
    decorate = ssh_completion()
    return decorate(f)
539 | ||
540 | ||
def async_map_completion(f):
    # type: (Callable) -> Callable[..., AsyncCompletion]
    """Decorate *f* so it is mapped asynchronously over its input values,
    returning an AsyncCompletion — roughly an async ``map(f, values)``.

    See ./HACKING.rst for a how-to.

    :param f: wrapped function
    """
    decorate = ssh_completion(many=True)
    return decorate(f)
555 | ||
556 | ||
def trivial_completion(f):
    # type: (Callable) -> Callable[..., orchestrator.Completion]
    """Decorate *f* so its synchronously computed result is delivered
    wrapped in an already-valued AsyncCompletion."""
    @wraps(f)
    def _deliver(*args, **kwargs):
        result = f(*args, **kwargs)
        return AsyncCompletion(value=result, name=f.__name__)
    return _deliver
563 | ||
564 | ||
@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
    """Orchestrator mgr module that deploys and manages Ceph daemons on
    remote hosts via cephadm over SSH."""

    _STORE_HOST_PREFIX = "host"

    # singleton, set in __init__; AsyncCompletion reaches the worker pool
    # through it
    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            # fixed: description had a stray trailing double-quote
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace.  Enabling this options '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
    ]
653 | ||
    def __init__(self, *args, **kwargs):
        """Load persisted orchestrator state (inventory, host cache, specs,
        upgrade state), read the cephadm script, and set up the SSH layer
        and worker pool."""
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']

        # for serve()
        self.run = True
        self.event = Event()

        # a persisted 'pause' key suspends orchestration activity
        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.prometheus_alerts_path = ''

        # per-host remoto connection cache
        self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]

        self.config_notify()

        # read the cephadm bootstrap script; it is shipped to remote hosts
        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        # publish the singleton used by AsyncCompletion
        CephadmOrchestrator.instance = self

        # restore any in-flight upgrade
        t = self.get_store('upgrade_state')
        if t:
            self.upgrade_state = json.loads(t)
        else:
            self.upgrade_state = None

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        # load inventory
        i = self.get_store('inventory')
        if i:
            self.inventory: Dict[str, dict] = json.loads(i)
        else:
            self.inventory = dict()
        self.log.debug('Loaded inventory %s' % self.inventory)

        self.cache = HostCache(self)
        self.cache.load()
        self.rm_util = RemoveUtil(self)

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.offline_hosts: Set[str] = set()
734 | ||
9f95a23c TL |
    def shutdown(self):
        """Stop background activity: drain the worker pool and signal the
        serve loop to exit."""
        self.log.debug('shutdown')
        # close() before join(): stop accepting work, then wait for it to drain
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        # wake anything waiting on the event so it can observe run == False
        self.event.set()
741 | ||
    def _kick_serve_loop(self):
        """Wake whatever is waiting on self.event so state is re-evaluated
        immediately instead of after the next timeout."""
        self.log.debug('_kick_serve_loop')
        self.event.set()
745 | ||
746 | def _check_safe_to_destroy_mon(self, mon_id): | |
747 | # type: (str) -> None | |
748 | ret, out, err = self.mon_command({ | |
749 | 'prefix': 'quorum_status', | |
750 | }) | |
751 | if ret: | |
752 | raise OrchestratorError('failed to check mon quorum status') | |
753 | try: | |
754 | j = json.loads(out) | |
755 | except Exception as e: | |
756 | raise OrchestratorError('failed to parse quorum status') | |
757 | ||
758 | mons = [m['name'] for m in j['monmap']['mons']] | |
759 | if mon_id not in mons: | |
760 | self.log.info('Safe to remove mon.%s: not in monmap (%s)' % ( | |
761 | mon_id, mons)) | |
762 | return | |
763 | new_mons = [m for m in mons if m != mon_id] | |
764 | new_quorum = [m for m in j['quorum_names'] if m != mon_id] | |
765 | if len(new_quorum) > len(new_mons) / 2: | |
766 | self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons)) | |
767 | return | |
768 | raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons)) | |
769 | ||
    def _wait_for_ok_to_stop(self, s):
        """Poll '<daemon_type> ok-to-stop' for daemon *s* (up to 4 tries,
        15s apart); return True when it is safe to stop it, False when the
        retries are exhausted or the upgrade was paused/cleared.

        *s* is expected to expose daemon_type / daemon_id — presumably a
        DaemonDescription; confirm against callers.
        """
        # only wait a little bit; the service might go away for something
        tries = 4
        while tries > 0:
            if s.daemon_type not in ['mon', 'osd', 'mds']:
                # only mon/osd/mds have an ok-to-stop command
                self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
                              (s.daemon_type, s.daemon_id))
                return True
            ret, out, err = self.mon_command({
                'prefix': '%s ok-to-stop' % s.daemon_type,
                'ids': [s.daemon_id],
            })
            # bail out if the upgrade was paused or cleared while we polled
            if not self.upgrade_state or self.upgrade_state.get('paused'):
                return False
            if ret:
                self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
                              (s.daemon_type, s.daemon_id))
                time.sleep(15)
                tries -= 1
            else:
                self.log.info('Upgrade: It is safe to stop %s.%s' %
                              (s.daemon_type, s.daemon_id))
                return True
        return False
794 | ||
795 | def _clear_upgrade_health_checks(self): | |
796 | for k in ['UPGRADE_NO_STANDBY_MGR', | |
797 | 'UPGRADE_FAILED_PULL']: | |
798 | if k in self.health_checks: | |
799 | del self.health_checks[k] | |
800 | self.set_health_checks(self.health_checks) | |
801 | ||
    def _fail_upgrade(self, alert_id, alert):
        """Pause the running upgrade, record the error in the persisted
        upgrade state, and surface *alert* as a health check."""
        self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
                                                          alert['summary']))
        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
        self.upgrade_state['paused'] = True
        self._save_upgrade_state()
        self.health_checks[alert_id] = alert
        self.set_health_checks(self.health_checks)
810 | ||
    def _update_upgrade_progress(self, progress):
        """Report upgrade *progress* to the mgr progress module, lazily
        creating (and persisting) the progress event id on first use."""
        if 'progress_id' not in self.upgrade_state:
            self.upgrade_state['progress_id'] = str(uuid.uuid4())
            self._save_upgrade_state()
        self.remote('progress', 'update', self.upgrade_state['progress_id'],
                    ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
                    ev_progress=progress)
818 | ||
    def _do_upgrade(self):
        # type: () -> None
        """
        Advance an in-progress upgrade by (at most) one step.

        Called from the serve() loop while self.upgrade_state is set and
        not paused.  Many branches 'return' after initiating an action;
        the next serve() iteration re-enters here and re-evaluates the
        cluster state, so this method acts as one pass of a state machine
        driven by persisted self.upgrade_state.
        """
        if not self.upgrade_state:
            self.log.debug('_do_upgrade no state, exiting')
            return

        target_name = self.upgrade_state.get('target_name')
        target_id = self.upgrade_state.get('target_id', None)
        if not target_id:
            # need to learn the container hash
            self.log.info('Upgrade: First pull of %s' % target_name)
            try:
                target_id, target_version = self._get_container_image_id(target_name)
            except OrchestratorError as e:
                self._fail_upgrade('UPGRADE_FAILED_PULL', {
                    'severity': 'warning',
                    'summary': 'Upgrade: failed to pull target image',
                    'count': 1,
                    'detail': [str(e)],
                })
                return
            # persist the resolved image id/version so we don't re-pull
            self.upgrade_state['target_id'] = target_id
            self.upgrade_state['target_version'] = target_version
            self._save_upgrade_state()
        target_version = self.upgrade_state.get('target_version')
        self.log.info('Upgrade: Target is %s with id %s' % (target_name,
                                                            target_id))

        # get all distinct container_image settings
        image_settings = {}
        ret, out, err = self.mon_command({
            'prefix': 'config dump',
            'format': 'json',
        })
        config = json.loads(out)
        for opt in config:
            if opt['name'] == 'container_image':
                image_settings[opt['section']] = opt['value']

        daemons = self.cache.get_daemons()
        done = 0
        # walk daemon types in upgrade order; all daemons of a type must be
        # done before the next type is touched
        for daemon_type in CEPH_UPGRADE_ORDER:
            self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
            need_upgrade_self = False
            for d in daemons:
                if d.daemon_type != daemon_type:
                    continue
                if d.container_image_id == target_id:
                    self.log.debug('daemon %s.%s version correct' % (
                        daemon_type, d.daemon_id))
                    done += 1
                    continue
                self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                    daemon_type, d.daemon_id,
                    d.container_image_name, d.container_image_id, d.version))

                # the active mgr (us) cannot redeploy itself; defer to the
                # failover logic below
                if daemon_type == 'mgr' and \
                        d.daemon_id == self.get_mgr_id():
                    self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
                                  self.get_mgr_id())
                    need_upgrade_self = True
                    continue

                # make sure host has latest container image
                out, err, code = self._run_cephadm(
                    d.hostname, None, 'inspect-image', [],
                    image=target_name, no_fsid=True, error_ok=True)
                if code or json.loads(''.join(out)).get('image_id') != target_id:
                    self.log.info('Upgrade: Pulling %s on %s' % (target_name,
                                                                 d.hostname))
                    out, err, code = self._run_cephadm(
                        d.hostname, None, 'pull', [],
                        image=target_name, no_fsid=True, error_ok=True)
                    if code:
                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
                            'severity': 'warning',
                            'summary': 'Upgrade: failed to pull target image',
                            'count': 1,
                            'detail': [
                                'failed to pull %s on host %s' % (target_name,
                                                                  d.hostname)],
                        })
                        return
                    r = json.loads(''.join(out))
                    if r.get('image_id') != target_id:
                        # the tag moved under us (e.g. ':latest'); adopt the
                        # new id and restart the whole pass
                        self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
                        self.upgrade_state['target_id'] = r['image_id']
                        self._save_upgrade_state()
                        return

                self._update_upgrade_progress(done / len(daemons))

                if not d.container_image_id:
                    if d.container_image_name == target_name:
                        self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                        continue
                # only stop/redeploy if it is safe (e.g. enough mons up)
                if not self._wait_for_ok_to_stop(d):
                    return
                self.log.info('Upgrade: Redeploying %s.%s' %
                              (d.daemon_type, d.daemon_id))
                # pin this daemon's container_image before redeploying it
                ret, out, err = self.mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
                })
                self._daemon_action(
                    d.daemon_type,
                    d.daemon_id,
                    d.hostname,
                    'redeploy'
                )
                # one daemon per pass; re-enter on the next serve() tick
                return

            if need_upgrade_self:
                mgr_map = self.get('mgr_map')
                num = len(mgr_map.get('standbys'))
                if not num:
                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
                        'severity': 'warning',
                        'summary': 'Upgrade: Need standby mgr daemon',
                        'count': 1,
                        'detail': [
                            'The upgrade process needs to upgrade the mgr, '
                            'but it needs at least one standby to proceed.',
                        ],
                    })
                    return

                self.log.info('Upgrade: there are %d other already-upgraded '
                              'standby mgrs, failing over' % num)

                self._update_upgrade_progress(done / len(daemons))

                # fail over
                ret, out, err = self.mon_command({
                    'prefix': 'mgr fail',
                    'who': self.get_mgr_id(),
                })
                return
            elif daemon_type == 'mgr':
                # all mgrs upgraded; clear the standby warning if present
                if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
                    del self.health_checks['UPGRADE_NO_STANDBY_MGR']
                    self.set_health_checks(self.health_checks)

            # make sure 'ceph versions' agrees
            ret, out, err = self.mon_command({
                'prefix': 'versions',
            })
            j = json.loads(out)
            for version, count in j.get(daemon_type, {}).items():
                if version != target_version:
                    self.log.warning(
                        'Upgrade: %d %s daemon(s) are %s != target %s' %
                        (count, daemon_type, version, target_version))

            # push down configs
            if image_settings.get(daemon_type) != target_name:
                self.log.info('Upgrade: Setting container_image for all %s...' %
                              daemon_type)
                ret, out, err = self.mon_command({
                    'prefix': 'config set',
                    'name': 'container_image',
                    'value': target_name,
                    'who': daemon_type,
                })
            # drop the per-daemon pins we set while redeploying above
            to_clean = []
            for section in image_settings.keys():
                if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
                    to_clean.append(section)
            if to_clean:
                self.log.debug('Upgrade: Cleaning up container_image for %s...' %
                               to_clean)
                for section in to_clean:
                    ret, image, err = self.mon_command({
                        'prefix': 'config rm',
                        'name': 'container_image',
                        'who': section,
                    })

            self.log.info('Upgrade: All %s daemons are up to date.' %
                          daemon_type)

        # clean up
        self.log.info('Upgrade: Finalizing container_image settings')
        ret, out, err = self.mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': target_name,
            'who': 'global',
        })
        for daemon_type in CEPH_UPGRADE_ORDER:
            ret, image, err = self.mon_command({
                'prefix': 'config rm',
                'name': 'container_image',
                'who': utils.name_to_config_section(daemon_type),
            })

        self.log.info('Upgrade: Complete!')
        if 'progress_id' in self.upgrade_state:
            self.remote('progress', 'complete',
                        self.upgrade_state['progress_id'])
        self.upgrade_state = None
        self._save_upgrade_state()
        return
1024 | ||
9f95a23c TL |
    def _check_host(self, host):
        """
        Run 'cephadm check-host' on a managed host.

        Returns an error string when the host is unreachable, or when the
        check fails and warn_on_failed_host_check is enabled; returns
        None otherwise (including for hosts not in the inventory).
        """
        if host not in self.inventory:
            return
        self.log.debug(' checking %s' % host)
        try:
            out, err, code = self._run_cephadm(
                host, 'client', 'check-host', [],
                error_ok=True, no_fsid=True)
            # record that the check ran, regardless of its outcome
            self.cache.update_last_host_check(host)
            self.cache.save_host(host)
            if code:
                self.log.debug(' host %s failed check' % host)
                if self.warn_on_failed_host_check:
                    return 'host %s failed check: %s' % (host, err)
            else:
                self.log.debug(' host %s ok' % host)
        except Exception as e:
            # connection-level failures land here and are always reported
            self.log.debug(' host %s failed check' % host)
            return 'host %s failed check: %s' % (host, e)
1044 | ||
    def _check_for_strays(self):
        """
        Recompute the CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON health
        checks by comparing the mgr's server/daemon list against the
        cephadm inventory and daemon cache, then publish the result.
        """
        self.log.debug('_check_for_strays')
        # start from a clean slate; re-add below if still applicable
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.health_checks:
                del self.health_checks[k]
        if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
            ls = self.list_servers()
            managed = self.cache.get_daemon_names()
            host_detail = []     # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                daemons = item.get('services')  # misnomer!
                missing_names = []
                for s in daemons:
                    name = '%s.%s' % (s.get('type'), s.get('id'))
                    if host not in self.inventory:
                        # daemon lives on a host cephadm does not manage
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.warn_on_stray_hosts and host_detail:
                self.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                    'not managed by cephadm' % (
                        len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.warn_on_stray_daemons and daemon_detail:
                self.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemons(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.set_health_checks(self.health_checks)
1091 | ||
1092 | def _serve_sleep(self): | |
1093 | sleep_interval = 600 | |
1094 | self.log.debug('Sleeping for %d seconds', sleep_interval) | |
1095 | ret = self.event.wait(sleep_interval) | |
1096 | self.event.clear() | |
1097 | ||
    def serve(self):
        # type: () -> None
        """
        Main background loop of the module (runs on its own thread).

        Each iteration: refresh host/daemon/device state, publish the
        resulting health checks, look for stray hosts/daemons, and —
        unless paused — process OSD removals, apply service specs, check
        daemon configs, and advance any in-progress upgrade.  Sleeps via
        _serve_sleep() so other threads can wake it early.
        """
        self.log.debug("serve starting")
        while self.run:

            # refresh daemons
            self.log.debug('refreshing hosts')
            bad_hosts = []
            failures = []
            for host in self.cache.get_hosts():
                if self.cache.host_needs_check(host):
                    r = self._check_host(host)
                    if r is not None:
                        bad_hosts.append(r)
                if self.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                if self.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)

            # recompute host-check / refresh health checks from this pass;
            # only publish when something actually changed
            health_changed = False
            if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
                del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
                health_changed = True
            if bad_hosts:
                self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                    'severity': 'warning',
                    'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                    'count': len(bad_hosts),
                    'detail': bad_hosts,
                }
                health_changed = True
            if failures:
                self.health_checks['CEPHADM_REFRESH_FAILED'] = {
                    'severity': 'warning',
                    'summary': 'failed to probe daemons or devices',
                    'count': len(failures),
                    'detail': failures,
                }
                health_changed = True
            elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
                del self.health_checks['CEPHADM_REFRESH_FAILED']
                health_changed = True
            if health_changed:
                self.set_health_checks(self.health_checks)

            self._check_for_strays()

            if self.paused:
                self.health_checks['CEPHADM_PAUSED'] = {
                    'severity': 'warning',
                    'summary': 'cephadm background work is paused',
                    'count': 1,
                    'detail': ["'ceph orch resume' to resume"],
                }
                self.set_health_checks(self.health_checks)
            else:
                if 'CEPHADM_PAUSED' in self.health_checks:
                    del self.health_checks['CEPHADM_PAUSED']
                    self.set_health_checks(self.health_checks)

                # process any pending OSD removals
                self.rm_util._remove_osds_bg()

                if self._apply_all_services():
                    continue  # did something, refresh

                self._check_daemons()

                if self.upgrade_state and not self.upgrade_state.get('paused'):
                    # _do_upgrade performs at most one step; loop again
                    # immediately to re-evaluate cluster state
                    self._do_upgrade()
                    continue

            self._serve_sleep()
        self.log.debug("serve exit")
1179 | ||
    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.

        Re-reads every module option and every native ceph option into
        same-named instance attributes, then wakes the serve() loop so
        the new values take effect immediately.
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        # wake the serve() loop so it picks up the new settings
        self.event.set()
1197 | ||
1198 | def notify(self, notify_type, notify_id): | |
1199 | pass | |
1200 | ||
1201 | def pause(self): | |
1202 | if not self.paused: | |
1203 | self.log.info('Paused') | |
1204 | self.set_store('pause', 'true') | |
1205 | self.paused = True | |
1206 | # wake loop so we update the health status | |
1207 | self._kick_serve_loop() | |
1208 | ||
1209 | def resume(self): | |
1210 | if self.paused: | |
1211 | self.log.info('Resumed') | |
1212 | self.paused = False | |
1213 | self.set_store('pause', None) | |
1214 | # unconditionally wake loop so that 'orch resume' can be used to kick | |
1215 | # cephadm | |
1216 | self._kick_serve_loop() | |
1217 | ||
1218 | def get_unique_name(self, daemon_type, host, existing, prefix=None, | |
1219 | forcename=None): | |
1220 | # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str | |
1221 | """ | |
1222 | Generate a unique random service name | |
1223 | """ | |
1224 | suffix = daemon_type not in [ | |
801d1391 | 1225 | 'mon', 'crash', 'nfs', |
9f95a23c TL |
1226 | 'prometheus', 'node-exporter', 'grafana', 'alertmanager', |
1227 | ] | |
1228 | if forcename: | |
1229 | if len([d for d in existing if d.daemon_id == forcename]): | |
1230 | raise orchestrator.OrchestratorValidationError('name %s already in use', forcename) | |
1231 | return forcename | |
1232 | ||
1233 | if '.' in host: | |
1234 | host = host.split('.')[0] | |
1235 | while True: | |
1236 | if prefix: | |
1237 | name = prefix + '.' | |
1238 | else: | |
1239 | name = '' | |
1240 | name += host | |
1241 | if suffix: | |
1242 | name += '.' + ''.join(random.choice(string.ascii_lowercase) | |
1243 | for _ in range(6)) | |
1244 | if len([d for d in existing if d.daemon_id == name]): | |
1245 | if not suffix: | |
1246 | raise orchestrator.OrchestratorValidationError('name %s already in use', name) | |
1247 | self.log.debug('name %s exists, trying again', name) | |
1248 | continue | |
1249 | return name | |
1250 | ||
801d1391 TL |
1251 | def get_service_name(self, daemon_type, daemon_id, host): |
1252 | # type: (str, str, str) -> (str) | |
1253 | """ | |
1254 | Returns the generic service name | |
1255 | """ | |
1256 | p = re.compile(r'(.*)\.%s.*' % (host)) | |
801d1391 TL |
1257 | return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id)) |
1258 | ||
9f95a23c TL |
1259 | def _save_inventory(self): |
1260 | self.set_store('inventory', json.dumps(self.inventory)) | |
1261 | ||
1262 | def _save_upgrade_state(self): | |
1263 | self.set_store('upgrade_state', json.dumps(self.upgrade_state)) | |
1264 | ||
    def _reconfig_ssh(self):
        """
        Rebuild the SSH configuration used for remote connections.

        Materializes the stored ssh_config and identity key as temp files
        (kept alive via self._temp_files so they are not deleted while in
        use), recomputes self._ssh_options and self.ssh_user, and drops
        all cached connections so they reconnect with the new settings.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            # a stored config (or the built-in default) wins over the
            # configured file path; write it to a private temp file
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            # ssh expects the public key next to the private one
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        # keep references so the NamedTemporaryFiles are not deleted
        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = 'root'
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        # force existing connections to be re-established with new options
        self._reset_cons()
1314 | ||
801d1391 TL |
1315 | def validate_ssh_config_fname(self, ssh_config_fname): |
1316 | if not os.path.isfile(ssh_config_fname): | |
1317 | raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format( | |
1318 | ssh_config_fname)) | |
1319 | ||
9f95a23c TL |
1320 | def _reset_con(self, host): |
1321 | conn, r = self._cons.get(host, (None, None)) | |
1322 | if conn: | |
1323 | self.log.debug('_reset_con close %s' % host) | |
1324 | conn.exit() | |
1325 | del self._cons[host] | |
1326 | ||
1327 | def _reset_cons(self): | |
1328 | for host, conn_and_r in self._cons.items(): | |
1329 | self.log.debug('_reset_cons close %s' % host) | |
1330 | conn, r = conn_and_r | |
1331 | conn.exit() | |
1332 | self._cons = {} | |
1333 | ||
1911f103 TL |
1334 | def offline_hosts_remove(self, host): |
1335 | if host in self.offline_hosts: | |
1336 | self.offline_hosts.remove(host) | |
1337 | ||
1338 | ||
9f95a23c TL |
1339 | @staticmethod |
1340 | def can_run(): | |
1341 | if remoto is not None: | |
1342 | return True, "" | |
1343 | else: | |
1344 | return False, "loading remoto library:{}".format( | |
1345 | remoto_import_error) | |
1346 | ||
1347 | def available(self): | |
1348 | """ | |
1349 | The cephadm orchestrator is always available. | |
1350 | """ | |
1351 | return self.can_run() | |
1352 | ||
1353 | def process(self, completions): | |
1354 | """ | |
1355 | Does nothing, as completions are processed in another thread. | |
1356 | """ | |
1357 | if completions: | |
1358 | self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions))) | |
1359 | ||
1360 | for p in completions: | |
1361 | p.finalize() | |
1362 | ||
1363 | def _require_hosts(self, hosts): | |
1364 | """ | |
1365 | Raise an error if any of the given hosts are unregistered. | |
1366 | """ | |
1367 | if isinstance(hosts, six.string_types): | |
1368 | hosts = [hosts] | |
1369 | keys = self.inventory.keys() | |
1370 | unregistered_hosts = set(hosts) - keys | |
1371 | if unregistered_hosts: | |
1372 | logger.warning('keys = {}'.format(keys)) | |
1373 | raise RuntimeError("Host(s) {} not registered".format( | |
1374 | ", ".join(map(lambda h: "'{}'".format(h), | |
1375 | unregistered_hosts)))) | |
1376 | ||
1377 | @orchestrator._cli_write_command( | |
1378 | prefix='cephadm set-ssh-config', | |
1379 | desc='Set the ssh_config file (use -i <ssh_config>)') | |
1380 | def _set_ssh_config(self, inbuf=None): | |
1381 | """ | |
1382 | Set an ssh_config file provided from stdin | |
1383 | ||
1384 | TODO: | |
1385 | - validation | |
1386 | """ | |
1387 | if inbuf is None or len(inbuf) == 0: | |
1388 | return -errno.EINVAL, "", "empty ssh config provided" | |
1389 | self.set_store("ssh_config", inbuf) | |
1390 | self.log.info('Set ssh_config') | |
1391 | return 0, "", "" | |
1392 | ||
1393 | @orchestrator._cli_write_command( | |
1394 | prefix='cephadm clear-ssh-config', | |
1395 | desc='Clear the ssh_config file') | |
1396 | def _clear_ssh_config(self): | |
1397 | """ | |
1398 | Clear the ssh_config file provided from stdin | |
1399 | """ | |
1400 | self.set_store("ssh_config", None) | |
1401 | self.ssh_config_tmp = None | |
1402 | self.log.info('Cleared ssh_config') | |
1403 | return 0, "", "" | |
1404 | ||
801d1391 TL |
1405 | @orchestrator._cli_read_command( |
1406 | prefix='cephadm get-ssh-config', | |
1407 | desc='Returns the ssh config as used by cephadm' | |
1408 | ) | |
1409 | def _get_ssh_config(self): | |
1410 | if self.ssh_config_file: | |
1411 | self.validate_ssh_config_fname(self.ssh_config_file) | |
1412 | with open(self.ssh_config_file) as f: | |
1413 | return HandleCommandResult(stdout=f.read()) | |
1414 | ssh_config = self.get_store("ssh_config") | |
1415 | if ssh_config: | |
1416 | return HandleCommandResult(stdout=ssh_config) | |
1417 | return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG) | |
1418 | ||
1419 | ||
9f95a23c TL |
1420 | @orchestrator._cli_write_command( |
1421 | 'cephadm generate-key', | |
1422 | desc='Generate a cluster SSH key (if not present)') | |
1423 | def _generate_key(self): | |
1424 | if not self.ssh_pub or not self.ssh_key: | |
1425 | self.log.info('Generating ssh key...') | |
1426 | tmp_dir = TemporaryDirectory() | |
1427 | path = tmp_dir.name + '/key' | |
1428 | try: | |
1911f103 | 1429 | subprocess.check_call([ |
9f95a23c TL |
1430 | '/usr/bin/ssh-keygen', |
1431 | '-C', 'ceph-%s' % self._cluster_fsid, | |
1432 | '-N', '', | |
1433 | '-f', path | |
1434 | ]) | |
1435 | with open(path, 'r') as f: | |
1436 | secret = f.read() | |
1437 | with open(path + '.pub', 'r') as f: | |
1438 | pub = f.read() | |
1439 | finally: | |
1440 | os.unlink(path) | |
1441 | os.unlink(path + '.pub') | |
1442 | tmp_dir.cleanup() | |
1443 | self.set_store('ssh_identity_key', secret) | |
1444 | self.set_store('ssh_identity_pub', pub) | |
1445 | self._reconfig_ssh() | |
1446 | return 0, '', '' | |
1447 | ||
1448 | @orchestrator._cli_write_command( | |
1449 | 'cephadm clear-key', | |
1450 | desc='Clear cluster SSH key') | |
1451 | def _clear_key(self): | |
1452 | self.set_store('ssh_identity_key', None) | |
1453 | self.set_store('ssh_identity_pub', None) | |
1454 | self._reconfig_ssh() | |
1455 | self.log.info('Cleared cluster SSH key') | |
1456 | return 0, '', '' | |
1457 | ||
1458 | @orchestrator._cli_read_command( | |
1459 | 'cephadm get-pub-key', | |
1460 | desc='Show SSH public key for connecting to cluster hosts') | |
1461 | def _get_pub_key(self): | |
1462 | if self.ssh_pub: | |
1463 | return 0, self.ssh_pub, '' | |
1464 | else: | |
1465 | return -errno.ENOENT, '', 'No cluster SSH key defined' | |
1466 | ||
1467 | @orchestrator._cli_read_command( | |
1468 | 'cephadm get-user', | |
1469 | desc='Show user for SSHing to cluster hosts') | |
1470 | def _get_user(self): | |
1471 | return 0, self.ssh_user, '' | |
1472 | ||
1473 | @orchestrator._cli_read_command( | |
1474 | 'cephadm check-host', | |
1475 | 'name=host,type=CephString ' | |
1476 | 'name=addr,type=CephString,req=false', | |
1477 | 'Check whether we can access and manage a remote host') | |
1478 | def check_host(self, host, addr=None): | |
1479 | out, err, code = self._run_cephadm(host, 'client', 'check-host', | |
1480 | ['--expect-hostname', host], | |
1481 | addr=addr, | |
1482 | error_ok=True, no_fsid=True) | |
1483 | if code: | |
1484 | return 1, '', ('check-host failed:\n' + '\n'.join(err)) | |
1485 | # if we have an outstanding health alert for this host, give the | |
1486 | # serve thread a kick | |
1487 | if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: | |
1488 | for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: | |
1489 | if item.startswith('host %s ' % host): | |
1490 | self.event.set() | |
1491 | return 0, '%s (%s) ok' % (host, addr), err | |
1492 | ||
1493 | @orchestrator._cli_read_command( | |
1494 | 'cephadm prepare-host', | |
1495 | 'name=host,type=CephString ' | |
1496 | 'name=addr,type=CephString,req=false', | |
1497 | 'Prepare a remote host for use with cephadm') | |
1498 | def _prepare_host(self, host, addr=None): | |
1499 | out, err, code = self._run_cephadm(host, 'client', 'prepare-host', | |
1500 | ['--expect-hostname', host], | |
1501 | addr=addr, | |
1502 | error_ok=True, no_fsid=True) | |
1503 | if code: | |
1504 | return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) | |
1505 | # if we have an outstanding health alert for this host, give the | |
1506 | # serve thread a kick | |
1507 | if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: | |
1508 | for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: | |
1509 | if item.startswith('host %s ' % host): | |
1510 | self.event.set() | |
1511 | return 0, '%s (%s) ok' % (host, addr), err | |
1512 | ||
    def _get_connection(self, host):
        """
        Setup a connection for running commands on remote host.

        Connections are cached in self._cons and reused; a fresh remoto
        connection also gets our `remotes` helper module pushed to the
        far end.  Returns a (connection, remote_module) tuple.
        """
        conn_and_r = self._cons.get(host)
        if conn_and_r:
            self.log.debug('Have connection to %s' % host)
            return conn_and_r
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        # quiet remoto's chatty per-connection logger
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options)

        # make our remote helper functions callable on the far end
        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r
1535 | ||
1536 | def _executable_path(self, conn, executable): | |
1537 | """ | |
1538 | Remote validator that accepts a connection object to ensure that a certain | |
1539 | executable is available returning its full path if so. | |
1540 | ||
1541 | Otherwise an exception with thorough details will be raised, informing the | |
1542 | user that the executable was not found. | |
1543 | """ | |
1544 | executable_path = conn.remote_module.which(executable) | |
1545 | if not executable_path: | |
1546 | raise RuntimeError("Executable '{}' not found on host '{}'".format( | |
1547 | executable, conn.hostname)) | |
1548 | self.log.debug("Found executable '{}' at path '{}'".format(executable, | |
1549 | executable_path)) | |
1550 | return executable_path | |
1551 | ||
    def _run_cephadm(self, host, entity, command, args,
                     addr=None,
                     stdin=None,
                     no_fsid=False,
                     error_ok=False,
                     image=None):
        # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
        """
        Run cephadm on the remote host with the given command + args

        Returns (stdout_lines, stderr_lines, exit_code).  With
        error_ok=True, connection and command failures are reported via
        the return value instead of raising.  Connection-level failures
        also add the host to self.offline_hosts.
        """
        if not addr and host in self.inventory:
            addr = self.inventory[host].get('addr', host)

        # optimistically assume the host is reachable; re-added on failure
        self.offline_hosts_remove(host)

        try:
            try:
                conn, connr = self._get_connection(addr)
            except IOError as e:
                if error_ok:
                    self.log.exception('failed to establish ssh connection')
                    return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
                raise

            assert image or entity
            if not image:
                # derive the container image from the entity's config
                daemon_type = entity.split('.', 1)[0]  # type: ignore
                if daemon_type in CEPH_TYPES or \
                        daemon_type == 'nfs':
                    # get container image
                    ret, image, err = self.mon_command({
                        'prefix': 'config get',
                        'who': utils.name_to_config_section(entity),
                        'key': 'container_image',
                    })
                    image = image.strip()  # type: ignore
                self.log.debug('%s container image %s' % (entity, image))

            final_args = []
            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]
            final_args += args

            if self.mode == 'root':
                self.log.debug('args: %s' % (' '.join(final_args)))
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                # ship the cephadm script itself over stdin, with argv and
                # stdin injected as variables at the top
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    # drop the cached connection; it may be broken
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            user = 'root' if self.mode == 'root' else 'cephadm'
            msg = f'Failed to connect to {host} ({addr}). ' \
                  f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
                  f'you may want to run: \n' \
                  f'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise
1661 | ||
1662 | def _get_hosts(self, label=None): | |
1663 | # type: (Optional[str]) -> List[str] | |
1664 | r = [] | |
1665 | for h, hostspec in self.inventory.items(): | |
1666 | if not label or label in hostspec.get('labels', []): | |
1667 | r.append(h) | |
1668 | return r | |
1669 | ||
    @async_completion
    def add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name

        The host name is validated and a remote 'check-host' must pass
        before the host is added to the inventory.
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory[spec.hostname] = spec.to_json()
        self._save_inventory()
        # seed the cache so refresh logic knows about the new host
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)
1694 | ||
@async_completion
def remove_host(self, host):
    # type: (str) -> str
    """
    Remove a host from orchestrator management.

    :param host: host name
    """
    del self.inventory[host]
    self._save_inventory()
    # drop cached daemon/device state and any open connection
    self.cache.rm_host(host)
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Removed host %s' % host)
    return "Removed host '{}'".format(host)
1710 | ||
@async_completion
def update_host_addr(self, host, addr):
    """Record a new address for a known host and drop its cached connection."""
    if host not in self.inventory:
        raise OrchestratorError('host %s not registered' % host)
    self.inventory[host]['addr'] = addr
    self._save_inventory()
    # the old connection targets the old address; force a reconnect
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Set host %s addr to %s' % (host, addr))
    return "Updated host '{}' addr to '{}'".format(host, addr)
1721 | ||
@trivial_completion
def get_hosts(self):
    # type: () -> List[orchestrator.HostSpec]
    """
    Return a list of hosts managed by the orchestrator.

    Notes:
      - skip async: manager reads from cache.
    """
    def _to_spec(hostname, info):
        # hosts we failed to reach are reported Offline regardless of stored status
        if hostname in self.offline_hosts:
            status = 'Offline'
        else:
            status = info.get('status', '')
        return orchestrator.HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status=status,
        )

    return [_to_spec(h, info) for h, info in self.inventory.items()]
1740 | ||
@async_completion
def add_host_label(self, host, label):
    """Attach `label` to `host` in the inventory (idempotent)."""
    if host not in self.inventory:
        raise OrchestratorError('host %s does not exist' % host)

    # materialize the label list if missing, then add once
    labels = self.inventory[host].setdefault('labels', list())
    if label not in labels:
        labels.append(label)
    self._save_inventory()
    self.log.info('Added label %s to host %s' % (label, host))
    return 'Added label %s to host %s' % (label, host)
1753 | ||
@async_completion
def remove_host_label(self, host, label):
    """Remove `label` from `host` in the inventory (idempotent).

    Bug fix: the success log line previously read 'Removed label ... to
    host ...'; it now matches the returned message ('from host').
    """
    if host not in self.inventory:
        raise OrchestratorError('host %s does not exist' % host)

    # keep the original side effect of materializing an empty label list
    labels = self.inventory[host].setdefault('labels', list())
    if label in labels:
        labels.remove(label)
    self._save_inventory()
    self.log.info('Removed label %s from host %s' % (label, host))
    return 'Removed label %s from host %s' % (label, host)
1766 | ||
def _refresh_host_daemons(self, host):
    # Scrape the daemon inventory of `host` via `cephadm ls` and refresh
    # the per-host daemon cache.  Returns an error string on failure and
    # None on success (the caller treats the return as a health detail).
    try:
        out, err, code = self._run_cephadm(
            host, 'mon', 'ls', [], no_fsid=True)
        if code:
            return 'host %s cephadm ls returned %d: %s' % (
                host, code, err)
    except Exception as e:
        return 'host %s scrape failed: %s' % (host, e)
    # `cephadm ls` prints one JSON document; out is a list of lines
    ls = json.loads(''.join(out))
    dm = {}
    for d in ls:
        # keep only cephadm-managed daemons of *this* cluster whose name
        # follows the '<type>.<id>' convention
        if not d['style'].startswith('cephadm'):
            continue
        if d['fsid'] != self._cluster_fsid:
            continue
        if '.' not in d['name']:
            continue
        sd = orchestrator.DaemonDescription()
        sd.last_refresh = datetime.datetime.utcnow()
        # timestamps are optional in the ls output; parse the ones present
        for k in ['created', 'started', 'last_configured', 'last_deployed']:
            v = d.get(k, None)
            if v:
                setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
        sd.daemon_type = d['name'].split('.')[0]
        sd.daemon_id = '.'.join(d['name'].split('.')[1:])
        sd.hostname = host
        sd.container_id = d.get('container_id')
        if sd.container_id:
            # shorten the hash
            sd.container_id = sd.container_id[0:12]
        sd.container_image_name = d.get('container_image_name')
        sd.container_image_id = d.get('container_image_id')
        sd.version = d.get('version')
        if 'state' in d:
            sd.status_desc = d['state']
            # map cephadm state string -> numeric status (1 up, 0 stopped, -1 error)
            sd.status = {
                'running': 1,
                'stopped': 0,
                'error': -1,
                'unknown': -1,
            }[d['state']]
        else:
            sd.status_desc = 'unknown'
            sd.status = None
        dm[sd.name()] = sd
    self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
    self.cache.update_host_daemons(host, dm)
    self.cache.save_host(host)
    return None
1817 | ||
def _refresh_host_devices(self, host):
    # Scrape the storage inventory (`ceph-volume inventory`) and network
    # addresses (`cephadm list-networks`) of `host`, then refresh the
    # device/network cache.  Returns an error string on failure, None on
    # success.
    try:
        out, err, code = self._run_cephadm(
            host, 'osd',
            'ceph-volume',
            ['--', 'inventory', '--format=json'])
        if code:
            return 'host %s ceph-volume inventory returned %d: %s' % (
                host, code, err)
    except Exception as e:
        return 'host %s ceph-volume inventory failed: %s' % (host, e)
    devices = json.loads(''.join(out))
    try:
        out, err, code = self._run_cephadm(
            host, 'mon',
            'list-networks',
            [],
            no_fsid=True)
        if code:
            return 'host %s list-networks returned %d: %s' % (
                host, code, err)
    except Exception as e:
        return 'host %s list-networks failed: %s' % (host, e)
    networks = json.loads(''.join(out))
    self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
        host, len(devices), len(networks)))
    # convert raw JSON into inventory.Devices before caching
    devices = inventory.Devices.from_json(devices)
    self.cache.update_host_devices_networks(host, devices.devices, networks)
    self.cache.save_host(host)
    return None
1848 | ||
1849 | def _get_spec_size(self, spec): | |
1850 | if spec.placement.count: | |
1851 | return spec.placement.count | |
1852 | elif spec.placement.host_pattern: | |
1853 | return len(spec.placement.pattern_matches_hosts(self.inventory.keys())) | |
1854 | elif spec.placement.label: | |
1855 | return len(self._get_hosts(spec.placement.label)) | |
1856 | elif spec.placement.hosts: | |
1857 | return len(spec.placement.hosts) | |
1858 | # hmm! | |
1859 | return 0 | |
1860 | ||
@trivial_completion
def describe_service(self, service_type=None, service_name=None,
                     refresh=False):
    # Aggregate cached per-daemon state into per-service descriptions,
    # then add services that have a stored spec but no running daemons yet.
    # OSD daemons are deliberately excluded from the aggregation below.
    if refresh:
        # ugly sync path, FIXME someday perhaps?
        for host, hi in self.inventory.items():
            self._refresh_host_daemons(host)
    # <service_map>
    sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
    for h, dm in self.cache.get_daemons_with_volatile_status():
        for name, dd in dm.items():
            if service_type and service_type != dd.daemon_type:
                continue
            n: str = dd.service_name()
            if service_name and service_name != n:
                continue
            if dd.daemon_type == 'osd':
                continue  # ignore OSDs for now
            # use the stored spec when we have one; otherwise synthesize an
            # 'unmanaged' placeholder spec pinned to the daemon's host
            if dd.service_name() in self.spec_store.specs:
                spec = self.spec_store.specs[dd.service_name()]
            else:
                spec = ServiceSpec(
                    unmanaged=True,
                    service_type=dd.daemon_type,
                    service_id=dd.service_id(),
                    placement=PlacementSpec(
                        hosts=[dd.hostname]
                    )
                )
            if n not in sm:
                sm[n] = orchestrator.ServiceDescription(
                    last_refresh=dd.last_refresh,
                    container_image_id=dd.container_image_id,
                    container_image_name=dd.container_image_name,
                    spec=spec,
                )
            if dd.service_name() in self.spec_store.specs:
                sm[n].size = self._get_spec_size(spec)
                sm[n].created = self.spec_store.spec_created[dd.service_name()]
                if service_type == 'nfs':
                    spec = cast(NFSServiceSpec, spec)
                    sm[n].rados_config_location = spec.rados_config_location()
            else:
                sm[n].size = 0
            if dd.status == 1:
                sm[n].running += 1
            # keep the *oldest* refresh time across the service's daemons
            if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                sm[n].last_refresh = dd.last_refresh
            # diverging images across daemons are reported as 'mix'
            if sm[n].container_image_id != dd.container_image_id:
                sm[n].container_image_id = 'mix'
            if sm[n].container_image_name != dd.container_image_name:
                sm[n].container_image_name = 'mix'
    # specs with no daemons deployed yet still show up with running=0
    for n, spec in self.spec_store.specs.items():
        if n in sm:
            continue
        if service_type is not None and service_type != spec.service_type:
            continue
        if service_name is not None and service_name != n:
            continue
        sm[n] = orchestrator.ServiceDescription(
            spec=spec,
            size=self._get_spec_size(spec),
            running=0,
        )
        if service_type == 'nfs':
            spec = cast(NFSServiceSpec, spec)
            sm[n].rados_config_location = spec.rados_config_location()
    return list(sm.values())
9f95a23c TL |
1929 | |
@trivial_completion
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                 host=None, refresh=False):
    """List cached daemon descriptions, optionally filtered and refreshed first."""
    if refresh:
        # ugly sync path, FIXME someday perhaps?
        if host:
            self._refresh_host_daemons(host)
        else:
            for hostname, hi in self.inventory.items():
                self._refresh_host_daemons(hostname)

    def _wanted(dd):
        # a daemon must satisfy every filter that was supplied
        if daemon_type is not None and daemon_type != dd.daemon_type:
            return False
        if daemon_id is not None and daemon_id != dd.daemon_id:
            return False
        if service_name is not None and service_name != dd.service_name():
            return False
        return True

    result = []
    for h, dm in self.cache.get_daemons_with_volatile_status():
        if host and h != host:
            continue
        result.extend(dd for name, dd in dm.items() if _wanted(dd))
    return result
1953 | ||
def service_action(self, action, service_name):
    """Apply `action` (start/stop/restart/...) to every daemon of a service."""
    matched = [
        (d.daemon_type, d.daemon_id, d.hostname, action)
        for host, dm in self.cache.daemons.items()
        for name, d in dm.items()
        if d.matches_service(service_name)
    ]
    self.log.info('%s service %s' % (action.capitalize(), service_name))
    return self._daemon_actions(matched)
1963 | ||
@async_map_completion
def _daemon_actions(self, daemon_type, daemon_id, host, action):
    # Fan-out wrapper: @async_map_completion invokes this once per
    # (daemon_type, daemon_id, host, action) tuple in the submitted list.
    return self._daemon_action(daemon_type, daemon_id, host, action)
1967 | ||
1968 | def _daemon_action(self, daemon_type, daemon_id, host, action): | |
1969 | if action == 'redeploy': | |
1970 | # stop, recreate the container+unit, then restart | |
1971 | return self._create_daemon(daemon_type, daemon_id, host) | |
1972 | elif action == 'reconfig': | |
1973 | return self._create_daemon(daemon_type, daemon_id, host, | |
1974 | reconfig=True) | |
1975 | ||
1976 | actions = { | |
1977 | 'start': ['reset-failed', 'start'], | |
1978 | 'stop': ['stop'], | |
1979 | 'restart': ['reset-failed', 'restart'], | |
1980 | } | |
1981 | name = '%s.%s' % (daemon_type, daemon_id) | |
1982 | for a in actions[action]: | |
1983 | out, err, code = self._run_cephadm( | |
1984 | host, name, 'unit', | |
1985 | ['--name', name, a], | |
1986 | error_ok=True) | |
1987 | self.cache.invalidate_host_daemons(host) | |
1988 | return "{} {} from host '{}'".format(action, name, host) | |
1989 | ||
def daemon_action(self, action, daemon_type, daemon_id):
    """Apply `action` to every cached daemon matching type and id."""
    matched = [
        (d.daemon_type, d.daemon_id, d.hostname, action)
        for host, dm in self.cache.daemons.items()
        for name, d in dm.items()
        if d.daemon_type == daemon_type and d.daemon_id == daemon_id
    ]
    if not matched:
        raise orchestrator.OrchestratorError(
            'Unable to find %s.%s daemon(s)' % (
                daemon_type, daemon_id))
    self.log.info('%s daemons %s' % (
        action.capitalize(),
        ','.join(['%s.%s' % (a[0], a[1]) for a in matched])))
    return self._daemon_actions(matched)
2005 | ||
def remove_daemons(self, names):
    # type: (List[str]) -> orchestrator.Completion
    """Remove the named daemons from whichever hosts currently run them."""
    to_remove = [
        (name, host)
        for host, dm in self.cache.daemons.items()
        for name in names
        if name in dm
    ]
    if not to_remove:
        raise OrchestratorError('Unable to find daemon(s) %s' % (names))
    self.log.info('Remove daemons %s' % [a[0] for a in to_remove])
    return self._remove_daemons(to_remove)
2017 | ||
@trivial_completion
def remove_service(self, service_name):
    """Delete a stored service spec; the serve loop reaps its daemons."""
    self.log.info('Remove service %s' % service_name)
    found = self.spec_store.rm(service_name)
    if not found:
        # must be idempotent: still a success.
        return [f'Failed to remove service. <{service_name}> was not found.']
    self._kick_serve_loop()
    return ['Removed service %s' % service_name]
9f95a23c TL |
2028 | |
@trivial_completion
def get_inventory(self, host_filter=None, refresh=False):
    """
    Return the storage inventory of hosts matching the given filter.

    :param host_filter: host filter

    TODO:
    - add filtering by label
    """
    if refresh:
        # ugly sync path, FIXME someday perhaps?
        if host_filter:
            targets = host_filter.hosts
        else:
            targets = list(self.inventory.keys())
        for host in targets:
            self._refresh_host_devices(host)

    wanted = set(host_filter.hosts) if host_filter else None
    return [
        orchestrator.InventoryHost(host, inventory.Devices(devs))
        for host, devs in self.cache.devices.items()
        if wanted is None or host in wanted
    ]
2055 | ||
@trivial_completion
def zap_device(self, host, path):
    """Destroy LVM state on device `path` of `host` via ceph-volume zap."""
    self.log.info('Zap device %s:%s' % (host, path))
    out, err, code = self._run_cephadm(
        host, 'osd', 'ceph-volume',
        ['--', 'lvm', 'zap', '--destroy', path],
        error_ok=True)
    # the cached device list for this host is stale either way
    self.cache.invalidate_host_devices(host)
    combined = '\n'.join(out + err)
    if code:
        raise OrchestratorError('Zap failed: %s' % combined)
    return combined
2067 | ||
def blink_device_light(self, ident_fault, on, locs):
    # Toggle a device LED via lsmcli on each location.
    # ident_fault: which LED to drive ('ident' or 'fault' — presumably;
    #   it is interpolated into the lsmcli subcommand, confirm with callers).
    # on: True to switch the LED on, False to switch it off.
    # locs: list of (host, dev, path) locations fanned out by
    #   @async_map_completion below.
    @async_map_completion
    def blink(host, dev, path):
        # lsmcli is executed inside a cephadm shell on the target host;
        # prefer the by-path name when one is available
        cmd = [
            'lsmcli',
            'local-disk-%s-led-%s' % (
                ident_fault,
                'on' if on else 'off'),
            '--path', path or dev,
        ]
        out, err, code = self._run_cephadm(
            host, 'osd', 'shell', ['--'] + cmd,
            error_ok=True)
        if code:
            raise RuntimeError(
                'Unable to affect %s light for %s:%s. Command: %s' % (
                    ident_fault, host, dev, ' '.join(cmd)))
        self.log.info('Set %s light for %s:%s %s' % (
            ident_fault, host, dev, 'on' if on else 'off'))
        return "Set %s light for %s:%s %s" % (
            ident_fault, host, dev, 'on' if on else 'off')

    return blink(locs)
2091 | ||
def get_osd_uuid_map(self, only_up=False):
    # type: (bool) -> Dict[str, str]
    """Map osd id (as str) -> osd uuid from the cluster osd map.

    With only_up=True, only OSDs that have ever started (up_from > 0) are
    included; this lets an interrupted osd create be repeated and succeed
    the second time around.
    """
    osd_map = self.get('osd_map')
    uuid_by_id = {}
    for osd in osd_map['osds']:
        osd_id = osd.get('osd')
        if osd_id is None:
            raise OrchestratorError("Could not retrieve osd_id from osd_map")
        if only_up and osd['up_from'] <= 0:
            continue
        uuid_by_id[str(osd_id)] = osd.get('uuid', '')
    return uuid_by_id
2106 | ||
@trivial_completion
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
    """Queue each OSD drive group spec for application."""
    return list(map(self._apply, specs))
2110 | ||
1911f103 TL |
def find_destroyed_osds(self) -> Dict[str, List[str]]:
    """Query 'osd tree destroyed' and group destroyed osd ids by hostname."""
    destroyed_by_host: Dict[str, List[str]] = {}
    ret, out, err = self.mon_command({
        'prefix': 'osd tree',
        'states': ['destroyed'],
        'format': 'json'
    })
    if ret != 0:
        raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
    try:
        tree = json.loads(out)
    except json.decoder.JSONDecodeError:
        # tolerate bad output: report nothing rather than fail the caller
        self.log.error(f"Could not decode json -> {out}")
        return destroyed_by_host
    for node in tree.get('nodes', {}):
        if node.get('type') != 'host':
            continue
        children = node.get('children', list())
        destroyed_by_host[node.get('name')] = [str(osd_id) for osd_id in children]
    return destroyed_by_host
2133 | ||
9f95a23c TL |
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
    """Create OSDs on every host that matches the drive group's placement."""
    self.log.debug(f"Processing DriveGroup {drive_group}")
    # claim ids of previously destroyed OSDs so replacements reuse them
    drive_group.osd_id_claims = self.find_destroyed_osds()
    self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
    results = []
    for host, drive_selection in self.prepare_drivegroup(drive_group):
        self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
        claimed_ids = drive_group.osd_id_claims.get(host, [])
        cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
                                                 claimed_ids)
        if not cmd:
            self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
            continue
        results.append(self._create_osd(host, cmd,
                                        replace_osd_ids=claimed_ids))
    return ", ".join(results)
2151 | ||
def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
    """Resolve a drive group into [(hostname, DriveSelection), ...] pairs."""
    # 1) use the placement filter to determine the matching hosts
    matching_hosts = drive_group.placement.pattern_matches_hosts(
        list(self.cache.get_hosts()))

    def _inventory_for(hostname: str, inventory_dict: dict):
        # NOTE: inventory should really be loaded together with the host
        if hostname in inventory_dict:
            return inventory_dict[hostname]
        raise OrchestratorError("No inventory found for host: {}".format(hostname))

    # 2+3) pair each matching host with a DriveSelection over its devices
    self.log.debug(f"Checking matching hosts -> {matching_hosts}")
    host_ds_map = []
    for host in matching_hosts:
        host_inventory = _inventory_for(host, self.cache.devices)
        self.log.debug(f"Found inventory for host {host_inventory}")
        selection = DriveSelection(drive_group, host_inventory)
        self.log.debug(f"Found drive selection {selection}")
        host_ds_map.append((host, selection))
    return host_ds_map
2176 | ||
def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
                                  drive_selection: DriveSelection,
                                  osd_id_claims: Optional[List[str]] = None,
                                  preview: bool = False) -> Optional[str]:
    """Translate a drive selection into a runnable ceph-volume command string."""
    self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
    translator = translate.to_ceph_volume(drive_group, drive_selection,
                                          osd_id_claims, preview=preview)
    cmd = translator.run()  # type: Optional[str]
    self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
    return cmd
2185 | ||
def preview_drivegroups(self, drive_group_name: Optional[str] = None,
                        dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
    # Dry-run OSD creation: for each selected drive group, run the
    # ceph-volume command in preview mode per host and collect the parsed
    # JSON reports as [{'data': ..., 'drivegroup': ..., 'host': ...}, ...].
    # find drivegroups
    if drive_group_name:
        drive_groups = cast(List[DriveGroupSpec],
                            self.spec_store.find(service_name=drive_group_name))
    elif dg_specs:
        drive_groups = dg_specs
    else:
        drive_groups = []
    ret_all = []
    for drive_group in drive_groups:
        # claim destroyed osd ids just like the real create path does
        drive_group.osd_id_claims = self.find_destroyed_osds()
        self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
        # prepare driveselection
        for host, ds in self.prepare_drivegroup(drive_group):
            cmd = self.driveselection_to_ceph_volume(drive_group, ds,
                                                     drive_group.osd_id_claims.get(host, []), preview=True)
            if not cmd:
                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
                continue
            out, err, code = self._run_ceph_volume_command(host, cmd)
            if out:
                concat_out = json.loads(" ".join(out))
                ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
    return ret_all
9f95a23c | 2212 | |
1911f103 | 2213 | def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]: |
9f95a23c TL |
2214 | self._require_hosts(host) |
2215 | ||
2216 | # get bootstrap key | |
2217 | ret, keyring, err = self.mon_command({ | |
2218 | 'prefix': 'auth get', | |
2219 | 'entity': 'client.bootstrap-osd', | |
2220 | }) | |
2221 | ||
2222 | # generate config | |
2223 | ret, config, err = self.mon_command({ | |
2224 | "prefix": "config generate-minimal-conf", | |
2225 | }) | |
2226 | ||
2227 | j = json.dumps({ | |
2228 | 'config': config, | |
2229 | 'keyring': keyring, | |
2230 | }) | |
2231 | ||
9f95a23c TL |
2232 | split_cmd = cmd.split(' ') |
2233 | _cmd = ['--config-json', '-', '--'] | |
2234 | _cmd.extend(split_cmd) | |
2235 | out, err, code = self._run_cephadm( | |
2236 | host, 'osd', 'ceph-volume', | |
2237 | _cmd, | |
2238 | stdin=j, | |
2239 | error_ok=True) | |
1911f103 TL |
2240 | return out, err, code |
2241 | ||
2242 | def _create_osd(self, host, cmd, replace_osd_ids=None): | |
2243 | out, err, code = self._run_ceph_volume_command(host, cmd) | |
2244 | ||
9f95a23c TL |
2245 | if code == 1 and ', it is already prepared' in '\n'.join(err): |
2246 | # HACK: when we create against an existing LV, ceph-volume | |
2247 | # returns an error and the above message. To make this | |
2248 | # command idempotent, tolerate this "error" and continue. | |
2249 | self.log.debug('the device was already prepared; continuing') | |
2250 | code = 0 | |
2251 | if code: | |
2252 | raise RuntimeError( | |
2253 | 'cephadm exited with an error code: %d, stderr:%s' % ( | |
2254 | code, '\n'.join(err))) | |
2255 | ||
2256 | # check result | |
2257 | out, err, code = self._run_cephadm( | |
2258 | host, 'osd', 'ceph-volume', | |
2259 | [ | |
2260 | '--', | |
2261 | 'lvm', 'list', | |
2262 | '--format', 'json', | |
2263 | ]) | |
1911f103 | 2264 | before_osd_uuid_map = self.get_osd_uuid_map(only_up=True) |
9f95a23c TL |
2265 | osds_elems = json.loads('\n'.join(out)) |
2266 | fsid = self._cluster_fsid | |
2267 | osd_uuid_map = self.get_osd_uuid_map() | |
2268 | created = [] | |
2269 | for osd_id, osds in osds_elems.items(): | |
2270 | for osd in osds: | |
2271 | if osd['tags']['ceph.cluster_fsid'] != fsid: | |
2272 | self.log.debug('mismatched fsid, skipping %s' % osd) | |
2273 | continue | |
1911f103 TL |
2274 | if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids: |
2275 | # if it exists but is part of the replacement operation, don't skip | |
9f95a23c TL |
2276 | continue |
2277 | if osd_id not in osd_uuid_map: | |
1911f103 | 2278 | self.log.debug('osd id {} does not exist in cluster'.format(osd_id)) |
9f95a23c | 2279 | continue |
1911f103 | 2280 | if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']: |
9f95a23c TL |
2281 | self.log.debug('mismatched osd uuid (cluster has %s, osd ' |
2282 | 'has %s)' % ( | |
1911f103 | 2283 | osd_uuid_map.get(osd_id), |
9f95a23c TL |
2284 | osd['tags']['ceph.osd_fsid'])) |
2285 | continue | |
2286 | ||
2287 | created.append(osd_id) | |
2288 | self._create_daemon( | |
2289 | 'osd', osd_id, host, | |
2290 | osd_uuid_map=osd_uuid_map) | |
2291 | ||
2292 | if created: | |
2293 | self.cache.invalidate_host_devices(host) | |
2294 | return "Created osd(s) %s on host '%s'" % (','.join(created), host) | |
2295 | else: | |
2296 | return "Created no osd(s) on host %s; already created?" % host | |
2297 | ||
2298 | def _calc_daemon_deps(self, daemon_type, daemon_id): | |
2299 | need = { | |
2300 | 'prometheus': ['mgr', 'alertmanager', 'node-exporter'], | |
2301 | 'grafana': ['prometheus'], | |
801d1391 | 2302 | 'alertmanager': ['mgr', 'alertmanager'], |
9f95a23c TL |
2303 | } |
2304 | deps = [] | |
2305 | for dep_type in need.get(daemon_type, []): | |
2306 | for dd in self.cache.get_daemons_by_service(dep_type): | |
2307 | deps.append(dd.name()) | |
2308 | return sorted(deps) | |
2309 | ||
801d1391 TL |
2310 | def _get_config_and_keyring(self, daemon_type, daemon_id, |
2311 | keyring=None, | |
1911f103 | 2312 | extra_ceph_config=None): |
801d1391 TL |
2313 | # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any] |
2314 | # keyring | |
2315 | if not keyring: | |
2316 | if daemon_type == 'mon': | |
2317 | ename = 'mon.' | |
2318 | else: | |
2319 | ename = utils.name_to_config_section(daemon_type + '.' + daemon_id) | |
2320 | ret, keyring, err = self.mon_command({ | |
2321 | 'prefix': 'auth get', | |
2322 | 'entity': ename, | |
2323 | }) | |
2324 | ||
2325 | # generate config | |
2326 | ret, config, err = self.mon_command({ | |
2327 | "prefix": "config generate-minimal-conf", | |
2328 | }) | |
1911f103 TL |
2329 | if extra_ceph_config: |
2330 | config += extra_ceph_config | |
801d1391 TL |
2331 | |
2332 | return { | |
2333 | 'config': config, | |
2334 | 'keyring': keyring, | |
2335 | } | |
2336 | ||
9f95a23c TL |
def _create_daemon(self, daemon_type, daemon_id, host,
                   keyring=None,
                   extra_args=None, extra_config=None,
                   reconfig=False,
                   osd_uuid_map=None):
    """Deploy (or reconfigure) one daemon on `host` via `cephadm deploy`.

    :param keyring: pre-fetched keyring for ceph daemons (else fetched here)
    :param extra_args: extra CLI args for cephadm deploy
    :param extra_config: extra config; the 'config' key is appended to
        ceph.conf, remaining keys are shipped as files
    :param reconfig: re-write config/unit files without recreating the container
    :param osd_uuid_map: osd id (str) -> uuid map; fetched if not given
    :raises OrchestratorError: when an osd daemon's uuid is not in the osdmap
    """
    if not extra_args:
        extra_args = []
    if not extra_config:
        extra_config = {}
    name = '%s.%s' % (daemon_type, daemon_id)

    start_time = datetime.datetime.utcnow()
    deps = []  # type: List[str]
    cephadm_config = {}  # type: Dict[str, Any]
    # monitoring daemons get a generated config blob over stdin; everything
    # else gets a ceph.conf + keyring payload
    if daemon_type == 'prometheus':
        cephadm_config, deps = self._generate_prometheus_config()
        extra_args.extend(['--config-json', '-'])
    elif daemon_type == 'grafana':
        cephadm_config, deps = self._generate_grafana_config()
        extra_args.extend(['--config-json', '-'])
    elif daemon_type == 'nfs':
        cephadm_config, deps = \
            self._generate_nfs_config(daemon_type, daemon_id, host)
        extra_args.extend(['--config-json', '-'])
    elif daemon_type == 'alertmanager':
        cephadm_config, deps = self._generate_alertmanager_config()
        extra_args.extend(['--config-json', '-'])
    else:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self._get_config_and_keyring(
            daemon_type, daemon_id,
            keyring=keyring,
            extra_ceph_config=extra_config.pop('config', ''))
        if extra_config:
            cephadm_config.update({'files': extra_config})
        extra_args.extend(['--config-json', '-'])

    # osd deployments needs an --osd-uuid arg
    if daemon_type == 'osd':
        if not osd_uuid_map:
            osd_uuid_map = self.get_osd_uuid_map()
        osd_uuid = osd_uuid_map.get(daemon_id)
        if not osd_uuid:
            # BUG FIX: daemon_id is a string here (osd uuid map keys are
            # str), so the old '%d' conversion raised TypeError instead of
            # this OrchestratorError.
            raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
        extra_args.extend(['--osd-fsid', osd_uuid])

    if reconfig:
        extra_args.append('--reconfig')
    if self.allow_ptrace:
        extra_args.append('--allow-ptrace')

    self.log.info('%s daemon %s on %s' % (
        'Reconfiguring' if reconfig else 'Deploying',
        name, host))

    out, err, code = self._run_cephadm(
        host, name, 'deploy',
        [
            '--name', name,
        ] + extra_args,
        stdin=json.dumps(cephadm_config))
    if not code and host in self.cache.daemons:
        # prime cached service state with what we (should have)
        # just created
        sd = orchestrator.DaemonDescription()
        sd.daemon_type = daemon_type
        sd.daemon_id = daemon_id
        sd.hostname = host
        sd.status = 1
        sd.status_desc = 'starting'
        self.cache.add_daemon(host, sd)
    self.cache.invalidate_host_daemons(host)
    self.cache.update_daemon_config_deps(host, name, deps, start_time)
    self.cache.save_host(host)
    return "{} {} on host '{}'".format(
        'Reconfigured' if reconfig else 'Deployed', name, host)
2413 | ||
@async_map_completion
def _remove_daemons(self, name, host):
    # Fan-out wrapper: @async_map_completion invokes _remove_daemon once
    # per (name, host) tuple in the submitted list.
    return self._remove_daemon(name, host)
2417 | ||
2418 | def _remove_daemon(self, name, host): | |
2419 | """ | |
2420 | Remove a daemon | |
2421 | """ | |
2422 | (daemon_type, daemon_id) = name.split('.', 1) | |
2423 | if daemon_type == 'mon': | |
2424 | self._check_safe_to_destroy_mon(daemon_id) | |
2425 | ||
2426 | # remove mon from quorum before we destroy the daemon | |
2427 | self.log.info('Removing monitor %s from monmap...' % name) | |
2428 | ret, out, err = self.mon_command({ | |
2429 | 'prefix': 'mon rm', | |
2430 | 'name': daemon_id, | |
2431 | }) | |
2432 | if ret: | |
2433 | raise OrchestratorError('failed to remove mon %s from monmap' % ( | |
2434 | name)) | |
2435 | ||
2436 | args = ['--name', name, '--force'] | |
2437 | self.log.info('Removing daemon %s from %s' % (name, host)) | |
2438 | out, err, code = self._run_cephadm( | |
2439 | host, name, 'rm-daemon', args) | |
2440 | if not code: | |
2441 | # remove item from cache | |
2442 | self.cache.rm_daemon(host, name) | |
2443 | self.cache.invalidate_host_daemons(host) | |
2444 | return "Removed {} from host '{}'".format(name, host) | |
2445 | ||
2446 | def _apply_service(self, spec): | |
2447 | """ | |
2448 | Schedule a service. Deploy new daemons or remove old ones, depending | |
2449 | on the target label and count specified in the placement. | |
2450 | """ | |
2451 | daemon_type = spec.service_type | |
2452 | service_name = spec.service_name() | |
2453 | if spec.unmanaged: | |
2454 | self.log.debug('Skipping unmanaged service %s spec' % service_name) | |
2455 | return False | |
2456 | self.log.debug('Applying service %s spec' % service_name) | |
2457 | create_fns = { | |
2458 | 'mon': self._create_mon, | |
2459 | 'mgr': self._create_mgr, | |
2460 | 'osd': self.create_osds, | |
2461 | 'mds': self._create_mds, | |
2462 | 'rgw': self._create_rgw, | |
2463 | 'rbd-mirror': self._create_rbd_mirror, | |
801d1391 | 2464 | 'nfs': self._create_nfs, |
9f95a23c TL |
2465 | 'grafana': self._create_grafana, |
2466 | 'alertmanager': self._create_alertmanager, | |
2467 | 'prometheus': self._create_prometheus, | |
2468 | 'node-exporter': self._create_node_exporter, | |
2469 | 'crash': self._create_crash, | |
1911f103 | 2470 | 'iscsi': self._create_iscsi, |
9f95a23c TL |
2471 | } |
2472 | config_fns = { | |
2473 | 'mds': self._config_mds, | |
2474 | 'rgw': self._config_rgw, | |
801d1391 | 2475 | 'nfs': self._config_nfs, |
1911f103 | 2476 | 'iscsi': self._config_iscsi, |
9f95a23c TL |
2477 | } |
2478 | create_func = create_fns.get(daemon_type, None) | |
2479 | if not create_func: | |
2480 | self.log.debug('unrecognized service type %s' % daemon_type) | |
2481 | return False | |
2482 | config_func = config_fns.get(daemon_type, None) | |
2483 | ||
2484 | daemons = self.cache.get_daemons_by_service(service_name) | |
2485 | ||
2486 | public_network = None | |
2487 | if daemon_type == 'mon': | |
2488 | ret, out, err = self.mon_command({ | |
2489 | 'prefix': 'config get', | |
2490 | 'who': 'mon', | |
2491 | 'key': 'public_network', | |
2492 | }) | |
2493 | if '/' in out: | |
2494 | public_network = out.strip() | |
2495 | self.log.debug('mon public_network is %s' % public_network) | |
2496 | ||
2497 | def matches_network(host): | |
2498 | # type: (str) -> bool | |
2499 | if not public_network: | |
2500 | return False | |
2501 | # make sure we have 1 or more IPs for that network on that | |
2502 | # host | |
2503 | return len(self.cache.networks[host].get(public_network, [])) > 0 | |
2504 | ||
2505 | hosts = HostAssignment( | |
2506 | spec=spec, | |
2507 | get_hosts_func=self._get_hosts, | |
2508 | get_daemons_func=self.cache.get_daemons_by_service, | |
2509 | filter_new_host=matches_network if daemon_type == 'mon' else None, | |
2510 | ).place() | |
2511 | ||
2512 | r = False | |
2513 | ||
2514 | if daemon_type == 'osd': | |
2515 | return False if create_func(spec) else True # type: ignore | |
2516 | ||
2517 | # sanity check | |
2518 | if daemon_type in ['mon', 'mgr'] and len(hosts) < 1: | |
2519 | self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts) | |
2520 | return False | |
2521 | ||
2522 | # add any? | |
2523 | did_config = False | |
2524 | hosts_with_daemons = {d.hostname for d in daemons} | |
2525 | self.log.debug('hosts with daemons: %s' % hosts_with_daemons) | |
2526 | for host, network, name in hosts: | |
2527 | if host not in hosts_with_daemons: | |
2528 | if not did_config and config_func: | |
2529 | config_func(spec) | |
2530 | did_config = True | |
2531 | daemon_id = self.get_unique_name(daemon_type, host, daemons, | |
2532 | spec.service_id, name) | |
2533 | self.log.debug('Placing %s.%s on host %s' % ( | |
2534 | daemon_type, daemon_id, host)) | |
2535 | if daemon_type == 'mon': | |
2536 | create_func(daemon_id, host, network) # type: ignore | |
801d1391 TL |
2537 | elif daemon_type == 'nfs': |
2538 | create_func(daemon_id, host, spec) # type: ignore | |
9f95a23c TL |
2539 | else: |
2540 | create_func(daemon_id, host) # type: ignore | |
2541 | ||
2542 | # add to daemon list so next name(s) will also be unique | |
2543 | sd = orchestrator.DaemonDescription( | |
2544 | hostname=host, | |
2545 | daemon_type=daemon_type, | |
2546 | daemon_id=daemon_id, | |
2547 | ) | |
2548 | daemons.append(sd) | |
2549 | r = True | |
2550 | ||
2551 | # remove any? | |
2552 | target_hosts = [h.hostname for h in hosts] | |
2553 | for d in daemons: | |
2554 | if d.hostname not in target_hosts: | |
2555 | # NOTE: we are passing the 'force' flag here, which means | |
2556 | # we can delete a mon instances data. | |
2557 | self._remove_daemon(d.name(), d.hostname) | |
2558 | r = True | |
2559 | ||
2560 | return r | |
2561 | ||
2562 | def _apply_all_services(self): | |
2563 | r = False | |
2564 | specs = [] # type: List[ServiceSpec] | |
2565 | for sn, spec in self.spec_store.specs.items(): | |
2566 | specs.append(spec) | |
2567 | for spec in specs: | |
2568 | try: | |
2569 | if self._apply_service(spec): | |
2570 | r = True | |
2571 | except Exception as e: | |
2572 | self.log.warning('Failed to apply %s spec %s: %s' % ( | |
2573 | spec.service_name(), spec, e)) | |
2574 | return r | |
2575 | ||
    def _check_daemons(self):
        """
        Reconcile running daemons against their specs: remove orphans,
        and reconfig any daemon whose dependencies or config are stale
        (including after monmap changes).  Also points the dashboard at
        a running grafana instance.
        """
        # get monmap mtime so we can refresh configs when mons change
        monmap = self.get('mon_map')
        last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
            monmap['modified'], CEPH_DATEFMT)
        if last_monmap and last_monmap > datetime.datetime.utcnow():
            last_monmap = None  # just in case clocks are skewed

        daemons = self.cache.get_daemons()
        grafanas = []  # type: List[orchestrator.DaemonDescription]
        for dd in daemons:
            # orphan?
            spec = self.spec_store.specs.get(dd.service_name(), None)
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if not spec or spec.unmanaged:
                continue

            # dependencies?
            if dd.daemon_type == 'grafana':
                # put running instances at the front of the list
                grafanas.insert(0, dd)
            deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            reconfig = False
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...'% (
                    dd.name()))
                reconfig = True
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                reconfig = True
            elif last_monmap and \
                    last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                # core ceph daemons embed the mon list in their config;
                # redeploy config when the monmap is newer than it
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                reconfig = True
            if reconfig:
                self._create_daemon(dd.daemon_type, dd.daemon_id,
                                    dd.hostname, reconfig=True)

        # make sure the dashboard [does not] references grafana
        try:
            current_url = self.get_module_option_ex('dashboard',
                                                    'GRAFANA_API_URL')
            if grafanas:
                host = grafanas[0].hostname
                url = 'https://%s:3000' % (self.inventory[host].get('addr',
                                                                    host))
                if current_url != url:
                    self.log.info('Setting dashboard grafana config to %s' % url)
                    self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
                                              url)
            # FIXME: is it a signed cert??
        except Exception as e:
            # best-effort: dashboard module may not be enabled
            self.log.debug('got exception fetching dashboard grafana state: %s',
                           e)
2644 | ||
2645 | def _add_daemon(self, daemon_type, spec, | |
2646 | create_func, config_func=None): | |
2647 | """ | |
2648 | Add (and place) a daemon. Require explicit host placement. Do not | |
2649 | schedule, and do not apply the related scheduling limitations. | |
2650 | """ | |
2651 | self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) | |
2652 | if not spec.placement.hosts: | |
2653 | raise OrchestratorError('must specify host(s) to deploy on') | |
2654 | count = spec.placement.count or len(spec.placement.hosts) | |
2655 | daemons = self.cache.get_daemons_by_service(spec.service_name()) | |
2656 | return self._create_daemons(daemon_type, spec, daemons, | |
2657 | spec.placement.hosts, count, | |
2658 | create_func, config_func) | |
2659 | ||
    def _create_daemons(self, daemon_type, spec, daemons,
                        hosts, count,
                        create_func, config_func=None):
        """
        Deploy *count* daemons of *daemon_type* onto the explicit *hosts*,
        invoking *config_func(spec)* once beforehand if given.  Returns an
        async completion that maps *create_func* over the per-host args.
        """
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        if config_func:
            config_func(spec)

        args = []  # type: List[tuple]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             spec.service_id, name)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            # mon needs the network; nfs/iscsi create funcs need the spec
            if daemon_type == 'mon':
                args.append((daemon_id, host, network))  # type: ignore
            elif daemon_type == 'nfs':
                args.append((daemon_id, host, spec))  # type: ignore
            elif daemon_type == 'iscsi':
                args.append((daemon_id, host, spec))  # type: ignore
            else:
                args.append((daemon_id, host))  # type: ignore

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @async_map_completion
        def create_func_map(*args):
            return create_func(*args)

        return create_func_map(args)
2698 | ||
    @trivial_completion
    def apply_mon(self, spec):
        """Schedule/scale mon daemons according to *spec* (via _apply)."""
        return self._apply(spec)
2702 | ||
2703 | def _create_mon(self, name, host, network): | |
2704 | """ | |
2705 | Create a new monitor on the given host. | |
2706 | """ | |
2707 | # get mon. key | |
2708 | ret, keyring, err = self.mon_command({ | |
2709 | 'prefix': 'auth get', | |
2710 | 'entity': 'mon.', | |
2711 | }) | |
2712 | ||
2713 | extra_config = '[mon.%s]\n' % name | |
2714 | if network: | |
2715 | # infer whether this is a CIDR network, addrvec, or plain IP | |
2716 | if '/' in network: | |
2717 | extra_config += 'public network = %s\n' % network | |
2718 | elif network.startswith('[v') and network.endswith(']'): | |
2719 | extra_config += 'public addrv = %s\n' % network | |
2720 | elif ':' not in network: | |
2721 | extra_config += 'public addr = %s\n' % network | |
2722 | else: | |
2723 | raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network) | |
2724 | else: | |
2725 | # try to get the public_network from the config | |
2726 | ret, network, err = self.mon_command({ | |
2727 | 'prefix': 'config get', | |
2728 | 'who': 'mon', | |
2729 | 'key': 'public_network', | |
2730 | }) | |
2731 | network = network.strip() # type: ignore | |
2732 | if ret: | |
2733 | raise RuntimeError('Unable to fetch cluster_network config option') | |
2734 | if not network: | |
2735 | raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP') | |
2736 | if '/' not in network: | |
2737 | raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network) | |
2738 | extra_config += 'public network = %s\n' % network | |
2739 | ||
2740 | return self._create_daemon('mon', name, host, | |
2741 | keyring=keyring, | |
1911f103 | 2742 | extra_config={'config': extra_config}) |
9f95a23c TL |
2743 | |
    def add_mon(self, spec):
        # type: (ServiceSpec) -> orchestrator.Completion
        """Explicitly deploy mons on the hosts named in the spec placement."""
        return self._add_daemon('mon', spec, self._create_mon)
2747 | ||
2748 | def _create_mgr(self, mgr_id, host): | |
2749 | """ | |
2750 | Create a new manager instance on a host. | |
2751 | """ | |
2752 | # get mgr. key | |
2753 | ret, keyring, err = self.mon_command({ | |
2754 | 'prefix': 'auth get-or-create', | |
2755 | 'entity': 'mgr.%s' % mgr_id, | |
2756 | 'caps': ['mon', 'profile mgr', | |
2757 | 'osd', 'allow *', | |
2758 | 'mds', 'allow *'], | |
2759 | }) | |
2760 | ||
2761 | return self._create_daemon('mgr', mgr_id, host, keyring=keyring) | |
2762 | ||
    def add_mgr(self, spec):
        # type: (ServiceSpec) -> orchestrator.Completion
        """Explicitly deploy mgrs on the hosts named in the spec placement."""
        return self._add_daemon('mgr', spec, self._create_mgr)
2766 | ||
2767 | def _apply(self, spec: ServiceSpec) -> str: | |
2768 | if spec.placement.is_empty(): | |
2769 | # fill in default placement | |
2770 | defaults = { | |
2771 | 'mon': PlacementSpec(count=5), | |
2772 | 'mgr': PlacementSpec(count=2), | |
2773 | 'mds': PlacementSpec(count=2), | |
2774 | 'rgw': PlacementSpec(count=2), | |
1911f103 | 2775 | 'iscsi': PlacementSpec(count=1), |
9f95a23c | 2776 | 'rbd-mirror': PlacementSpec(count=2), |
801d1391 | 2777 | 'nfs': PlacementSpec(count=1), |
9f95a23c TL |
2778 | 'grafana': PlacementSpec(count=1), |
2779 | 'alertmanager': PlacementSpec(count=1), | |
2780 | 'prometheus': PlacementSpec(count=1), | |
2781 | 'node-exporter': PlacementSpec(host_pattern='*'), | |
2782 | 'crash': PlacementSpec(host_pattern='*'), | |
2783 | } | |
2784 | spec.placement = defaults[spec.service_type] | |
2785 | elif spec.service_type in ['mon', 'mgr'] and \ | |
2786 | spec.placement.count is not None and \ | |
2787 | spec.placement.count < 1: | |
2788 | raise OrchestratorError('cannot scale %s service below 1' % ( | |
2789 | spec.service_type)) | |
2790 | ||
2791 | HostAssignment( | |
2792 | spec=spec, | |
2793 | get_hosts_func=self._get_hosts, | |
2794 | get_daemons_func=self.cache.get_daemons_by_service, | |
2795 | ).validate() | |
2796 | ||
2797 | self.log.info('Saving service %s spec with placement %s' % ( | |
2798 | spec.service_name(), spec.placement.pretty_str())) | |
2799 | self.spec_store.save(spec) | |
2800 | self._kick_serve_loop() | |
1911f103 | 2801 | return "Scheduled %s update..." % spec.service_name() |
9f95a23c TL |
2802 | |
    @trivial_completion
    def apply(self, specs: List[ServiceSpec]):
        """Apply a batch of service specs; returns one status string each."""
        return [self._apply(spec) for spec in specs]
2806 | ||
    @trivial_completion
    def apply_mgr(self, spec):
        """Schedule/scale mgr daemons according to *spec* (via _apply)."""
        return self._apply(spec)
2810 | ||
    def add_mds(self, spec: ServiceSpec):
        """Explicitly deploy mds daemons on the hosts named in the spec."""
        return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
2813 | ||
    @trivial_completion
    def apply_mds(self, spec: ServiceSpec):
        """Schedule/scale mds daemons according to *spec* (via _apply)."""
        return self._apply(spec)
2817 | ||
    def _config_mds(self, spec):
        """Pre-deploy config for mds: pin the daemons to their filesystem."""
        # ensure mds_join_fs is set for these daemons
        assert spec.service_id
        ret, out, err = self.mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })
        # NOTE(review): ret is ignored -- a failed 'config set' is silently
        # dropped; presumably best-effort, confirm.
2827 | ||
2828 | def _create_mds(self, mds_id, host): | |
2829 | # get mgr. key | |
2830 | ret, keyring, err = self.mon_command({ | |
2831 | 'prefix': 'auth get-or-create', | |
2832 | 'entity': 'mds.' + mds_id, | |
2833 | 'caps': ['mon', 'profile mds', | |
2834 | 'osd', 'allow rwx', | |
2835 | 'mds', 'allow'], | |
2836 | }) | |
2837 | return self._create_daemon('mds', mds_id, host, keyring=keyring) | |
2838 | ||
    def add_rgw(self, spec):
        """Explicitly deploy rgw daemons on the hosts named in the spec."""
        return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
2841 | ||
2842 | def _config_rgw(self, spec): | |
2843 | # ensure rgw_realm and rgw_zone is set for these daemons | |
2844 | ret, out, err = self.mon_command({ | |
2845 | 'prefix': 'config set', | |
1911f103 | 2846 | 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}", |
9f95a23c TL |
2847 | 'name': 'rgw_zone', |
2848 | 'value': spec.rgw_zone, | |
2849 | }) | |
2850 | ret, out, err = self.mon_command({ | |
2851 | 'prefix': 'config set', | |
1911f103 | 2852 | 'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}", |
9f95a23c TL |
2853 | 'name': 'rgw_realm', |
2854 | 'value': spec.rgw_realm, | |
2855 | }) | |
9f95a23c TL |
2856 | ret, out, err = self.mon_command({ |
2857 | 'prefix': 'config set', | |
1911f103 | 2858 | 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}", |
9f95a23c | 2859 | 'name': 'rgw_frontends', |
1911f103 | 2860 | 'value': spec.rgw_frontends_config_value(), |
9f95a23c TL |
2861 | }) |
2862 | ||
1911f103 TL |
2863 | if spec.rgw_frontend_ssl_certificate: |
2864 | if isinstance(spec.rgw_frontend_ssl_certificate, list): | |
2865 | cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate) | |
2866 | else: | |
2867 | cert_data = spec.rgw_frontend_ssl_certificate | |
2868 | ret, out, err = self.mon_command({ | |
2869 | 'prefix': 'config-key set', | |
2870 | 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt', | |
2871 | 'val': cert_data, | |
2872 | }) | |
2873 | ||
2874 | if spec.rgw_frontend_ssl_key: | |
2875 | if isinstance(spec.rgw_frontend_ssl_key, list): | |
2876 | key_data = '\n'.join(spec.rgw_frontend_ssl_key) | |
2877 | else: | |
2878 | key_data = spec.rgw_frontend_ssl_key | |
2879 | ret, out, err = self.mon_command({ | |
2880 | 'prefix': 'config-key set', | |
2881 | 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key', | |
2882 | 'val': key_data, | |
2883 | }) | |
2884 | ||
2885 | logger.info('Saving service %s spec with placement %s' % ( | |
2886 | spec.service_name(), spec.placement.pretty_str())) | |
2887 | self.spec_store.save(spec) | |
2888 | ||
9f95a23c TL |
2889 | def _create_rgw(self, rgw_id, host): |
2890 | ret, keyring, err = self.mon_command({ | |
2891 | 'prefix': 'auth get-or-create', | |
1911f103 TL |
2892 | 'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}", |
2893 | 'caps': ['mon', 'allow *', | |
9f95a23c TL |
2894 | 'mgr', 'allow rw', |
2895 | 'osd', 'allow rwx'], | |
2896 | }) | |
2897 | return self._create_daemon('rgw', rgw_id, host, keyring=keyring) | |
2898 | ||
    @trivial_completion
    def apply_rgw(self, spec):
        """Schedule/scale rgw daemons according to *spec* (via _apply)."""
        return self._apply(spec)
2902 | ||
1911f103 TL |
    def add_iscsi(self, spec):
        # type: (ServiceSpec) -> orchestrator.Completion
        """Explicitly deploy iscsi gateways on the hosts named in the spec."""
        return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
2906 | ||
    def _config_iscsi(self, spec):
        """Pre-deploy config for iscsi: just persist the spec."""
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
2911 | ||
    def _create_iscsi(self, igw_id, host, spec):
        """
        Deploy an iscsi gateway on *host*: mint a cephx key scoped to the
        spec's pool and ship a generated iscsi-gateway.cfg alongside.
        """
        ret, keyring, err = self.mon_command({
            'prefix': 'auth get-or-create',
            'entity': utils.name_to_config_section('iscsi') + '.' + igw_id,
            'caps': ['mon', 'allow rw',
                     'osd', f'allow rwx pool={spec.pool}'],
        })

        # NOTE(review): spec.api_secure may be a bool, in which case the
        # rendered value is 'True'/'False' rather than 'false' -- confirm
        # the gateway config parser accepts both spellings.
        api_secure = 'false' if spec.api_secure is None else spec.api_secure
        igw_conf = f"""
# generated by cephadm
[config]
cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
pool = {spec.pool}
trusted_ip_list = {spec.trusted_ip_list or ''}
minimum_gateways = 1
fqdn_enabled = {spec.fqdn_enabled or ''}
api_port = {spec.api_port or ''}
api_user = {spec.api_user or ''}
api_password = {spec.api_password or ''}
api_secure = {api_secure}
"""
        # shipped into the container as an extra config file
        extra_config = {'iscsi-gateway.cfg': igw_conf}
        return self._create_daemon('iscsi', igw_id, host, keyring=keyring,
                                   extra_config=extra_config)
2937 | ||
    @trivial_completion
    def apply_iscsi(self, spec):
        """Schedule/scale iscsi gateways according to *spec* (via _apply)."""
        return self._apply(spec)
2941 | ||
9f95a23c TL |
    def add_rbd_mirror(self, spec):
        """Explicitly deploy rbd-mirror daemons on the hosts named in the spec."""
        return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
2944 | ||
2945 | def _create_rbd_mirror(self, daemon_id, host): | |
2946 | ret, keyring, err = self.mon_command({ | |
2947 | 'prefix': 'auth get-or-create', | |
2948 | 'entity': 'client.rbd-mirror.' + daemon_id, | |
2949 | 'caps': ['mon', 'profile rbd-mirror', | |
2950 | 'osd', 'profile rbd'], | |
2951 | }) | |
2952 | return self._create_daemon('rbd-mirror', daemon_id, host, | |
2953 | keyring=keyring) | |
2954 | ||
    @trivial_completion
    def apply_rbd_mirror(self, spec):
        """Schedule/scale rbd-mirror daemons according to *spec* (via _apply)."""
        return self._apply(spec)
2958 | ||
801d1391 TL |
2959 | def _generate_nfs_config(self, daemon_type, daemon_id, host): |
2960 | # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]] | |
2961 | deps = [] # type: List[str] | |
2962 | ||
2963 | # find the matching NFSServiceSpec | |
2964 | # TODO: find the spec and pass via _create_daemon instead ?? | |
2965 | service_name = self.get_service_name(daemon_type, daemon_id, host) | |
2966 | specs = self.spec_store.find(service_name) | |
2967 | if not specs: | |
2968 | raise OrchestratorError('Cannot find service spec %s' % (service_name)) | |
2969 | elif len(specs) > 1: | |
2970 | raise OrchestratorError('Found multiple service specs for %s' % (service_name)) | |
2971 | else: | |
2972 | # cast to keep mypy happy | |
2973 | spec = cast(NFSServiceSpec, specs[0]) | |
2974 | ||
2975 | nfs = NFSGanesha(self, daemon_id, spec) | |
2976 | ||
2977 | # create the keyring | |
2978 | entity = nfs.get_keyring_entity() | |
2979 | keyring = nfs.get_or_create_keyring(entity=entity) | |
2980 | ||
2981 | # update the caps after get-or-create, the keyring might already exist! | |
2982 | nfs.update_keyring_caps(entity=entity) | |
2983 | ||
2984 | # create the rados config object | |
2985 | nfs.create_rados_config_obj() | |
2986 | ||
2987 | # generate the cephadm config | |
2988 | cephadm_config = nfs.get_cephadm_config() | |
2989 | cephadm_config.update( | |
2990 | self._get_config_and_keyring( | |
2991 | daemon_type, daemon_id, | |
2992 | keyring=keyring)) | |
2993 | ||
2994 | return cephadm_config, deps | |
2995 | ||
    def add_nfs(self, spec):
        """Explicitly deploy nfs daemons on the hosts named in the spec."""
        return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
2998 | ||
    def _config_nfs(self, spec):
        """Pre-deploy config for nfs: just persist the spec."""
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
3003 | ||
    def _create_nfs(self, daemon_id, host, spec):
        # NOTE(review): *spec* is accepted for signature parity with the
        # other create functions but is unused here; the ganesha config is
        # generated at deploy time by _generate_nfs_config.
        return self._create_daemon('nfs', daemon_id, host)
3006 | ||
    @trivial_completion
    def apply_nfs(self, spec):
        """Schedule/scale nfs daemons according to *spec* (via _apply)."""
        return self._apply(spec)
3010 | ||
9f95a23c TL |
    def _generate_prometheus_config(self):
        # type: () -> Tuple[Dict[str, Any], List[str]]
        """
        Build prometheus.yml scraping all mgrs, node-exporters and
        alertmanagers; those daemons are also this daemon's deps.
        NOTE(review): the embedded YAML templates are whitespace-sensitive.
        """
        deps = []  # type: List[str]

        # scrape mgrs
        mgr_scrape_list = []
        mgr_map = self.get('mgr_map')
        port = None
        t = mgr_map.get('services', {}).get('prometheus', None)
        if t:
            t = t.split('/')[2]
            mgr_scrape_list.append(t)
            port = '9283'
            if ':' in t:
                port = t.split(':')[1]
        # scan all mgrs to generate deps and to get standbys too.
        # assume that they are all on the same port as the active mgr.
        for dd in self.cache.get_daemons_by_service('mgr'):
            # we consider the mgr a dep even if the prometheus module is
            # disabled in order to be consistent with _calc_daemon_deps().
            deps.append(dd.name())
            if not port:
                continue
            if dd.daemon_id == self.get_mgr_id():
                continue
            hi = self.inventory.get(dd.hostname, {})
            addr = hi.get('addr', dd.hostname)
            mgr_scrape_list.append(addr.split(':')[0] + ':' + port)

        # scrape node exporters
        node_configs = ''
        for dd in self.cache.get_daemons_by_service('node-exporter'):
            deps.append(dd.name())
            hi = self.inventory.get(dd.hostname, {})
            addr = hi.get('addr', dd.hostname)
            if not node_configs:
                node_configs = """
  - job_name: 'node'
    static_configs:
"""
            node_configs += """    - targets: {}
      labels:
        instance: '{}'
""".format([addr.split(':')[0] + ':9100'],
           dd.hostname)

        # scrape alert managers
        alertmgr_configs = ""
        alertmgr_targets = []
        for dd in self.cache.get_daemons_by_service('alertmanager'):
            deps.append(dd.name())
            hi = self.inventory.get(dd.hostname, {})
            addr = hi.get('addr', dd.hostname)
            alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
        if alertmgr_targets:
            alertmgr_configs = """alerting:
  alertmanagers:
    - scheme: http
      path_prefix: /alertmanager
      static_configs:
        - targets: [{}]
""".format(", ".join(alertmgr_targets))

        # generate the prometheus configuration
        r = {
            'files': {
                'prometheus.yml': """# generated by cephadm
global:
  scrape_interval: 5s
  evaluation_interval: 10s
rule_files:
  - /etc/prometheus/alerting/*
{alertmgr_configs}
scrape_configs:
  - job_name: 'ceph'
    static_configs:
    - targets: {mgr_scrape_list}
      labels:
        instance: 'ceph_cluster'
{node_configs}
""".format(
                    mgr_scrape_list=str(mgr_scrape_list),
                    node_configs=str(node_configs),
                    alertmgr_configs=str(alertmgr_configs)
                ),
            },
        }

        # include alerts, if present in the container
        if os.path.exists(self.prometheus_alerts_path):
            with open(self.prometheus_alerts_path, "r") as f:
                alerts = f.read()
            r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts

        return r, sorted(deps)
9f95a23c TL |
3106 | |
    def _generate_grafana_config(self):
        # type: () -> Tuple[Dict[str, Any], List[str]]
        """
        Build grafana.ini, the prometheus datasource yml, and TLS
        cert/key files for a grafana daemon; prometheus daemons are deps.
        NOTE(review): the embedded config templates are whitespace-sensitive.
        """
        deps = []  # type: List[str]
        def generate_grafana_ds_config(hosts: List[str]) -> str:
            # one datasource per prometheus instance; the first is default
            config = '''# generated by cephadm
deleteDatasources:
{delete_data_sources}

datasources:
{data_sources}
'''
            delete_ds_template = '''
- name: '{name}'
  orgId: 1\n'''.lstrip('\n')
            ds_template = '''
- name: '{name}'
  type: 'prometheus'
  access: 'proxy'
  orgId: 1
  url: 'http://{host}:9095'
  basicAuth: false
  isDefault: {is_default}
  editable: false\n'''.lstrip('\n')

            delete_data_sources = ''
            data_sources = ''
            for i, host in enumerate(hosts):
                name = "Dashboard %d" % (i + 1)
                data_sources += ds_template.format(
                    name=name,
                    host=host,
                    is_default=str(i == 0).lower()
                )
                delete_data_sources += delete_ds_template.format(
                    name=name
                )
            return config.format(
                delete_data_sources=delete_data_sources,
                data_sources=data_sources,
            )

        prom_services = []  # type: List[str]
        for dd in self.cache.get_daemons_by_service('prometheus'):
            prom_services.append(dd.hostname)
            deps.append(dd.name())

        # reuse stored TLS material if still valid, else self-sign anew
        cert = self.get_store('grafana_crt')
        pkey = self.get_store('grafana_key')
        if cert and pkey:
            try:
                verify_tls(cert, pkey)
            except ServerConfigException as e:
                logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
                cert, pkey = None, None
        if not (cert and pkey):
            cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
            self.set_store('grafana_crt', cert)
            self.set_store('grafana_key', pkey)
            # self-signed cert: tell the dashboard not to verify it
            self.mon_command({
                'prefix': 'dashboard set-grafana-api-ssl-verify',
                'value': 'false',
            })



        config_file = {
            'files': {
                "grafana.ini": """# generated by cephadm
[users]
  default_theme = light
[auth.anonymous]
  enabled = true
  org_name = 'Main Org.'
  org_role = 'Viewer'
[server]
  domain = 'bootstrap.storage.lab'
  protocol = https
  cert_file = /etc/grafana/certs/cert_file
  cert_key = /etc/grafana/certs/cert_key
  http_port = 3000
[security]
  admin_user = admin
  admin_password = admin
  allow_embedding = true
""",
                'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
                'certs/cert_file': '# generated by cephadm\n%s' % cert,
                'certs/cert_key': '# generated by cephadm\n%s' % pkey,
            }
        }
        return config_file, sorted(deps)
3198 | ||
    def _get_dashboard_url(self):
        # type: () -> str
        """Return the active dashboard URL from the mgr map ('' if none)."""
        return self.get('mgr_map').get('services', {}).get('dashboard', '')
3202 | ||
    def _generate_alertmanager_config(self):
        # type: () -> Tuple[Dict[str, Any], List[str]]
        """
        Build alertmanager.yml (webhooks to every dashboard URL) and the
        peer list; mgrs and other alertmanagers are deps.
        NOTE(review): the embedded YAML template is whitespace-sensitive.
        """
        deps = []  # type: List[str]

        # dashboard(s)
        dashboard_urls = []
        mgr_map = self.get('mgr_map')
        port = None
        proto = None  # http: or https:
        url = mgr_map.get('services', {}).get('dashboard', None)
        if url:
            dashboard_urls.append(url)
            proto = url.split('/')[0]
            port = url.split('/')[2].split(':')[1]
        # scan all mgrs to generate deps and to get standbys too.
        # assume that they are all on the same port as the active mgr.
        for dd in self.cache.get_daemons_by_service('mgr'):
            # we consider mgr a dep even if the dashboard is disabled
            # in order to be consistent with _calc_daemon_deps().
            deps.append(dd.name())
            if not port:
                continue
            if dd.daemon_id == self.get_mgr_id():
                continue
            hi = self.inventory.get(dd.hostname, {})
            addr = hi.get('addr', dd.hostname)
            dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
                                                  port))

        yml = """# generated by cephadm
# See https://prometheus.io/docs/alerting/configuration/ for documentation.

global:
  resolve_timeout: 5m

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'ceph-dashboard'
receivers:
- name: 'ceph-dashboard'
  webhook_configs:
{urls}
""".format(
            urls='\n'.join(
                ["  - url: '{}api/prometheus_receiver'".format(u)
                 for u in dashboard_urls]
            ))
        # gossip peers: every alertmanager on its cluster port
        peers = []
        port = '9094'
        for dd in self.cache.get_daemons_by_service('alertmanager'):
            deps.append(dd.name())
            hi = self.inventory.get(dd.hostname, {})
            addr = hi.get('addr', dd.hostname)
            peers.append(addr.split(':')[0] + ':' + port)
        return {
            "files": {
                "alertmanager.yml": yml
            },
            "peers": peers
        }, sorted(deps)
3266 | ||
    def add_prometheus(self, spec):
        # type: (ServiceSpec) -> AsyncCompletion
        """Add prometheus daemon(s) per ``spec`` (imperative `orch daemon add`)."""
        return self._add_daemon('prometheus', spec, self._create_prometheus)
3269 | ||
    def _create_prometheus(self, daemon_id, host):
        # type: (str, str) -> str
        """Deploy a single prometheus daemon on ``host`` (no keyring needed)."""
        return self._create_daemon('prometheus', daemon_id, host)
3272 | ||
    @trivial_completion
    def apply_prometheus(self, spec):
        # type: (ServiceSpec) -> str
        """Declaratively apply a prometheus service spec (`orch apply`)."""
        return self._apply(spec)
3276 | ||
    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> AsyncCompletion
        """Add node-exporter daemon(s) per ``spec``."""
        return self._add_daemon('node-exporter', spec,
                                self._create_node_exporter)
3281 | ||
    @trivial_completion
    def apply_node_exporter(self, spec):
        # type: (ServiceSpec) -> str
        """Declaratively apply a node-exporter service spec."""
        return self._apply(spec)
3285 | ||
    def _create_node_exporter(self, daemon_id, host):
        # type: (str, str) -> str
        """Deploy a single node-exporter daemon on ``host``."""
        return self._create_daemon('node-exporter', daemon_id, host)
3288 | ||
    def add_crash(self, spec):
        # type: (ServiceSpec) -> AsyncCompletion
        """Add crash-collector daemon(s) per ``spec``."""
        return self._add_daemon('crash', spec,
                                self._create_crash)
3293 | ||
    @trivial_completion
    def apply_crash(self, spec):
        # type: (ServiceSpec) -> str
        """Declaratively apply a crash-collector service spec."""
        return self._apply(spec)
3297 | ||
3298 | def _create_crash(self, daemon_id, host): | |
3299 | ret, keyring, err = self.mon_command({ | |
3300 | 'prefix': 'auth get-or-create', | |
3301 | 'entity': 'client.crash.' + host, | |
3302 | 'caps': ['mon', 'profile crash', | |
3303 | 'mgr', 'profile crash'], | |
3304 | }) | |
3305 | return self._create_daemon('crash', daemon_id, host, keyring=keyring) | |
3306 | ||
    def add_grafana(self, spec):
        # type: (ServiceSpec) -> AsyncCompletion
        """Add grafana daemon(s) per ``spec``."""
        return self._add_daemon('grafana', spec, self._create_grafana)
3310 | ||
    @trivial_completion
    def apply_grafana(self, spec: ServiceSpec):
        """Declaratively apply a grafana service spec."""
        return self._apply(spec)
3314 | ||
    def _create_grafana(self, daemon_id, host):
        # type: (str, str) -> str
        """Deploy a single grafana daemon on ``host``."""
        return self._create_daemon('grafana', daemon_id, host)
3318 | ||
    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> AsyncCompletion
        """Add alertmanager daemon(s) per ``spec``."""
        return self._add_daemon('alertmanager', spec, self._create_alertmanager)
3322 | ||
    @trivial_completion
    def apply_alertmanager(self, spec: ServiceSpec):
        """Declaratively apply an alertmanager service spec."""
        return self._apply(spec)
3326 | ||
    def _create_alertmanager(self, daemon_id, host):
        # type: (str, str) -> str
        """Deploy a single alertmanager daemon on ``host``."""
        return self._create_daemon('alertmanager', daemon_id, host)
3329 | ||
3330 | ||
3331 | def _get_container_image_id(self, image_name): | |
3332 | # pick a random host... | |
3333 | host = None | |
3334 | for host_name, hi in self.inventory.items(): | |
3335 | host = host_name | |
3336 | break | |
3337 | if not host: | |
3338 | raise OrchestratorError('no hosts defined') | |
3339 | out, err, code = self._run_cephadm( | |
3340 | host, None, 'pull', [], | |
3341 | image=image_name, | |
3342 | no_fsid=True, | |
3343 | error_ok=True) | |
3344 | if code: | |
3345 | raise OrchestratorError('Failed to pull %s on %s: %s' % ( | |
3346 | image_name, host, '\n'.join(out))) | |
3347 | j = json.loads('\n'.join(out)) | |
3348 | image_id = j.get('image_id') | |
3349 | ceph_version = j.get('ceph_version') | |
3350 | self.log.debug('image %s -> id %s version %s' % | |
3351 | (image_name, image_id, ceph_version)) | |
3352 | return image_id, ceph_version | |
3353 | ||
3354 | @trivial_completion | |
3355 | def upgrade_check(self, image, version): | |
3356 | if version: | |
3357 | target_name = self.container_image_base + ':v' + version | |
3358 | elif image: | |
3359 | target_name = image | |
3360 | else: | |
3361 | raise OrchestratorError('must specify either image or version') | |
3362 | ||
3363 | target_id, target_version = self._get_container_image_id(target_name) | |
3364 | self.log.debug('Target image %s id %s version %s' % ( | |
3365 | target_name, target_id, target_version)) | |
3366 | r = { | |
3367 | 'target_name': target_name, | |
3368 | 'target_id': target_id, | |
3369 | 'target_version': target_version, | |
3370 | 'needs_update': dict(), | |
3371 | 'up_to_date': list(), | |
3372 | } | |
3373 | for host, dm in self.cache.daemons.items(): | |
3374 | for name, dd in dm.items(): | |
3375 | if target_id == dd.container_image_id: | |
3376 | r['up_to_date'].append(dd.name()) | |
3377 | else: | |
3378 | r['needs_update'][dd.name()] = { | |
3379 | 'current_name': dd.container_image_name, | |
3380 | 'current_id': dd.container_image_id, | |
3381 | 'current_version': dd.version, | |
3382 | } | |
3383 | return json.dumps(r, indent=4, sort_keys=True) | |
3384 | ||
3385 | @trivial_completion | |
3386 | def upgrade_status(self): | |
3387 | r = orchestrator.UpgradeStatusSpec() | |
3388 | if self.upgrade_state: | |
3389 | r.target_image = self.upgrade_state.get('target_name') | |
3390 | r.in_progress = True | |
3391 | if self.upgrade_state.get('error'): | |
3392 | r.message = 'Error: ' + self.upgrade_state.get('error') | |
3393 | elif self.upgrade_state.get('paused'): | |
3394 | r.message = 'Upgrade paused' | |
3395 | return r | |
3396 | ||
3397 | @trivial_completion | |
3398 | def upgrade_start(self, image, version): | |
3399 | if self.mode != 'root': | |
3400 | raise OrchestratorError('upgrade is not supported in %s mode' % ( | |
3401 | self.mode)) | |
3402 | if version: | |
3403 | try: | |
3404 | (major, minor, patch) = version.split('.') | |
3405 | assert int(minor) >= 0 | |
3406 | assert int(patch) >= 0 | |
3407 | except: | |
3408 | raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)') | |
3409 | if int(major) < 15 or (int(major) == 15 and int(minor) < 2): | |
3410 | raise OrchestratorError('cephadm only supports octopus (15.2.0) or later') | |
3411 | target_name = self.container_image_base + ':v' + version | |
3412 | elif image: | |
3413 | target_name = image | |
3414 | else: | |
3415 | raise OrchestratorError('must specify either image or version') | |
3416 | if self.upgrade_state: | |
3417 | if self.upgrade_state.get('target_name') != target_name: | |
3418 | raise OrchestratorError( | |
3419 | 'Upgrade to %s (not %s) already in progress' % | |
3420 | (self.upgrade_state.get('target_name'), target_name)) | |
3421 | if self.upgrade_state.get('paused'): | |
3422 | del self.upgrade_state['paused'] | |
3423 | self._save_upgrade_state() | |
3424 | return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name') | |
3425 | return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name') | |
3426 | self.upgrade_state = { | |
3427 | 'target_name': target_name, | |
3428 | 'progress_id': str(uuid.uuid4()), | |
3429 | } | |
3430 | self._update_upgrade_progress(0.0) | |
3431 | self._save_upgrade_state() | |
3432 | self._clear_upgrade_health_checks() | |
3433 | self.event.set() | |
801d1391 | 3434 | return 'Initiating upgrade to %s' % (target_name) |
9f95a23c TL |
3435 | |
3436 | @trivial_completion | |
3437 | def upgrade_pause(self): | |
3438 | if not self.upgrade_state: | |
3439 | raise OrchestratorError('No upgrade in progress') | |
3440 | if self.upgrade_state.get('paused'): | |
3441 | return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name') | |
3442 | self.upgrade_state['paused'] = True | |
3443 | self._save_upgrade_state() | |
3444 | return 'Paused upgrade to %s' % self.upgrade_state.get('target_name') | |
3445 | ||
3446 | @trivial_completion | |
3447 | def upgrade_resume(self): | |
3448 | if not self.upgrade_state: | |
3449 | raise OrchestratorError('No upgrade in progress') | |
3450 | if not self.upgrade_state.get('paused'): | |
3451 | return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name') | |
3452 | del self.upgrade_state['paused'] | |
3453 | self._save_upgrade_state() | |
3454 | self.event.set() | |
3455 | return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name') | |
3456 | ||
3457 | @trivial_completion | |
3458 | def upgrade_stop(self): | |
3459 | if not self.upgrade_state: | |
3460 | return 'No upgrade in progress' | |
3461 | target_name = self.upgrade_state.get('target_name') | |
3462 | if 'progress_id' in self.upgrade_state: | |
3463 | self.remote('progress', 'complete', | |
3464 | self.upgrade_state['progress_id']) | |
3465 | self.upgrade_state = None | |
3466 | self._save_upgrade_state() | |
3467 | self._clear_upgrade_health_checks() | |
3468 | self.event.set() | |
3469 | return 'Stopped upgrade to %s' % target_name | |
3470 | ||
3471 | @trivial_completion | |
3472 | def remove_osds(self, osd_ids: List[str], | |
3473 | replace: bool = False, | |
3474 | force: bool = False): | |
3475 | """ | |
3476 | Takes a list of OSDs and schedules them for removal. | |
3477 | The function that takes care of the actual removal is | |
3478 | _remove_osds_bg(). | |
3479 | """ | |
3480 | ||
3481 | daemons = self.cache.get_daemons_by_service('osd') | |
3482 | found: Set[OSDRemoval] = set() | |
3483 | for daemon in daemons: | |
3484 | if daemon.daemon_id not in osd_ids: | |
3485 | continue | |
3486 | found.add(OSDRemoval(daemon.daemon_id, replace, force, | |
3487 | daemon.hostname, daemon.name(), | |
3488 | datetime.datetime.utcnow(), -1)) | |
3489 | ||
3490 | not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} | |
3491 | if not_found: | |
3492 | raise OrchestratorError('Unable to find OSD: %s' % not_found) | |
3493 | ||
3494 | self.rm_util.queue_osds_for_removal(found) | |
3495 | ||
3496 | # trigger the serve loop to initiate the removal | |
3497 | self._kick_serve_loop() | |
3498 | return "Scheduled OSD(s) for removal" | |
3499 | ||
    @trivial_completion
    def remove_osds_status(self):
        """
        The CLI call to retrieve an osd removal report
        (progress of the OSDs queued via remove_osds()).
        """
        return self.rm_util.report
3506 | ||
9f95a23c TL |
3507 | |
class BaseScheduler(object):
    """
    Base Scheduler Interface

    * requires a placement_spec

    `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
    """

    def __init__(self, placement_spec):
        # type: (PlacementSpec) -> None
        """Store the placement spec subclasses may consult when placing."""
        self.placement_spec = placement_spec

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        """Select up to ``count`` hosts from ``host_pool``; abstract."""
        raise NotImplementedError
3524 | ||
3525 | ||
class SimpleScheduler(BaseScheduler):
    """
    The most simple way to pick/schedule a set of hosts.
    1) Shuffle the provided host_pool
    2) Select from list up to :count
    """
    # NOTE: the no-op __init__ that only forwarded to the base class was
    # removed; Python inherits BaseScheduler.__init__ automatically.

    def place(self, host_pool, count=None):
        # type: (List, Optional[int]) -> List[HostPlacementSpec]
        """Return up to ``count`` hosts chosen pseudo-randomly from the pool."""
        if not host_pool:
            return []
        # copy so the caller's list is not reordered in place
        host_pool = list(host_pool)
        # shuffle for pseudo random selection
        random.shuffle(host_pool)
        return host_pool[:count]
3543 | ||
3544 | ||
class HostAssignment(object):
    """
    A class to detect if hosts are being passed imperative or declarative
    If the spec is populated via the `hosts/hosts` field it will not load
    any hosts into the list.
    If the spec isn't populated, i.e. when only num or label is present (declarative)
    it will use the provided `get_host_func` to load it from the inventory.

    Schedulers can be assigned to pick hosts from the pool.
    """

    def __init__(self,
                 spec,  # type: ServiceSpec
                 get_hosts_func,  # type: Callable[[Optional[str]],List[str]]
                 get_daemons_func,  # type: Callable[[str],List[orchestrator.DaemonDescription]]

                 filter_new_host=None,  # type: Optional[Callable[[str],bool]]
                 scheduler=None,  # type: Optional[BaseScheduler]
                 ):
        assert spec and get_hosts_func and get_daemons_func
        self.spec = spec  # type: ServiceSpec
        # default to shuffle-based selection when no scheduler is supplied
        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
        self.get_hosts_func = get_hosts_func
        self.get_daemons_func = get_daemons_func
        self.filter_new_host = filter_new_host
        self.service_name = spec.service_name()

    def validate(self):
        """Check the spec's placement against the known inventory.

        :raises OrchestratorValidationError: for unknown explicit hosts, a
            host_pattern matching nothing, or a label matching no hosts.
        """
        self.spec.validate()

        if self.spec.placement.hosts:
            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
            unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
            if unknown_hosts:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')

        if self.spec.placement.host_pattern:
            pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
            if not pattern_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')

        if self.spec.placement.label:
            label_hostnames = self.get_hosts_func(self.spec.placement.label)
            if not label_hostnames:
                raise OrchestratorValidationError(
                    f'Cannot place {self.spec.one_line_str()}: No matching '
                    f'hosts for label {self.spec.placement.label}')

    def place(self):
        # type: () -> List[HostPlacementSpec]
        """
        Load hosts into the spec.placement.hosts container.
        """

        self.validate()

        # count == 0
        if self.spec.placement.count == 0:
            return []

        # respect any explicit host list
        if self.spec.placement.hosts and not self.spec.placement.count:
            logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
            return self.spec.placement.hosts

        # respect host_pattern
        if self.spec.placement.host_pattern:
            candidates = [
                HostPlacementSpec(x, '', '')
                for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
            ]
            logger.debug('All hosts: {}'.format(candidates))
            return candidates

        count = 0
        if self.spec.placement.hosts and \
           self.spec.placement.count and \
           len(self.spec.placement.hosts) >= self.spec.placement.count:
            hosts = self.spec.placement.hosts
            count = self.spec.placement.count
            # BUGFIX: count was previously logged before being assigned,
            # so this line always printed 0
            logger.debug('place %d over provided host list: %s' % (
                count, hosts))
        elif self.spec.placement.label:
            hosts = [
                HostPlacementSpec(x, '', '')
                for x in self.get_hosts_func(self.spec.placement.label)
            ]
            if not self.spec.placement.count:
                logger.debug('Labeled hosts: {}'.format(hosts))
                return hosts
            count = self.spec.placement.count
            logger.debug('place %d over label %s: %s' % (
                count, self.spec.placement.label, hosts))
        else:
            hosts = [
                HostPlacementSpec(x, '', '')
                for x in self.get_hosts_func(None)
            ]
            if self.spec.placement.count:
                count = self.spec.placement.count
            else:
                # this should be a totally empty spec given all of the
                # alternative paths above.
                assert self.spec.placement.count is None
                assert not self.spec.placement.hosts
                assert not self.spec.placement.label
                count = 1
            logger.debug('place %d over all hosts: %s' % (count, hosts))

        # we need to select a subset of the candidates

        # if a partial host list is provided, always start with that
        if len(self.spec.placement.hosts) < count:
            chosen = self.spec.placement.hosts
        else:
            chosen = []

        # prefer hosts that already have services
        daemons = self.get_daemons_func(self.service_name)
        hosts_with_daemons = {d.hostname for d in daemons}
        # calc existing daemons (that aren't already in chosen)
        chosen_hosts = [hs.hostname for hs in chosen]
        existing = [hs for hs in hosts
                    if hs.hostname in hosts_with_daemons and \
                    hs.hostname not in chosen_hosts]
        if len(chosen + existing) >= count:
            chosen = chosen + self.scheduler.place(
                existing,
                count - len(chosen))
            logger.debug('Hosts with existing daemons: {}'.format(chosen))
            return chosen

        need = count - len(existing + chosen)
        others = [hs for hs in hosts
                  if hs.hostname not in hosts_with_daemons]
        if self.filter_new_host:
            old = others
            others = [h for h in others if self.filter_new_host(h.hostname)]
            # BUGFIX: this used to log `hosts` (the full candidate list)
            # instead of the filtered result
            logger.debug('filtered %s down to %s' % (old, others))
        chosen = chosen + self.scheduler.place(others, need)
        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
            existing, chosen))
        return existing + chosen