]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
CommitLineData
9f95a23c
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
e306af50
TL
7
8import copy
9import datetime
f67539c2 10import enum
e306af50 11import errno
9f95a23c
TL
12import logging
13import pickle
e306af50 14import re
e306af50 15
f6b5b4d7
TL
16from collections import namedtuple, OrderedDict
17from contextlib import contextmanager
1d09f67e 18from functools import wraps, reduce, update_wrapper
f67539c2
TL
19
20from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
522d829b 21 Sequence, Dict, cast, Mapping
f67539c2
TL
22
23try:
24 from typing import Protocol # Protocol was added in Python 3.8
25except ImportError:
26 class Protocol: # type: ignore
27 pass
28
9f95a23c 29
f6b5b4d7
TL
30import yaml
31
9f95a23c
TL
32from ceph.deployment import inventory
33from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
2a845540 34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
9f95a23c 35from ceph.deployment.drive_group import DriveGroupSpec
f67539c2 36from ceph.deployment.hostspec import HostSpec, SpecValidationError
adb31ebb 37from ceph.utils import datetime_to_str, str_to_datetime
9f95a23c
TL
38
39from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
9f95a23c
TL
41
42logger = logging.getLogger(__name__)
43
f6b5b4d7 44T = TypeVar('T')
f67539c2 45FuncT = TypeVar('FuncT', bound=Callable[..., Any])
f6b5b4d7 46
9f95a23c
TL
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable description of the failure
    :param errno: error number stored on the instance as ``errno``
        (defaults to ``-errno.EINVAL``)
    :param event_kind_subject: optional ``(kind, subject)`` tuple, stored on
        the instance as ``event_subject``. See OrchestratorEvent.subject
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Start the MRO walk at this class. ``super(Exception, self)`` would
        # start *after* ``Exception`` and thereby skip the ``__init__`` of any
        # cooperative base class that sits between this class and
        # ``Exception`` in a subclass' MRO.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
9f95a23c
TL
66
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.

    The error number is always ``-errno.ENOENT``; only the message can be
    customized.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c
TL
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Also raised by the ``handle_type_error`` decorator in this module, which
    converts a ``TypeError`` of the decorated callable into this error.
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags an OrchestratorError escaping the ``with`` body
    with ``event_subject = (kind, subject)`` and re-raises it.

    Other exception types pass through untouched.

    :param kind: first element of the subject tuple
    :param subject: second element of the subject tuple
    :param overwrite: see the review note in the body
    """
    try:
        yield
    except OrchestratorError as e:
        # NOTE(review): OrchestratorError.__init__ always assigns
        # `event_subject`, so hasattr() is true for every instance built
        # through it and the subject is replaced regardless of `overwrite`.
        # Presumably the intent was to keep an already set subject unless
        # `overwrite` is given -- confirm with callers before changing.
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Turn ``func`` into a CLI command handler registered under ``prefix``.

    Expected failures of ``func`` are translated into a HandleCommandResult
    instead of propagating:

    * OrchestratorError / SpecValidationError: the exception's ``errno`` and
      message
    * ImportError: ``-errno.ENOENT`` and the message
    * NotImplementedError: ``-errno.ENOENT`` and a "not supported" message

    :param prefix: command prefix, stored as ``_prefix`` on the result
    :param perm: permission string handed to CLICommand
    :param func: the callable implementing the command
    :return: a callable carrying ``_prefix`` and ``_cli_command`` attributes
    """
    @wraps(func)
    def guarded(*args: Any, **kwargs: Any) -> Any:
        try:
            outcome = func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as err:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(err.errno, stderr=str(err))
        except ImportError as err:
            return HandleCommandResult(-errno.ENOENT, stderr=str(err))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)
        else:
            return outcome

    # misuse lambda to copy `guarded`: the command attributes below are
    # attached to this thin forwarding callable, not to `guarded` itself
    command_entry = lambda *l_args, **l_kwargs: guarded(*l_args, **l_kwargs)  # noqa: E731

    cli_command = CLICommand(prefix, perm)
    cli_command.store_func_metadata(func)
    cli_command.func = command_entry

    command_entry._prefix = prefix  # type: ignore
    command_entry._cli_command = cli_command  # type: ignore

    return cast(FuncT, command_entry)
