]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph 16.2.7
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
CommitLineData
9f95a23c
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
e306af50
TL
7
8import copy
9import datetime
f67539c2 10import enum
e306af50 11import errno
9f95a23c
TL
12import logging
13import pickle
e306af50 14import re
e306af50 15
f6b5b4d7
TL
16from collections import namedtuple, OrderedDict
17from contextlib import contextmanager
f67539c2
TL
18from functools import wraps, reduce
19
20from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
522d829b 21 Sequence, Dict, cast, Mapping
f67539c2
TL
22
23try:
24 from typing import Protocol # Protocol was added in Python 3.8
25except ImportError:
26 class Protocol: # type: ignore
27 pass
28
9f95a23c 29
f6b5b4d7
TL
30import yaml
31
9f95a23c
TL
32from ceph.deployment import inventory
33from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
f67539c2 34 IscsiServiceSpec, IngressSpec
9f95a23c 35from ceph.deployment.drive_group import DriveGroupSpec
f67539c2 36from ceph.deployment.hostspec import HostSpec, SpecValidationError
adb31ebb 37from ceph.utils import datetime_to_str, str_to_datetime
9f95a23c
TL
38
39from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
9f95a23c
TL
41
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Generic placeholder for "the wrapped result type" (see OrchResult).
T = TypeVar('T')
# Type variable for decorators that hand back a callable of the same type.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
f6b5b4d7 46
9f95a23c
TL
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable description of the failure
    :param errno: error number stored on the exception as ``errno``
        (defaults to ``-errno.EINVAL``)
    :param event_kind_subject: optional ``(kind, subject)`` tuple, stored
        as ``event_subject``
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # NOTE: inside this method ``errno`` is the parameter, not the
        # module; the default above is evaluated at definition time where
        # the name still refers to the module.
        #
        # Use the zero-argument form: ``super(Exception, self)`` would start
        # the MRO lookup *after* Exception and skip it.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
9f95a23c
TL
66
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.

    The error is created with ``errno=-errno.ENOENT``.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c
