]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph pacific 16.2.5
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
logger = logging.getLogger(__name__)

# Generic payload type carried by OrchResult.
T = TypeVar('T')
# Any callable; lets decorators preserve the wrapped function's signature.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Cooperative super() instead of `super(Exception, self)`: the old
        # form skipped Exception itself and dispatched to BaseException.
        # For this hierarchy both set `args = (msg,)`, but only super()
        # respects the MRO if a mixin is ever added.
        super().__init__(msg)
        # Negative errno returned to CLI callers (see handle_exception()).
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
66
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator in configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # ENOENT: there is no backend to talk to at all.
        super().__init__(msg, errno=-errno.ENOENT)
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Also produced from TypeError by handle_type_error() for malformed input.
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Attach an event subject to any OrchestratorError raised inside the
    ``with`` body, then re-raise it unchanged.
    """
    try:
        yield
    except OrchestratorError as exc:
        # NOTE(review): OrchestratorError.__init__ always sets event_subject,
        # so `hasattr` is always True here and the condition reduces to
        # "always overwrite" — confirm whether `not hasattr` was intended.
        may_set = overwrite or hasattr(exc, 'event_subject')
        if may_set:
            exc.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so expected failures become clean error results
    (no traceback), and register it as a CLICommand for ``prefix``/``perm``.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    def wrapper_copy(*l_args: Any, **l_kwargs: Any) -> Any:
        # Distinct function object per command (the original used a lambda
        # for this) so each gets its own _prefix/_cli_command attributes.
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
114
115
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            value = f(*args, **kwargs)
        except Exception as e:
            # Capture *any* failure so it can cross the mgr remote() boundary.
            return OrchResult(None, exception=e)
        return OrchResult(value)

    return cast(Callable[..., OrchResult[T]], wrapper)
130
131
class InnerCliCommandCallable(Protocol):
    """Structural type of the decorator factory returned by _cli_command()."""
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
135
136
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Return a decorator factory with the permission string bound to ``perm``."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
141
142
# Ready-made decorators: `@_cli_read_command(prefix)` for read-only commands,
# `@_cli_write_command(prefix)` for commands that mutate state.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
145
146
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every class attribute produced by handle_exception():
        # those carry the `_prefix` and `_cli_command` markers.
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the class's own handler.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Per-class command table instead of the global CLICommand.COMMANDS.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
171
172
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    # Keep instances small; these are shipped between mgr modules in bulk.
    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        # Pickled exception (if any), so it can be re-raised on the far side;
        # see raise_if_exception().
        self.serialized_exception: Optional[bytes] = None
        # Human-readable fallback used when unpickling fails on the far side.
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record and serialize ``e``; clear any stored exception if ``e`` is None."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except (pickle.PicklingError, TypeError, AttributeError):
            # pickle.dumps() reports many unpicklable payloads (e.g. exception
            # args holding locks, sockets or locally-defined classes) with
            # TypeError or AttributeError rather than PicklingError, so catch
            # those as well instead of letting them propagate.
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
213
214
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result
    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # The exception type isn't importable here; fall back to the string form.
        raise Exception(c.exception_str)
    raise e
227
228
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark ``f`` so Orchestrator.get_feature_set() omits it from the feature list."""
    setattr(f, '_hide_in_features', True)
    return f