9f95a23c
TL
114
115
f67539c2
TL
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.

    The return value of ``f`` is wrapped into an OrchResult. Any ``Exception``
    raised by ``f`` is logged and stored in the returned OrchResult, unless
    the ``UNITTEST`` environment variable is set, in which case it is
    re-raised.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            wrapped = OrchResult(f(*args, **kwargs))
        except Exception as err:
            logger.exception(err)
            import os
            if 'UNITTEST' in os.environ:
                raise  # This makes debugging of Tracebacks from unittests a bit easier
            return OrchResult(None, exception=err)
        else:
            return wrapped

    return cast(Callable[..., OrchResult[T]], wrapper)
134
135
class InnerCliCommandCallable(Protocol):
    """
    Structural type of the callable returned by ``_cli_command``: called with
    a command prefix, it returns a decorator for the command's function.
    """
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
139
140
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a decorator factory for CLI commands with a fixed permission.

    :param perm: permission string forwarded to ``handle_exception``
    :return: callable taking the command prefix and returning the decorator
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
145
146
# Decorator factories for CLI commands; 'r' and 'rw' are the permission
# strings that end up as the `perm` argument of CLICommand.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
149
150
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For every class created with this metaclass, the class body is scanned for
    attributes carrying both ``_prefix`` and ``_cli_command``. Those are
    collected into a per-class dispatch table, published as ``cls.COMMANDS``
    and served by a generated ``cls.handle_command``.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # not a decorated CLI command
                pass

        # A `handle_command` written in the class body is replaced by the
        # generated one below; keep it around as fallback for other prefixes.
        own_handle_command = dct.get('handle_command')

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            if cmd['prefix'] in dispatch:
                return dispatch[cmd['prefix']].call(self, cmd, inbuf)

            # Unknown prefix: delegate. Calling `self.handle_command` here
            # would resolve to this very function and recurse without end.
            if own_handle_command is not None:
                return own_handle_command(self, inbuf, cmd)
            return super(cls, self).handle_command(inbuf, cmd)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
175
176
f67539c2
TL
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.

    The exception is not kept as an object: it is stored pickled in
    ``serialized_exception`` together with a printable ``exception_str``.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        """
        :param result: the value to transport, may be None
        :param exception: the exception to transport, if any
        """
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """
        Store ``e`` in pickled form, or clear the stored exception if ``e``
        is None.

        If ``e`` itself cannot be pickled, the failure is logged and a plain
        ``Exception`` carrying the same arguments is stored instead.
        """
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except (pickle.PicklingError, TypeError, AttributeError):
            # Unpicklable state is not only reported as PicklingError:
            # e.g. a lock held in an attribute raises TypeError and a
            # locally defined class can raise AttributeError.
            logger.error(f"failed to pickle {e}")
            self.serialized_exception = self._pickle_degenerated(e)

    @staticmethod
    def _pickle_degenerated(e: Exception) -> bytes:
        """Pickle a plain Exception standing in for the unpicklable ``e``."""
        try:
            # degenerate to a plain Exception
            return pickle.dumps(Exception(*e.args))
        except (pickle.PicklingError, TypeError, AttributeError):
            # the arguments themselves are unpicklable: keep only the text
            return pickle.dumps(Exception(str(e)))

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
217
9f95a23c 218
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    Return the result stored in ``c``, or raise the exception stored in it.
    If the stored exception cannot be restored (KeyError / AttributeError
    while unpickling), a plain ``Exception`` with ``c.exception_str`` is
    raised instead.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result

    try:
        restored = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        raise Exception(c.exception_str)
    raise restored