TL
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Adds no behaviour of its own; everything comes from OrchestratorError.
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Tag OrchestratorErrors escaping the ``with`` body with an event subject.

    Any OrchestratorError raised inside the block gets its ``event_subject``
    set to ``(kind, subject)`` and is then re-raised unchanged. Other
    exception types pass through untouched.

    :param kind: first element of the subject tuple
    :param subject: second element of the subject tuple
    :param overwrite: when False (default) a subject that is already set on
        the exception is kept; when True it is replaced.
    """
    try:
        yield
    except OrchestratorError as e:
        # OrchestratorError.__init__ always creates ``event_subject``, so a
        # hasattr() test is always true and would make ``overwrite``
        # meaningless. Look at the value instead: only fill in a missing
        # subject unless the caller explicitly asked to overwrite.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Turn ``func`` into a CLI handler that reports expected errors.

    The returned callable forwards to ``func`` and converts these exceptions
    into a ``HandleCommandResult`` instead of letting them propagate:

    * OrchestratorError / SpecValidationError: the exception's ``errno`` and
      message
    * ImportError: ``-errno.ENOENT`` and the message
    * NotImplementedError: ``-errno.ENOENT`` and a "not supported" message
      naming ``prefix``

    The callable also carries ``_prefix`` and ``_cli_command`` attributes,
    which CLICommandMeta reads to build its dispatch table.

    :param prefix: command prefix
    :param perm: permission string handed to CLICommand
    :param func: the function implementing the command
    """
    @wraps(func)
    def guarded(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as expected:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(expected.errno, stderr=str(expected))
        except ImportError as missing:
            return HandleCommandResult(-errno.ENOENT, stderr=str(missing))
        except NotImplementedError:
            unsupported = f'This Orchestrator does not support `{prefix}`'
            return HandleCommandResult(-errno.ENOENT, stderr=unsupported)

    # misuse lambda to copy `guarded`
    handler = lambda *l_args, **l_kwargs: guarded(*l_args, **l_kwargs)  # noqa: E731

    command = CLICommand(prefix, perm)
    command.store_func_metadata(func)
    command.func = handler

    handler._prefix = prefix  # type: ignore
    handler._cli_command = command  # type: ignore

    return cast(FuncT, handler)
9f95a23c
TL
114
115
f67539c2
TL
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.

    The return value of ``f`` is wrapped in an OrchResult. If ``f`` raises,
    the exception is logged (with traceback) and an OrchResult carrying the
    exception and a ``None`` result is returned instead.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            value = f(*args, **kwargs)
        except Exception as e:
            logger.exception(e)
            return OrchResult(None, exception=e)
        return OrchResult(value)

    return cast(Callable[..., OrchResult[T]], wrapper)
131
132
class InnerCliCommandCallable(Protocol):
    """Type of the callables returned by ``_cli_command``.

    Calling one with a command prefix yields a decorator that returns a
    callable of the same type as the one it was given.
    """

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
136
137
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a decorator factory bound to the permission string ``perm``.

    The returned callable takes a command prefix and yields a decorator that
    passes the decorated function through ``handle_exception(prefix, perm, func)``.
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
142
143
# Decorator factories for CLI commands, bound to the 'r' and 'rw'
# permission strings respectively.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
146
147
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For every class created with this metaclass:

    * each attribute of the class body that carries ``_prefix`` and
      ``_cli_command`` (as attached by ``handle_exception``) is collected
      into a per-class dispatch table,
    * ``COMMANDS`` is set to the dumped descriptions of those commands,
    * ``handle_command`` is replaced by a dispatcher over that table.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # not a CLI command, e.g. a plain method or attribute
                pass

        # Remember whatever handler the class defined or inherited *before*
        # it gets replaced below. Calling ``self.handle_command`` from inside
        # the generated dispatcher would resolve to the dispatcher itself and
        # recurse until RecursionError.
        fallback = getattr(cls, 'handle_command', None)

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            prefix = cmd['prefix']
            if prefix not in dispatch:
                if fallback is None:
                    raise NotImplementedError(
                        'no handler for command prefix {!r}'.format(prefix))
                return fallback(self, inbuf, cmd)

            return dispatch[prefix].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
172
173
f67539c2
TL
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.

    The exception is kept in pickled form (``serialized_exception``) together
    with a printable fallback (``exception_str``); use ``raise_if_exception``
    to unpack a result.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        """
        :param result: the wrapped value (``None`` when an exception is given)
        :param exception: the exception to transport, if any
        """
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """
        Store ``e`` in pickled form, or clear the stored exception for ``None``.

        Storing must not fail itself: if ``e`` cannot be pickled it degrades
        to a plain ``Exception`` with the same args, and if those cannot be
        pickled either, to a plain ``Exception`` carrying ``str(e)``.
        """
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except Exception:
            # pickle.dumps() raises more than PicklingError (e.g. TypeError
            # for locks, AttributeError for local classes).
            # Same logger object as the module-level ``logger``.
            logging.getLogger(__name__).error(f"failed to pickle {e}")
            # degenerate to a plain Exception
            args = e.args if isinstance(e, Exception) else (str(e),)
            try:
                self.serialized_exception = pickle.dumps(Exception(*args))
            except Exception:
                # even the args are not picklable: keep only the text
                self.serialized_exception = pickle.dumps(Exception(str(e)))

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            # one list element per line
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
214
9f95a23c 215
def raise_if_exception(c: 'OrchResult[T]') -> T:
    """
    Unpack an OrchResult: return its result or raise its exception.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    If the stored exception cannot be unpickled (KeyError / AttributeError),
    a plain ``Exception`` carrying ``c.exception_str`` is raised instead.

    NOTE(review): this unpickles ``c.serialized_exception``; it must only
    ever be fed results produced inside the mgr, never external data.
    """
    payload = c.serialized_exception
    if payload is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result

    try:
        error = pickle.loads(payload)
    except (KeyError, AttributeError):
        raise Exception(c.exception_str)
    raise error
9f95a23c
TL
228
229
f67539c2
TL
230def _hide_in_features(f: FuncT) -> FuncT:
231 f._hide_in_features = True # type: ignore
9f95a23c
TL
232 return f
233
234
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads.  For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects.  Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.

    NOTE(review): the methods below are annotated as returning ``OrchResult``
    rather than ``Completion`` objects; the text above may predate that
    change -- confirm before relying on it.

    Unless stated otherwise, the methods of this base class raise
    NotImplementedError and are meant to be overridden by a backend.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator.  This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive.  It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: tuple of: boolean representing whether the module is
            available/usable, string describing any error, dict containing
            any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as available when the concrete class provides
        # something different from the base class attribute. Private names
        # and attributes flagged by @_hide_in_features are left out.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def resume(self) -> None:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: HostSpec describing the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        :param force: backend specific flag (not interpreted here)
        :param offline: backend specific flag (not interpreted here)
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator.  For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Each spec is handed to the ``apply_*`` method (or ``add_host``)
        registered for its ``service_type``; the individual results are
        collected into one list. The first failed result is re-raised, which
        @handle_orch_error turns into a failed OrchResult.

        :param specs: the specs to apply
        :param no_overwrite: not used by this base implementation
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # unpack both sides (raising on failure) and append the new value
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)

        # All handlers are invoked first (list comprehension), only then are
        # the results folded together.
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :param names: names of the daemons to remove
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: name of the service to remove
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        """Backend hook; the base class raises NotImplementedError."""
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
691
692
e306af50
TL
# Either kind of spec: a service spec or a host spec.
GenericSpec = Union[ServiceSpec, HostSpec]
694
f6b5b4d7
TL
695
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Build a spec object from its JSON (dict) form.

    A dict whose ``service_type`` is ``'host'`` is parsed with
    ``HostSpec.from_json``; anything else (including a dict without a
    ``service_type`` key) goes through ``ServiceSpec.from_json``.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