232
233
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

        It's better to ask for forgiveness instead::

            >>> #doctest: +SKIP
            ... try:
            ...     OrchestratorClientMixin().get_hosts()
            ... except (OrchestratorError, NotImplementedError):
            ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the subclass overrides the base
        # implementation (its attribute differs from Orchestrator's own);
        # names marked with @_hide_in_features are excluded.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        # Interface stub: semantics are backend-defined; counterpart of resume().
        raise NotImplementedError()

    def resume(self) -> None:
        # Interface stub: counterpart of pause().
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: HostSpec describing the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Dispatches each spec to the matching ``apply_*`` method keyed by
        its ``service_type``; an unknown service_type raises KeyError.
        The first failed sub-result aborts the merge and is re-raised.
        """
        # NOTE(review): `no_overwrite` is not consulted in this base
        # implementation — confirm subclasses/callers handle it.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: list of results, one per removed daemon
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: status string
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        # NOTE(review): returns HandleCommandResult directly, unlike the
        # OrchResult-returning siblings — confirm this asymmetry is intended.
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        # Interface stub: report what an upgrade to image/version would do.
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        # Interface stub: begin an upgrade to the given image or version.
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        # Interface stub: pause an in-flight upgrade.
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        # Interface stub: resume a paused upgrade.
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        # Interface stub: abort an in-flight upgrade.
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
671
672
# A spec accepted by Orchestrator.apply(): either a service to (re)deploy
# or a host to add.
GenericSpec = Union[ServiceSpec, HostSpec]
674
675
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a spec dict: service_type 'host' -> HostSpec, else ServiceSpec."""
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
681
682
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    Most daemon types map to a service of the same name; the exceptions are
    listed explicitly below. Raises KeyError for an unknown daemon type.
    """
    same_named = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'cephadm-exporter',
    )
    mapping = {name: name for name in same_named}
    mapping.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return mapping[dtype]
706
707
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types it deploys.

    Every service deploys a single daemon type of the same name, except
    'ingress' which runs both haproxy and keepalived. Raises KeyError for
    an unknown service type.
    """
    single_daemon = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'cephadm-exporter',
    )
    mapping = {name: [name] for name in single_daemon}
    mapping['ingress'] = ['haproxy', 'keepalived']
    return mapping[stype]
729
730
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""
    def __init__(self) -> None:
        self.in_progress = False  # Is an upgrade underway?
        # Container image we are upgrading to, when in progress.
        self.target_image: Optional[str] = None
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
        self.message = ""  # Freeform description
739
740
def handle_type_error(method: FuncT) -> FuncT:
    """
    Translate a TypeError raised by ``method`` into an
    OrchestratorValidationError tagged with the receiver's class name.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError(f'{cls.__name__}: {e}')
    return cast(FuncT, inner)
750
751
class DaemonDescriptionStatus(enum.IntEnum):
    """Condensed daemon state; matches the -1/0/1 convention used in status fields."""
    error = -1
    stopped = 0
    running = 1
756
757
758 class DaemonDescription(object):
759 """
760 For responding to queries about the status of a particular daemon,
761 stateful or stateless.
762
763 This is not about health or performance monitoring of daemons: it's
764 about letting the orchestrator tell Ceph whether and where a
765 daemon is scheduled in the cluster. When an orchestrator tells
766 Ceph "it's running on host123", that's not a promise that the process
767 is literally up this second, it's a description of where the orchestrator
768 has decided the daemon should run.
769 """
770
    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 ) -> None:

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id  # runtime id
        self.container_image_id = container_image_id  # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id

        # Some daemon types have a numeric rank assigned
        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        # Explicit service name, if known; otherwise service_name() derives
        # one from daemon_type/daemon_id.
        self._service_name: Optional[str] = service_name

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == error.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        # Memory figures as reported by the backend; units are whatever the
        # backend reports — presumably bytes, TODO confirm.
        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        # Published ports / bind IP; see get_port_summary().
        self.ports: Optional[List[int]] = ports
        self.ip: Optional[str] = ip

        self.deployed_by = deployed_by

        self.is_active = is_active
859
860 def get_port_summary(self) -> str:
861 if not self.ports:
862 return ''
863 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
864
865 def name(self) -> str:
866 return '%s.%s' % (self.daemon_type, self.daemon_id)
867
868 def matches_service(self, service_name: Optional[str]) -> bool:
869 assert self.daemon_id is not None
870 assert self.daemon_type is not None
871 if service_name:
872 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
873 return False
874
    def service_id(self) -> str:
        """
        Derive the service_id part of this daemon's service name.

        Prefers the explicit _service_name when set; OSDs map to their
        osdspec affinity (or 'unmanaged'); otherwise the id is parsed out
        of daemon_id/hostname for service types that require one.
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            if '.' in self._service_name:
                return self._service_name.split('.', 1)[1]
            else:
                # service name without a '.' carries no service_id part
                return ''

        if self.daemon_type == 'osd':
            # 'None' (string) can appear when the affinity was serialized
            # from a None value — treated the same as unset.
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return 'unmanaged'

        def _match() -> str:
            # Recover service_id from daemon_id using the hostname as the
            # separator landmark.
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id
935
936 def service_name(self) -> str:
937 if self._service_name:
938 return self._service_name
939 assert self.daemon_type is not None
940 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
941 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
942 return daemon_type_to_service(self.daemon_type)
943
944 def __repr__(self) -> str:
945 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
946 id=self.daemon_id)
947
    def to_json(self) -> dict:
        """
        Serialize for transport/CLI output: datetimes become strings and
        keys whose value is None are dropped entirely.
        """
        out: Dict[str, Any] = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['service_name'] = self._service_name
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['container_image_digests'] = self.container_image_digests
        out['memory_usage'] = self.memory_usage
        out['memory_request'] = self.memory_request
        out['memory_limit'] = self.memory_limit
        out['version'] = self.version
        out['status'] = self.status.value if self.status is not None else None
        out['status_desc'] = self.status_desc
        # osdspec_affinity is only meaningful for OSDs.
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active
        out['ports'] = self.ports
        out['ip'] = self.ip
        out['rank'] = self.rank
        out['rank_generation'] = self.rank_generation

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        # Strip None values so the JSON stays compact.
        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out