9f95a23c
TL
231
232
f67539c2
TL
233def _hide_in_features(f: FuncT) -> FuncT:
234 f._hide_in_features = True # type: ignore
9f95a23c
TL
235 return f
236
237
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads.  For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects.  Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.

    Unless noted otherwise, the methods of this base class raise
    NotImplementedError and are meant to be overridden.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator.  This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive.  It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as available when the attribute found on the
        # concrete class differs from the one defined on Orchestrator.
        # Names starting with '_' and attributes marked by
        # @_hide_in_features are left out.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: HostSpec of the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def rescan_host(self, hostname: str) -> OrchResult:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan

        :param hostname: (str) host name
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator.  For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Each spec is handed to the method registered below for its
        ``service_type``; the individual results are collected into one list.

        :param specs: the specs to apply
        :param no_overwrite: not used by this implementation
        :return: the results of the individual calls, in the order of ``specs``
        :raises KeyError: if a spec's ``service_type`` has no entry below.
            Like any other exception raised here, it ends up in the
            OrchResult produced by ``handle_orch_error``.
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            # apply_drivegroups takes a list: wrap the single drive group
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'loki': self.apply_loki,
            'promtail': self.apply_promtail,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # Unwrap both sides (re-raising a stored exception) and append
            # the right hand result to the accumulated list.
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        # The list comprehension calls the handler of every spec before any
        # result is unwrapped by merge().
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :param names: names of the daemons to remove
        :return: OrchResult wrapping a list of strings
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: OrchResult wrapping a string
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)


        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Loki daemon(s)"""
        raise NotImplementedError()

    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Promtail daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]:
        """Add or update an existing tuned profile"""
        raise NotImplementedError()

    def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
        """Remove a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
        """See current tuned profiles"""
        raise NotImplementedError()

    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
        """Change/Add a specific setting for a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
        """Remove a specific setting for a tuned profile"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
738
739
e306af50
TL
# Anything that can be handed to Orchestrator.apply(): a service spec or a
# host spec.
GenericSpec = Union[ServiceSpec, HostSpec]
741
f6b5b4d7
TL
742
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Build a spec object from its JSON (dict) form.

    A dict whose ``service_type`` is ``'host'`` is loaded as HostSpec,
    everything else as ServiceSpec.
    """
    is_host = spec.get('service_type') == 'host'
    loader = HostSpec if is_host else ServiceSpec
    return loader.from_json(spec)
9f95a23c 748
f6b5b4d7 749
f67539c2
TL
def daemon_type_to_service(dtype: str) -> str:
    """
    Return the service type that daemons of type ``dtype`` belong to.

    :raises KeyError: if ``dtype`` is not a known daemon type
    """
    # Daemon types named exactly like the service they belong to.
    same_named = (
        'mon',
        'mgr',
        'mds',
        'rgw',
        'osd',
        'iscsi',
        'rbd-mirror',
        'cephfs-mirror',
        'nfs',
        'grafana',
        'alertmanager',
        'prometheus',
        'node-exporter',
        'loki',
        'promtail',
        'crash',
        'container',
        'agent',
        'snmp-gateway',
    )
    service_of = {name: name for name in same_named}
    # Daemon types that belong to a differently named service.
    service_of.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return service_of[dtype]
776
777
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Return the daemon types that make up a service of type ``stype``.

    :raises KeyError: if ``stype`` is not a known service type
    """
    # Services that consist of a single daemon type of the same name.
    single_daemon_services = (
        'mon',
        'mgr',
        'mds',
        'rgw',
        'osd',
        'iscsi',
        'rbd-mirror',
        'cephfs-mirror',
        'nfs',
        'grafana',
        'alertmanager',
        'prometheus',
        'loki',
        'promtail',
        'node-exporter',
        'crash',
        'container',
        'agent',
        'snmp-gateway',
    )
    daemons_of = {name: [name] for name in single_daemon_services}
    # ingress is the one service made of several daemon types
    daemons_of['ingress'] = ['haproxy', 'keepalived']
    return daemons_of[stype]
802
803
522d829b
TL
# Flat list of the daemon types of every service type listed in
# ServiceSpec.KNOWN_SERVICE_TYPES; sum(..., []) concatenates the per-service
# lists. Evaluated at import time, so a service type that is missing from
# service_to_daemon_types() raises KeyError right here.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
806
807
9f95a23c
TL
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    #
    # A freshly created instance describes "no upgrade in progress"; the
    # attributes are plain data to be filled in by whoever reports the status.
    def __init__(self) -> None:
        self.in_progress = False  # Is an upgrade underway?
        self.target_image: Optional[str] = None
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.which: str = '<unknown>'  # for if user specified daemon types, services or hosts
        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
        self.message = ""  # Freeform description
        self.is_paused: bool = False  # Is the upgrade paused?