9f95a23c 701
f6b5b4d7 702
f67539c2
TL
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type it belongs to.

    Most daemon types share the name of their service; the exceptions are
    listed explicitly.

    :raises KeyError: for a daemon type that is not known here
    """
    # daemon types whose service type has the very same name
    same_named = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'cephadm-exporter',
    )
    service_by_daemon = {name: name for name in same_named}
    service_by_daemon.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return service_by_daemon[dtype]
726
727
def service_to_daemon_types(stype: str) -> List[str]:
    """
    List the daemon types that make up a service type.

    Every service consists of a single, equally named daemon type, except
    ``ingress`` which consists of ``haproxy`` and ``keepalived``. A new list
    is returned on every call.

    :raises KeyError: for a service type that is not known here
    """
    # services made of exactly one daemon type with the same name
    single_daemon = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'cephadm-exporter',
    )
    daemons_by_service = {name: [name] for name in single_daemon}
    daemons_by_service['ingress'] = ['haproxy', 'keepalived']
    return daemons_by_service[stype]
749
750
522d829b
TL
# Flat list of the daemon types of every service type listed in
# ServiceSpec.KNOWN_SERVICE_TYPES, in that order.
KNOWN_DAEMON_TYPES: List[str] = [
    daemon_type
    for service_type in ServiceSpec.KNOWN_SERVICE_TYPES
    for daemon_type in service_to_daemon_types(service_type)
]
753
754
9f95a23c
TL
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
763
764
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator translating TypeError into OrchestratorValidationError.

    The wrapped callable receives a class as first argument; when it raises
    TypeError, an OrchestratorValidationError with the message
    ``"<class name>: <original message>"`` is raised instead.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError(f'{cls.__name__}: {e}')
    return cast(FuncT, inner)
774
775
class DaemonDescriptionStatus(enum.IntEnum):
    """Integer status codes of a daemon: -1 error, 0 stopped, 1 running."""

    error = -1
    stopped = 0
    running = 1
9f95a23c
TL
780
781
782class DaemonDescription(object):
783 """
784 For responding to queries about the status of a particular daemon,
785 stateful or stateless.
786
787 This is not about health or performance monitoring of daemons: it's
788 about letting the orchestrator tell Ceph whether and where a
789 daemon is scheduled in the cluster. When an orchestrator tells
790 Ceph "it's running on host123", that's not a promise that the process
791 is literally up this second, it's a description of where the orchestrator
792 has decided the daemon should run.
793 """
794
795 def __init__(self,
f67539c2
TL
796 daemon_type: Optional[str] = None,
797 daemon_id: Optional[str] = None,
798 hostname: Optional[str] = None,
799 container_id: Optional[str] = None,
800 container_image_id: Optional[str] = None,
801 container_image_name: Optional[str] = None,
802 container_image_digests: Optional[List[str]] = None,
803 version: Optional[str] = None,
804 status: Optional[DaemonDescriptionStatus] = None,
805 status_desc: Optional[str] = None,
806 last_refresh: Optional[datetime.datetime] = None,
807 created: Optional[datetime.datetime] = None,
808 started: Optional[datetime.datetime] = None,
809 last_configured: Optional[datetime.datetime] = None,
810 osdspec_affinity: Optional[str] = None,
811 last_deployed: Optional[datetime.datetime] = None,
f6b5b4d7 812 events: Optional[List['OrchestratorEvent']] = None,
f67539c2
TL
813 is_active: bool = False,
814 memory_usage: Optional[int] = None,
815 memory_request: Optional[int] = None,
816 memory_limit: Optional[int] = None,
817 service_name: Optional[str] = None,
818 ports: Optional[List[int]] = None,
819 ip: Optional[str] = None,
820 deployed_by: Optional[List[str]] = None,
b3b6e05e
TL
821 rank: Optional[int] = None,
822 rank_generation: Optional[int] = None,
f67539c2 823 ) -> None:
f6b5b4d7 824
9f95a23c 825 # Host is at the same granularity as InventoryHost
f67539c2 826 self.hostname: Optional[str] = hostname
9f95a23c
TL
827
828 # Not everyone runs in containers, but enough people do to
829 # justify having the container_id (runtime id) and container_image
830 # (image name)
831 self.container_id = container_id # runtime id
f67539c2 832 self.container_image_id = container_image_id # image id locally
9f95a23c 833 self.container_image_name = container_image_name # image friendly name
f67539c2 834 self.container_image_digests = container_image_digests # reg hashes
9f95a23c
TL
835
836 # The type of service (osd, mon, mgr, etc.)
837 self.daemon_type = daemon_type
838
839 # The orchestrator will have picked some names for daemons,
840 # typically either based on hostnames or on pod names.
841 # This is the <foo> in mds.<foo>, the ID that will appear
842 # in the FSMap/ServiceMap.
f67539c2 843 self.daemon_id: Optional[str] = daemon_id
a4b75251 844 self.daemon_name = self.name()
f67539c2 845
b3b6e05e
TL
846 # Some daemon types have a numeric rank assigned
847 self.rank: Optional[int] = rank
848 self.rank_generation: Optional[int] = rank_generation
849
f67539c2 850 self._service_name: Optional[str] = service_name
9f95a23c
TL
851
852 # Service version that was deployed
853 self.version = version
854
855 # Service status: -1 error, 0 stopped, 1 running
856 self.status = status
857
f67539c2 858 # Service status description when status == error.
9f95a23c
TL
859 self.status_desc = status_desc
860
861 # datetime when this info was last refreshed
f6b5b4d7 862 self.last_refresh: Optional[datetime.datetime] = last_refresh
9f95a23c 863
f6b5b4d7
TL
864 self.created: Optional[datetime.datetime] = created
865 self.started: Optional[datetime.datetime] = started
866 self.last_configured: Optional[datetime.datetime] = last_configured
867 self.last_deployed: Optional[datetime.datetime] = last_deployed
9f95a23c 868
e306af50 869 # Affinity to a certain OSDSpec
f6b5b4d7
TL
870 self.osdspec_affinity: Optional[str] = osdspec_affinity
871
872 self.events: List[OrchestratorEvent] = events or []
f91f0fd5 873
f67539c2
TL
874 self.memory_usage: Optional[int] = memory_usage
875 self.memory_request: Optional[int] = memory_request
876 self.memory_limit: Optional[int] = memory_limit
877
878 self.ports: Optional[List[int]] = ports
879 self.ip: Optional[str] = ip
880
881 self.deployed_by = deployed_by
882
f6b5b4d7 883 self.is_active = is_active
e306af50 884
f67539c2
TL
def get_port_summary(self) -> str:
    """
    Return ``<ip>:<port>[,<port>...]`` for this daemon.

    An empty string is returned when no ports are known. ``*`` stands in
    for the address when ``self.ip`` is unset.
    """
    if self.ports:
        address = self.ip or '*'
        port_list = ','.join(str(port) for port in self.ports)
        return f"{address}:{port_list}"
    return ''
889
def name(self) -> str:
    """Return the full daemon name, ``<daemon_type>.<daemon_id>``."""
    return str(self.daemon_type) + '.' + str(self.daemon_id)
892
def matches_service(self, service_name: Optional[str]) -> bool:
    """
    Tell whether this daemon belongs to the service ``service_name``.

    The daemon matches when ``<service type>.<daemon_id>`` starts with
    ``service_name`` followed by a dot. A missing or empty
    ``service_name`` never matches.
    """
    assert self.daemon_id is not None
    assert self.daemon_type is not None
    if not service_name:
        return False
    qualified_id = daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id
    return qualified_id.startswith(service_name + '.')
899
f67539c2
TL
def service_id(self) -> str:
    """
    Return the service id this daemon belongs to.

    Resolution order:

    1. An explicit ``_service_name``: everything after its first ``.``
       (``''`` when it contains no dot).
    2. OSDs: ``osdspec_affinity``, or ``'unmanaged'`` when the affinity is
       unset or the literal string ``'None'``.
    3. Service types listed in ``ServiceSpec.REQUIRES_SERVICE_ID``: derived
       from ``daemon_id`` and ``hostname``.
    4. Everything else: ``daemon_id``.

    :raises OrchestratorError: in case 3, when there is no hostname or the
        bare hostname is embedded in ``daemon_id`` without ``.`` separators.
    """
    assert self.daemon_id is not None
    assert self.daemon_type is not None

    if self._service_name:
        # partition() yields '' as the tail when there is no '.' at all
        return self._service_name.partition('.')[2]

    if self.daemon_type == 'osd':
        affinity = self.osdspec_affinity
        if not affinity or affinity == 'None':
            return 'unmanaged'
        return affinity

    def _match() -> str:
        assert self.daemon_id is not None
        err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

        if not self.hostname:
            # TODO: can a DaemonDescription exist without a hostname?
            raise err

        # use the bare hostname, not the FQDN.
        host = self.hostname.split('.')[0]

        if host == self.daemon_id:
            # daemon_id == "host"
            return self.daemon_id

        if host in self.daemon_id:
            # daemon_id == "service_id.host"
            # daemon_id == "service_id.host.random"
            pre, _, post = self.daemon_id.rpartition(host)
            front_sep_missing = not pre.endswith('.')
            back_sep_missing = bool(post) and not post.startswith('.')
            if front_sep_missing or back_sep_missing:
                raise err
            return pre[:-1]

        # daemon_id == "service_id.random"
        parts = self.daemon_id.split('.')
        if self.daemon_type == 'rgw' and len(parts) in (3, 4):
            return '.'.join(parts[:2])

        if self.daemon_type == 'iscsi':
            return '.'.join(parts[:-1])

        # daemon_id == "service_id"
        return self.daemon_id

    if daemon_type_to_service(self.daemon_type) not in ServiceSpec.REQUIRES_SERVICE_ID:
        return self.daemon_id
    return _match()
1911f103 960
f67539c2
TL
def service_name(self) -> str:
    """
    Return the name of the service this daemon belongs to.

    An explicitly stored ``_service_name`` wins. Otherwise the name is the
    service type, suffixed with ``.<service_id>`` for service types listed
    in ``ServiceSpec.REQUIRES_SERVICE_ID``.
    """
    explicit = self._service_name
    if explicit:
        return explicit
    assert self.daemon_type is not None
    service_type = daemon_type_to_service(self.daemon_type)
    if service_type in ServiceSpec.REQUIRES_SERVICE_ID:
        return f'{service_type}.{self.service_id()}'
    return service_type
9f95a23c 968
f67539c2 969 def __repr__(self) -> str:
9f95a23c
TL
970 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
971 id=self.daemon_id)
972
f67539c2
TL
def to_json(self) -> dict:
    """
    Serialize this daemon description to an ordered, JSON-friendly dict.

    Entries whose value is ``None`` are left out. ``osdspec_affinity`` is
    only considered for OSD daemons, timestamps are rendered with
    ``datetime_to_str`` and events with their own ``to_json()``.
    """
    status_value = None if self.status is None else self.status.value
    entries = [
        ('daemon_type', self.daemon_type),
        ('daemon_id', self.daemon_id),
        ('service_name', self._service_name),
        ('daemon_name', self.name()),
        ('hostname', self.hostname),
        ('container_id', self.container_id),
        ('container_image_id', self.container_image_id),
        ('container_image_name', self.container_image_name),
        ('container_image_digests', self.container_image_digests),
        ('memory_usage', self.memory_usage),
        ('memory_request', self.memory_request),
        ('memory_limit', self.memory_limit),
        ('version', self.version),
        ('status', status_value),
        ('status_desc', self.status_desc),
    ]
    if self.daemon_type == 'osd':
        entries.append(('osdspec_affinity', self.osdspec_affinity))
    entries.extend([
        ('is_active', self.is_active),
        ('ports', self.ports),
        ('ip', self.ip),
        ('rank', self.rank),
        ('rank_generation', self.rank_generation),
    ])

    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        moment = getattr(self, stamp)
        if moment:
            entries.append((stamp, datetime_to_str(moment)))

    if self.events:
        entries.append(('events', [event.to_json() for event in self.events]))

    # Unset fields are dropped rather than emitted as null.
    return OrderedDict((key, value) for key, value in entries if value is not None)
9f95a23c 1010
b3b6e05e
TL
def to_dict(self) -> dict:
    """
    Serialize this daemon description to an ordered dict.

    Unlike ``to_json()`` this leaves out ``service_name``, ``rank`` and
    ``rank_generation`` and renders events with their ``to_dict()``.
    Entries whose value is ``None`` are left out.
    """
    status_value = None if self.status is None else self.status.value
    entries = [
        ('daemon_type', self.daemon_type),
        ('daemon_id', self.daemon_id),
        ('daemon_name', self.name()),
        ('hostname', self.hostname),
        ('container_id', self.container_id),
        ('container_image_id', self.container_image_id),
        ('container_image_name', self.container_image_name),
        ('container_image_digests', self.container_image_digests),
        ('memory_usage', self.memory_usage),
        ('memory_request', self.memory_request),
        ('memory_limit', self.memory_limit),
        ('version', self.version),
        ('status', status_value),
        ('status_desc', self.status_desc),
    ]
    if self.daemon_type == 'osd':
        entries.append(('osdspec_affinity', self.osdspec_affinity))
    entries.extend([
        ('is_active', self.is_active),
        ('ports', self.ports),
        ('ip', self.ip),
    ])

    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        moment = getattr(self, stamp)
        if moment:
            entries.append((stamp, datetime_to_str(moment)))

    if self.events:
        entries.append(('events', [event.to_dict() for event in self.events]))

    # Unset fields are dropped rather than emitted as null.
    return OrderedDict((key, value) for key, value in entries if value is not None)
1045
9f95a23c
TL
@classmethod
@handle_type_error
def from_json(cls, data: dict) -> 'DaemonDescription':
    """
    Build a ``DaemonDescription`` from a dict as produced by ``to_json()``.

    ``data`` itself is not modified. Timestamp strings are parsed with
    ``str_to_datetime``, events with ``OrchestratorEvent.from_json`` and the
    status value is wrapped in ``DaemonDescriptionStatus``. A
    ``daemon_name`` entry is discarded, and every remaining key is passed
    to the constructor as a keyword argument.
    """
    fields = data.copy()
    raw_events = fields.pop('events', [])
    raw_status = fields.pop('status', None)
    # The constructor takes no daemon_name argument; drop it if present.
    fields.pop('daemon_name', None)

    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        if stamp in fields:
            fields[stamp] = str_to_datetime(fields[stamp])

    events = [OrchestratorEvent.from_json(raw) for raw in raw_events]
    status = None if raw_status is None else DaemonDescriptionStatus(raw_status)
    return cls(events=events, status=status, **fields)
9f95a23c 1061
def __copy__(self) -> 'DaemonDescription':
    """Return a copy made by round-tripping through ``to_json()``/``from_json()``."""
    # feel free to change this:
    serialized = self.to_json()
    return DaemonDescription.from_json(serialized)
1065
f6b5b4d7 1066 @staticmethod
f67539c2 1067 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
522d829b 1068 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
f6b5b4d7
TL
1069
1070
# Register the representer so yaml can dump DaemonDescription objects.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1072
1073
9f95a23c
TL
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        """
        :param spec: specification of the described service
        :param ports: ports of the service; a fresh empty list is used when
            omitted, so instances never share one default list
        """
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # A default of `[]` in the signature would be one list object shared
        # by every instance; create the empty list per instance instead.
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """
        Return ``<virtual ip>:<port>[,<port>...]``, or ``''`` without ports.

        Anything after a ``/`` in the virtual IP is cut off, and ``?`` is
        shown when no virtual IP is set.
        """
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_to_json(self) -> Dict[str, Any]:
        """Build the ``status`` sub-dict shared by ``to_json()`` and ``to_dict()``."""
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': datetime_to_str(self.last_refresh) if self.last_refresh else None,
            'created': datetime_to_str(self.created) if self.created else None,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        # Unset fields are dropped rather than emitted as null.
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """
        Return the spec's JSON with a ``status`` entry added, plus an
        ``events`` entry (events in their ``to_json()`` form) when there
        are any events.
        """
        out = self.spec.to_json()
        out['status'] = self._status_to_json()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Like ``to_json()``, but events are rendered with their ``to_dict()``."""
        out = self.spec.to_json()
        out['status'] = self._status_to_json()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """
        Build a ``ServiceDescription`` from a dict as produced by ``to_json()``.

        ``status`` and ``events`` are split off, the remaining keys are
        handed to ``ServiceSpec.from_json`` and the status entries become
        constructor keyword arguments. ``data`` itself is not modified.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Represent ``data`` in YAML as the mapping given by its ``to_json()`` items."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
f6b5b4d7
TL
1201
1202
# Register the representer so yaml can dump ServiceDescription objects.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c
TL
1204
1205
class InventoryFilter(object):
    """
    Restricts an inventory query so that the whole estate does not have
    to be scanned unnecessarily.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring a
      particular server;
    * filter by label when not all of the estate is Ceph servers and only
      the Ceph servers are of interest;
    * filter by label when particularly interested in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about certain named hosts only
        self.hosts = hosts

        #: Optional: get info about hosts matching labels
        self.labels = labels
1227
1228
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique name of the host within the cluster
        :param devices: devices found on the host; defaults to an empty
            ``inventory.Devices``
        :param labels: labels of the host; defaults to an empty list
        :param addr: address of the host; falls back to ``name``
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Return name, address, devices (as JSON) and labels as a dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Build an ``InventoryHost`` from a dict as produced by ``to_json()``.

        ``name`` and ``devices`` are required; ``addr`` falls back to the
        name and ``labels`` to an empty list. ``data`` is not modified.

        :raises OrchestratorValidationError: on a missing required key, on
            unknown extra keys, or when a ``TypeError`` occurs while reading.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            # Chain the cause so the original traceback is not lost.
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """
        Build one ``InventoryHost`` per entry of ``hosts``.

        NOTE(review): each entry is indexed as ``item[0]`` (host name) and
        ``item[1].data`` (device JSON), so despite the ``List[dict]``
        annotation the entries look like pairs -- confirm against callers.
        """
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the names of the given hosts, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        """Two hosts are equal when both name and devices are equal."""
        if not isinstance(other, InventoryHost):
            # Let Python try the reflected comparison instead of failing with
            # AttributeError on objects that lack .name / .devices.
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1287
1288
class DeviceLightLoc(namedtuple('DeviceLightLoc', 'host dev path')):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    Fields are ``host``, ``dev`` and ``path``:

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
    # Keep instances as plain tuples, without a per-instance __dict__.
    __slots__ = ()