984
985 def to_dict(self) -> dict:
986 out: Dict[str, Any] = OrderedDict()
987 out['daemon_type'] = self.daemon_type
988 out['daemon_id'] = self.daemon_id
989 out['hostname'] = self.hostname
990 out['container_id'] = self.container_id
991 out['container_image_id'] = self.container_image_id
992 out['container_image_name'] = self.container_image_name
993 out['container_image_digests'] = self.container_image_digests
994 out['memory_usage'] = self.memory_usage
995 out['memory_request'] = self.memory_request
996 out['memory_limit'] = self.memory_limit
997 out['version'] = self.version
998 out['status'] = self.status.value if self.status is not None else None
999 out['status_desc'] = self.status_desc
1000 if self.daemon_type == 'osd':
1001 out['osdspec_affinity'] = self.osdspec_affinity
1002 out['is_active'] = self.is_active
1003 out['ports'] = self.ports
1004 out['ip'] = self.ip
1005
1006 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1007 'last_configured']:
1008 if getattr(self, k):
1009 out[k] = datetime_to_str(getattr(self, k))
1010
1011 if self.events:
1012 out['events'] = [e.to_dict() for e in self.events]
1013
1014 empty = [k for k, v in out.items() if v is None]
1015 for e in empty:
1016 del out[e]
1017 return out
1018
1019 @classmethod
1020 @handle_type_error
1021 def from_json(cls, data: dict) -> 'DaemonDescription':
1022 c = data.copy()
1023 event_strs = c.pop('events', [])
1024 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1025 'last_configured']:
1026 if k in c:
1027 c[k] = str_to_datetime(c[k])
1028 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1029 status_int = c.pop('status', None)
1030 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1031 return cls(events=events, status=status, **c)
1032
1033 def __copy__(self) -> 'DaemonDescription':
1034 # feel free to change this:
1035 return DaemonDescription.from_json(self.to_json())
1036
1037 @staticmethod
1038 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
1039 return dumper.represent_dict(data.to_json().items())
1040
1041
# Register a YAML representer so that yaml.dump() renders DaemonDescription
# objects as the mapping produced by DaemonDescription.to_json().
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1043
1044
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 rados_config_location: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # The default used to be a shared mutable `[]` argument; use None as
        # the sentinel and create a fresh list per instance instead.
        self.ports = ports if ports is not None else []

    def service_type(self) -> str:
        """Shorthand for the spec's service type (e.g. 'rgw', 'nfs')."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Return '<ip>:<port1>,<port2>' for display, or '' when no ports are set."""
        if not self.ports:
            return ''
        ip_part = (self.virtual_ip or '?').split('/')[0]
        return f"{ip_part}:{','.join(map(str, self.ports))}"

    def _serialize(self, event_serializer: Callable[['OrchestratorEvent'], Any]) -> OrderedDict:
        """Shared implementation of to_json()/to_dict().

        The two only differ in how events are serialized; ``None``-valued
        status entries are stripped.
        """
        out = self.spec.to_json()
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': datetime_to_str(self.last_refresh) if self.last_refresh else None,
            'created': datetime_to_str(self.created) if self.created else None,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        out['status'] = {k: v for k, v in status.items() if v is not None}
        if self.events:
            out['events'] = [event_serializer(e) for e in self.events]
        return out

    def to_json(self) -> OrderedDict:
        """Spec fields plus a filtered 'status' sub-dict; events via to_json()."""
        return self._serialize(lambda e: e.to_json())

    def to_dict(self) -> OrderedDict:
        """Like to_json(), but events are serialized via their to_dict()."""
        return self._serialize(lambda e: e.to_dict())

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Inverse of to_json(): 'status' keys become constructor kwargs,
        timestamps are parsed back, events are rebuilt."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        # (annotation fixed: this representer handles ServiceDescription,
        # not DaemonDescription)
        return dumper.represent_dict(data.to_json().items())