9f95a23c
TL
818
819
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator converting a ``TypeError`` raised by ``method`` into an
    OrchestratorValidationError whose message is prefixed with the
    ``__name__`` of the first argument.
    """
    @wraps(method)
    def guarded(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            outcome = method(cls, *args, **kwargs)
        except TypeError as err:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, err))
        else:
            return outcome
    return cast(FuncT, guarded)
829
830
class DaemonDescriptionStatus(enum.IntEnum):
    """Status values of a daemon, with a helper to render them as text."""
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """
        Return the textual form of ``status``.

        ``None`` is treated like ``unknown``; a value that matches no member
        yields ``'<unknown>'``.
        """
        labels = {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }
        key = DaemonDescriptionStatus.unknown if status is None else status
        return labels.get(key, '<unknown>')
9f95a23c
TL
849
850
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 cpu_percentage: Optional[str] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None,
                 ) -> None:
        """
        Store the given values on the instance.

        Every argument is optional.  ``events`` falls back to an empty list,
        and ``daemon_name`` is derived from ``daemon_type`` and ``daemon_id``.
        """
        # ---- identity -----------------------------------------------------

        #: The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        #: The orchestrator will have picked some names for daemons,
        #: typically either based on hostnames or on pod names.
        #: This is the <foo> in mds.<foo>, the ID that will appear
        #: in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id
        # Needs daemon_type and daemon_id, so it has to follow both.
        self.daemon_name = self.name()

        self._service_name: Optional[str] = service_name

        #: Some daemon types have a numeric rank assigned
        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        # ---- placement ----------------------------------------------------

        #: Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname
        self.ip: Optional[str] = ip
        self.ports: Optional[List[int]] = ports

        # ---- container ----------------------------------------------------

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id  # runtime id
        self.container_image_id = container_image_id  # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes
        self.extra_container_args = extra_container_args

        # ---- state --------------------------------------------------------

        #: Service version that was deployed
        self.version = version

        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
        # Written to the backing field directly: the ``status`` property
        # setter would also overwrite ``status_desc``.
        self._status = status

        #: Service status description when status == error.
        self.status_desc = status_desc

        self.is_active = is_active

        # ---- timestamps ---------------------------------------------------

        #: datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # ---- resources ----------------------------------------------------

        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        self.cpu_percentage: Optional[str] = cpu_percentage

        # ---- misc ---------------------------------------------------------

        #: Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events if events else []

        self.deployed_by = deployed_by

    @property
    def status(self) -> Optional[DaemonDescriptionStatus]:
        """Current status, or ``None`` if none has been recorded."""
        return self._status

    @status.setter
    def status(self, new: DaemonDescriptionStatus) -> None:
        # Keep the textual description in step with the numeric status.
        self.status_desc = DaemonDescriptionStatus.to_str(new)
        self._status = new

    def get_port_summary(self) -> str:
        """
        Return ``"<ip>:<port>,<port>..."`` or ``''`` when there are no ports.

        ``'*'`` stands in for a missing ``ip``.
        """
        if self.ports:
            joined_ports = ','.join(str(port) for port in self.ports)
            return f"{self.ip or '*'}:{joined_ports}"
        return ''

    def name(self) -> str:
        """Return ``"<daemon_type>.<daemon_id>"``."""
        return f'{self.daemon_type}.{self.daemon_id}'

    def matches_service(self, service_name: Optional[str]) -> bool:
        """
        Tell whether ``"<service type>.<daemon_id>"`` starts with
        ``service_name + '.'``.  An empty or missing ``service_name`` never
        matches.
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None
        if not service_name:
            return False
        qualified = daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id
        return qualified.startswith(service_name + '.')

    def service_id(self) -> str:
        """
        Return the id part of the service this daemon belongs to.

        Resolution order:

        1. an explicit service name: everything after its first ``'.'``
           (empty if it has none);
        2. OSDs: ``osdspec_affinity`` unless it is empty or the string
           ``'None'``, otherwise ``''``;
        3. service types that do not require a service id: ``daemon_id``;
        4. otherwise the id is derived from ``daemon_id`` and the bare
           hostname.

        :raises OrchestratorError: in step 4, if there is no hostname or the
            hostname is embedded in ``daemon_id`` without ``'.'`` separators
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            # partition() yields '' for the tail when there is no '.'.
            return self._service_name.partition('.')[2]

        if self.daemon_type == 'osd':
            affinity = self.osdspec_affinity
            return affinity if affinity and affinity != 'None' else ''

        if daemon_type_to_service(self.daemon_type) not in ServiceSpec.REQUIRES_SERVICE_ID:
            return self.daemon_id

        cannot_calculate = OrchestratorError(
            "DaemonDescription: Cannot calculate service_id: "
            f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

        if not self.hostname:
            # TODO: can a DaemonDescription exist without a hostname?
            raise cannot_calculate

        # use the bare hostname, not the FQDN.
        short_host = self.hostname.split('.')[0]

        if short_host == self.daemon_id:
            # daemon_id == "host"
            return self.daemon_id

        if short_host in self.daemon_id:
            # daemon_id == "service_id.host"
            # daemon_id == "service_id.host.random"
            before, _, after = self.daemon_id.rpartition(short_host)
            missing_front_sep = not before.endswith('.')
            missing_back_sep = bool(after) and not after.startswith('.')
            if missing_front_sep or missing_back_sep:
                raise cannot_calculate
            return before[:-1]

        # daemon_id == "service_id.random"
        parts = self.daemon_id.split('.')
        if self.daemon_type == 'rgw' and len(parts) in (3, 4):
            return '.'.join(parts[:2])

        if self.daemon_type == 'iscsi':
            return '.'.join(parts[:-1])

        # daemon_id == "service_id"
        return self.daemon_id

    def service_name(self) -> str:
        """
        Return the explicit service name if one was given; otherwise the
        service type, suffixed with ``.<service_id>`` for service types that
        require a service id.
        """
        if self._service_name:
            return self._service_name
        assert self.daemon_type is not None
        service_type = daemon_type_to_service(self.daemon_type)
        if service_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{service_type}.{self.service_id()}'
        return service_type

    def __repr__(self) -> str:
        return f"<DaemonDescription>({self.daemon_type}.{self.daemon_id})"

    def __str__(self) -> str:
        return "{} in status {} on {}".format(self.name(), self.status_desc, self.hostname)

    def to_json(self) -> dict:
        """
        Serialize to an ordered mapping; keys whose value is ``None`` are
        left out.  Timestamps are rendered with ``datetime_to_str`` and events
        with their own ``to_json()``.
        """
        pairs: List[Tuple[str, Any]] = [
            ('daemon_type', self.daemon_type),
            ('daemon_id', self.daemon_id),
            ('service_name', self._service_name),
            ('daemon_name', self.name()),
            ('hostname', self.hostname),
            ('container_id', self.container_id),
            ('container_image_id', self.container_image_id),
            ('container_image_name', self.container_image_name),
            ('container_image_digests', self.container_image_digests),
            ('memory_usage', self.memory_usage),
            ('memory_request', self.memory_request),
            ('memory_limit', self.memory_limit),
            ('cpu_percentage', self.cpu_percentage),
            ('version', self.version),
            ('status', self.status.value if self.status is not None else None),
            ('status_desc', self.status_desc),
        ]
        if self.daemon_type == 'osd':
            pairs.append(('osdspec_affinity', self.osdspec_affinity))
        pairs.extend([
            ('is_active', self.is_active),
            ('ports', self.ports),
            ('ip', self.ip),
            ('rank', self.rank),
            ('rank_generation', self.rank_generation),
        ])

        for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                      'last_configured'):
            moment = getattr(self, stamp)
            if moment:
                pairs.append((stamp, datetime_to_str(moment)))

        if self.events:
            pairs.append(('events', [event.to_json() for event in self.events]))

        return OrderedDict((key, value) for key, value in pairs if value is not None)

    def to_dict(self) -> dict:
        """
        Like :meth:`to_json`, but events are rendered with ``to_dict()`` and
        ``service_name``, ``rank`` and ``rank_generation`` are not emitted.
        """
        pairs: List[Tuple[str, Any]] = [
            ('daemon_type', self.daemon_type),
            ('daemon_id', self.daemon_id),
            ('daemon_name', self.name()),
            ('hostname', self.hostname),
            ('container_id', self.container_id),
            ('container_image_id', self.container_image_id),
            ('container_image_name', self.container_image_name),
            ('container_image_digests', self.container_image_digests),
            ('memory_usage', self.memory_usage),
            ('memory_request', self.memory_request),
            ('memory_limit', self.memory_limit),
            ('cpu_percentage', self.cpu_percentage),
            ('version', self.version),
            ('status', self.status.value if self.status is not None else None),
            ('status_desc', self.status_desc),
        ]
        if self.daemon_type == 'osd':
            pairs.append(('osdspec_affinity', self.osdspec_affinity))
        pairs.extend([
            ('is_active', self.is_active),
            ('ports', self.ports),
            ('ip', self.ip),
        ])

        for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                      'last_configured'):
            moment = getattr(self, stamp)
            if moment:
                pairs.append((stamp, datetime_to_str(moment)))

        if self.events:
            pairs.append(('events', [event.to_dict() for event in self.events]))

        return OrderedDict((key, value) for key, value in pairs if value is not None)

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'DaemonDescription':
        """
        Build an instance from a mapping such as the one produced by
        :meth:`to_json`.  ``data`` itself is not modified.
        """
        kwargs = data.copy()
        raw_events = kwargs.pop('events', [])
        for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                      'last_configured'):
            if stamp in kwargs:
                kwargs[stamp] = str_to_datetime(kwargs[stamp])
        events = [OrchestratorEvent.from_json(raw) for raw in raw_events]
        status_int = kwargs.pop('status', None)
        # __init__ takes no 'daemon_name'; it recomputes that value itself.
        kwargs.pop('daemon_name', None)
        if 'service_name' in kwargs and kwargs['service_name'].startswith('osd.'):
            # if the service_name is a osd.NNN (numeric osd id) then
            # ignore it -- it is not a valid service_name and
            # (presumably) came from an older version of cephadm.
            try:
                int(kwargs['service_name'][4:])
            except ValueError:
                pass
            else:
                del kwargs['service_name']
        status = None if status_int is None else DaemonDescriptionStatus(status_int)
        return cls(events=events, status=status, **kwargs)

    def __copy__(self) -> 'DaemonDescription':
        # feel free to change this:
        serialized = self.to_json()
        return DaemonDescription.from_json(serialized)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
        """Represent ``data`` as a YAML mapping built from ``data.to_json()``."""
        # The items view, not the dict itself, is handed to represent_dict
        # (presumably so PyYAML keeps insertion order -- TODO confirm).
        items = data.to_json().items()
        return dumper.represent_dict(cast(Mapping, items))
f6b5b4d7
TL
1167
1168
# Register DaemonDescription.yaml_representer with PyYAML as the
# representer used for DaemonDescription objects.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1170
1171
9f95a23c
TL
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        """
        Store the given values on the instance.

        ``events`` and ``ports`` fall back to a fresh empty list when they
        are not supplied.
        """
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # A ``[]`` default in the signature would be one list object shared
        # by every instance created without ``ports``; build a new one here.
        self.ports: List[int] = [] if ports is None else ports

    def service_type(self) -> str:
        """Return the service type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """
        Return ``"<ip>:<port>,<port>..."`` or ``''`` when there are no ports.

        The ip is ``virtual_ip`` with anything from the first ``'/'`` onwards
        removed; ``'?'`` stands in for a missing ``virtual_ip``.
        """
        if not self.ports:
            return ''
        address = (self.virtual_ip or '?').split('/')[0]
        return f"{address}:{','.join(map(str, self.ports))}"

    def _status_to_json(self) -> Dict[str, Any]:
        """
        Build the ``status`` sub-mapping shared by ``to_json`` and
        ``to_dict``; entries whose value is ``None`` are left out.
        """
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """
        Serialize as the spec's ``to_json()`` output plus a ``status`` entry
        and, if there are any, an ``events`` entry rendered with
        ``to_json()``.
        """
        out = self.spec.to_json()
        out['status'] = self._status_to_json()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Like :meth:`to_json`, but events are rendered with ``to_dict()``."""
        out = self.spec.to_json()
        out['status'] = self._status_to_json()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """
        Build an instance from a mapping such as the one produced by
        :meth:`to_json`.  ``data`` itself is not modified: the ``status`` and
        ``events`` entries are split off a copy, and what remains is parsed
        as the service spec.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Represent ``data`` as a YAML mapping built from ``data.to_json()``."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
f6b5b4d7
TL
1299
1300
# Register ServiceDescription.yaml_representer with PyYAML as the
# representer used for ServiceDescription objects.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c
TL
1302
1303
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        """Remember the optional host-name and label restrictions."""

        #: Optional: get info about certain named hosts only
        self.hosts = hosts

        #: Optional: get info about hosts matching labels
        self.labels = labels
1326
1327
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within cluster, for example a hostname
        :param devices: devices of this host; defaults to an empty ``Devices``
        :param labels: labels of this host; defaults to an empty list
        :param addr: address of the host; defaults to ``name``
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict with ``name``, ``addr``, ``devices`` and ``labels``."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Build an instance from a dict such as the one produced by
        :meth:`to_json`.  ``data`` itself is not modified.

        :raises OrchestratorValidationError: if ``name`` or ``devices`` is
            missing, if unknown keys are present, or if reading the data
            raises ``TypeError``
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Anything still left over was not consumed above.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """
        Build one instance per entry of ``hosts``: ``item[0]`` is used as the
        name and ``item[1].data`` is parsed as the device list.
        """
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the ``name`` of every host in ``hosts``, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        """
        Two hosts are equal when both name and devices match; ``addr`` and
        ``labels`` are not compared.
        """
        if not isinstance(other, InventoryHost):
            # Let Python fall back to its default handling instead of
            # failing with AttributeError on an unrelated object.
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1386
1387
class DeviceLightLoc(namedtuple('DeviceLightLoc', 'host dev path')):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
    # No per-instance __dict__: this stays a plain tuple of three fields.
    __slots__ = ()
1399
1400
f6b5b4d7
TL
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        """
        :param created: creation time; a string is parsed with ``str_to_datetime``
        :param kind: ``'service'`` or ``'daemon'``
        :param subject: service name, or daemon name or something
        :param level: ``'INFO'`` or ``'ERROR'``
        :param message: free-form text
        """
        self.created: datetime.datetime = (
            str_to_datetime(created) if isinstance(created, str) else created
        )

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``"<kind>:<subject>"``."""
        return '{}:{}'.format(self.kind, self.subject)

    def to_json(self) -> str:
        """Render the event as one compact string."""
        # Make a long list of events readable.
        stamp = datetime_to_str(self.created)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """Render the event as a dict; ``subject`` holds ``kind_subject()``."""
        # Convert events data to dict.
        return dict(
            created=datetime_to_str(self.created),
            subject=self.kind_subject(),
            level=self.level,
            message=self.message,
        )

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: string in the format produced by :meth:`to_json`
        :return: the parsed event
        :raises ValueError: if ``data`` does not match ``regex_v1``
        """
        parsed = cls.regex_v1.match(data)
        if not parsed:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        """Compare creation time, kind, subject and message (not level)."""
        if not isinstance(other, OrchestratorEvent):
            return False

        compared = ('created', 'kind', 'subject', 'message')
        return all(getattr(self, attr) == getattr(other, attr) for attr in compared)

    def __repr__(self) -> str:
        return 'OrchestratorEvent.from_json({!r})'.format(self.to_json())
1472
f6b5b4d7 1473
def _mk_orch_methods(cls: Any) -> Any:
    """
    Class decorator: for every entry of ``Orchestrator.__dict__`` whose name
    does not start with ``'_'`` (``is_orchestrator_module`` excepted), set on
    ``cls`` a stub of the same name that forwards the call to
    ``self._oremote(name, args, kwargs)``.

    Each stub is passed through ``update_wrapper`` with the original
    attribute.

    :param cls: class to receive the stubs
    :return: ``cls`` itself
    """
    # The stub factory has to live outside the loop below: a closure created
    # directly in the loop body would always see the last name.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return inner

    for attr_name, original in Orchestrator.__dict__.items():
        if attr_name.startswith('_') or attr_name == 'is_orchestrator_module':
            continue
        setattr(cls, attr_name, update_wrapper(shim(attr_name), original))
    return cls
1488
1489
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr))
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self) -> Any:
        """Return the object given to :meth:`set_mgr`, or ``self`` if there is none."""
        try:
            manager = self.__mgr
        except AttributeError:
            manager = self
        return manager

    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # Ask the manager object directly if it can select the backend;
        # otherwise go through the 'orchestrator' module.
        try:
            backend = mgr._select_orchestrator()
        except AttributeError:
            backend = mgr.remote('orchestrator', '_select_orchestrator')

        if backend is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(
            mgr.module_name, backend, meth, args, kwargs))
        try:
            return mgr.remote(backend, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            features = self.get_feature_set()
            if meth in features and features[meth]['available']:
                # The backend claims to support the call: pass the failure on.
                raise
            raise NotImplementedError(f'{backend} does not implement {meth}') from e