1300
1301
f6b5b4d7
TL
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    INFO = 'INFO'
    ERROR = 'ERROR'
    # Parses the one-line form produced by to_json():
    #   <created> <kind>:<subject> [<level>] "<message>"
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        """
        :param created: creation time; a string is parsed with ``str_to_datetime``
        :param kind: either ``service`` or ``daemon``
        :param subject: service name, or daemon name or something
        :param level: either ``INFO`` or ``ERROR``
        :param message: the event text
        """
        self.created: datetime.datetime = (
            str_to_datetime(created) if isinstance(created, str) else created
        )

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``<kind>:<subject>``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """Render the event as a single string, the format ``from_json()`` parses."""
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """Render the event as a dict with created, subject, level and message."""
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: event string in the format produced by ``to_json()``
        :return: the parsed event
        :raises ValueError: if ``data`` does not match ``regex_v1``
        """
        parsed = cls.regex_v1.match(data)
        if parsed is None:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        """Compare created, kind, subject and message; ``level`` is not compared."""
        if not isinstance(other, OrchestratorEvent):
            return False

        mine = (self.created, self.kind, self.subject, self.message)
        theirs = (other.created, other.kind, other.subject, other.message)
        return mine == theirs
1370
1371
def _mk_orch_methods(cls: Any) -> Any:
    """
    Class decorator: replace every public ``Orchestrator`` method on ``cls``
    with a stub that forwards the call to ``self._oremote()``.

    Names starting with ``_`` and ``is_orchestrator_module`` are left alone.
    """
    # The factory gives every stub its own `method_name`; building `inner`
    # directly in the loop would bind all stubs to the last name.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return inner

    forwarded = [
        meth for meth in Orchestrator.__dict__
        if not meth.startswith('_') and meth != 'is_orchestrator_module'
    ]
    for meth in forwarded:
        setattr(cls, meth, shim(meth))
    return cls
1385
1386
1387@_mk_orch_methods
1388class OrchestratorClientMixin(Orchestrator):
1389 """
1390 A module that inherents from `OrchestratorClientMixin` can directly call
1391 all :class:`Orchestrator` methods without manually calling remote.
1392
1393 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1394 calls :func:`OrchestratorClientMixin._oremote`
1395
1396 >>> class MyModule(OrchestratorClientMixin):
1397 ... def func(self):
1398 ... completion = self.add_host('somehost') # calls `_oremote()`
9f95a23c
TL
1399 ... self.log.debug(completion.result)
1400
1401 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1402 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1403 "real" implementation of the orchestrator.
1404
1405
1406 >>> import mgr_module
1911f103
TL
1407 >>> #doctest: +SKIP
1408 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
9f95a23c
TL
1409 ... def __init__(self, ...):
1410 ... self.orch_client = OrchestratorClientMixin()
1411 ... self.orch_client.set_mgr(self.mgr))
1412 """
1413
def set_mgr(self, mgr: MgrModule) -> None:
    """
    Usable in the Dashboard that uses a global ``mgr``

    Stores ``mgr`` as the module that ``_oremote()`` issues its remote
    calls through; without it, ``__get_mgr()`` falls back to ``self``.
    """

    self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
1420
def __get_mgr(self) -> Any:
    """Return the module stored by ``set_mgr()``, or ``self`` if none was set."""
    try:
        mgr = self.__mgr
    except AttributeError:
        # set_mgr() was never called: this object acts as the mgr module.
        mgr = self
    return mgr
1426
def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
    """
    Helper for invoking `remote` on whichever orchestrator is enabled

    :param meth: name of the orchestrator method to call
    :param args: positional arguments for the call
    :param kwargs: keyword arguments for the call
    :raises RuntimeError: If the remote method failed.
    :raises OrchestratorError: orchestrator failed to perform
    :raises ImportError: no `orchestrator` module or backend not found.
    :raises NotImplementedError: the call failed and the backend's feature
        set does not list ``meth`` as available.
    """
    mgr = self.__get_mgr()

    # Ask the mgr directly if it can select the backend itself; otherwise
    # go through the `orchestrator` module.
    try:
        backend = mgr._select_orchestrator()
    except AttributeError:
        backend = mgr.remote('orchestrator', '_select_orchestrator')

    if backend is None:
        raise NoOrchestrator()

    mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, backend, meth, args, kwargs))
    try:
        return mgr.remote(backend, meth, *args, **kwargs)
    except Exception as e:
        if meth == 'get_feature_set':
            raise  # self.get_feature_set() calls self._oremote()
        features = self.get_feature_set()
        if meth in features and features[meth]['available']:
            # The backend claims support, so surface the original failure.
            raise
        raise NotImplementedError(f'{backend} does not implement {meth}') from e