1179
1180
# Register a YAML representer so that yaml.dump() renders ServiceDescription
# objects as the mapping produced by ServiceDescription.to_json().
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1182
1183
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring
      a particular server.
    * filter by label when not all of the estate is Ceph servers,
      and we only want to learn about the Ceph ones.
    * filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: restrict to hosts carrying these labels
        self.labels = labels
        #: Optional: restrict to these named hosts only
        self.hosts = hosts
1205
1206
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict (devices via Devices.to_json())."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Inverse of to_json().

        :raises OrchestratorValidationError: on missing or unknown keys,
            or when the input is not dict-shaped.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            # chain the cause so the original KeyError stays visible
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # Previously comparing against a non-InventoryHost raised
        # AttributeError; defer to the other operand instead.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1265
1266
class DeviceLightLoc(namedtuple('DeviceLightLoc', 'host dev path')):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
    See ``ceph osd metadata | jq '.[].device_ids'``
    """
    # no per-instance __dict__: instances stay plain tuples
    __slots__ = ()
1278
1279
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # Name of the service or daemon this event is attached to.
        self.subject: str = subject

        # Events are not meant for debugging; debug output belongs in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return ``'kind:subject'``, e.g. ``'daemon:crash.ubuntu'``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """One-line string form (parseable again by from_json); keeps long
        event lists readable."""
        stamp = datetime_to_str(self.created)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """Dict form of the event."""
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message,
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data:
        :return:
        """
        match = cls.regex_v1.match(data)
        if not match:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*match.groups())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False
        # NOTE(review): 'level' is not part of equality here — confirm
        # whether that is intentional.
        return (self.created == other.created and self.kind == other.kind
                and self.subject == other.subject and self.message == other.message)
1348
1349
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: install a remote-forwarding stub on ``cls`` for every
    public method of ``Orchestrator`` (except ``is_orchestrator_module``).

    Each stub forwards its name and arguments to ``self._oremote()``.
    """
    def forwarder(method_name: str) -> Callable:
        # The extra factory level is required so every stub captures its own
        # method_name; a closure directly over the loop variable would always
        # see the last key.
        def stub(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return stub

    for name in Orchestrator.__dict__:
        if name.startswith('_') or name == 'is_orchestrator_module':
            continue
        setattr(cls, name, forwarder(name))
    return cls
1363
1364
1365 @_mk_orch_methods
1366 class OrchestratorClientMixin(Orchestrator):
1367 """
    A module that inherits from `OrchestratorClientMixin` can directly call
1369 all :class:`Orchestrator` methods without manually calling remote.
1370
1371 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1372 calls :func:`OrchestratorClientMixin._oremote`
1373
1374 >>> class MyModule(OrchestratorClientMixin):
1375 ... def func(self):
1376 ... completion = self.add_host('somehost') # calls `_oremote()`
1377 ... self.log.debug(completion.result)
1378
1379 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1380 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1381 "real" implementation of the orchestrator.
1382
1383
1384 >>> import mgr_module
1385 >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
1387 ... def __init__(self, ...):
1388 ... self.orch_client = OrchestratorClientMixin()
1389 ... self.orch_client.set_mgr(self.mgr))
1390 """
1391
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Set the mgr module used to reach the orchestrator backend.

        Usable in the Dashboard, which uses a global ``mgr``.
        """

        # Name-mangled (double underscore) so we don't overwrite any other
        # `mgr` attribute of the subclass.
        self.__mgr = mgr
1398
    def __get_mgr(self) -> Any:
        # Return the mgr set via set_mgr(); fall back to `self` (i.e. this
        # object is itself expected to act as the MgrModule) if none was set.
        try:
            return self.__mgr
        except AttributeError:
            return self
1404
    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled.

        :param meth: name of the Orchestrator method to invoke
        :param args: positional arguments forwarded to the backend
        :param kwargs: keyword arguments forwarded to the backend
        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # Resolve the active backend: directly if `mgr` is the orchestrator
        # module itself, otherwise via a remote call to it.
        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Distinguish "backend does not implement this method" from a
            # genuine failure by consulting the backend's feature set.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise