"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
import copy
import datetime
import enum
import errno
import logging
import pickle
import re

from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from functools import wraps, reduce, update_wrapper

from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
    Sequence, Dict, cast, Mapping

try:
    from typing import Protocol  # Protocol was added in Python 3.8
except ImportError:
    # NOTE(review): fallback stub so isinstance/subclass checks still work on
    # Python < 3.8; the real structural checking is lost in that case.
    class Protocol:  # type: ignore
        pass

import yaml

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime

from mgr_module import MgrModule, CLICommand, HandleCommandResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Generic result type used by handle_orch_error / OrchResult.
T = TypeVar('T')
# Any callable; used to type decorators so they preserve the wrapped signature.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        super(Exception, self).__init__(msg)
        # Negative errno returned to the CLI (defaults to -EINVAL).
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator in configured.
    """

    def __init__(self,
                 msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # -ENOENT: the orchestrator backend simply isn't there.
        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any :class:`OrchestratorError` raised inside
    the ``with`` block with an event subject, then re-raises it.

    :param kind: event subject kind (e.g. ``'daemon'``, ``'service'``)
    :param subject: event subject identifier
    :param overwrite: if True, tag the exception even if it already carries
        an ``event_subject`` attribute
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        # Re-raise: this helper only annotates, it never swallows.
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap an orchestrator CLI handler so that expected failures become a
    :class:`HandleCommandResult` instead of a traceback, and attach the
    :class:`CLICommand` metadata used by :class:`CLICommandMeta`.

    :param prefix: CLI command prefix (e.g. ``"orch host ls"``)
    :param perm: permission string, ``'r'`` or ``'rw'``
    :param func: the handler to wrap
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`: each CLI command needs a distinct
    # function object to hang its own _prefix/_cli_command attributes on.
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an :class:`OrchResult` instead of a raw value or exception.

    Exceptions are captured into the OrchResult so they survive the
    mgr ``remote()`` boundary.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            import os
            if 'UNITTEST' in os.environ:
                raise  # This makes debugging of Tracebacks from unittests a bit easier
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    """Structural type for the callable returned by :func:`_cli_command`."""

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a CLI-command decorator factory bound to a permission string.

    :param perm: ``'r'`` for read-only commands, ``'rw'`` for mutating ones
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        return lambda func: handle_exception(prefix, perm, func)
    return inner_cli_command


# Convenience decorators for read-only and read-write orchestrator commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect per-class command dispatch table from attributes that were
        # decorated by handle_exception (which attaches _prefix/_cli_command).
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Not a CLI command handler; skip.
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            if cmd['prefix'] not in dispatch:
                # Fall through to the class's own handler for unknown prefixes.
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        # Pickled exception bytes (pickle survives the sub-interpreter hop).
        self.serialized_exception: Optional[bytes] = None
        # Human-readable fallback in case unpickling fails on the other side.
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record ``e`` (or clear the stored exception when ``e`` is None)."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    Re-raises the exception stored in ``c`` (unpickled in *this* interpreter),
    otherwise returns the stored result.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # Unpickling failed (e.g. exception class unavailable here):
            # fall back to the stringified form.
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark ``f`` so Orchestrator.get_feature_set() skips it."""
    f._hide_in_features = True  # type: ignore
    return f
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A feature is "available" iff the subclass overrides the stub here.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a),
                                                             '_hide_in_features', False)
                    }
        return features

    @_hide_in_features
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param host: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling it's systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def get_inventory(self,
                      host_filter: Optional['InventoryFilter'] = None,
                      refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self,
                         service_type: Optional[str] = None,
                         service_name: Optional[str] = None,
                         refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec by dispatching to the matching ``apply_*`` method.
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec)
                                                 for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)

        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        Stop the ongoing removal of the given OSDs.
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemons daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# A spec applied via `orch apply` is either a service spec or a host spec.
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Deserialize a JSON dict into the matching spec type: a host spec when
    ``service_type == 'host'``, otherwise a service spec.
    """
    if 'service_type' in spec and spec['service_type'] == 'host':
        return HostSpec.from_json(spec)
    else:
        return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    :raises KeyError: for an unknown daemon type.
    """
    mapping = {
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'crash': 'crash',
        'crashcollector': 'crash',  # Specific Rook Daemon
        'container': 'container',
        'agent': 'agent',
        'snmp-gateway': 'snmp-gateway',
    }
    return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types it deploys (inverse of
    :func:`daemon_type_to_service`; ingress fans out to two daemon types).

    :raises KeyError: for an unknown service type.
    """
    mapping = {
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        'ingress': ['haproxy', 'keepalived'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'agent': ['agent'],
        'snmp-gateway': ['snmp-gateway'],
    }
    return mapping[stype]
# Flattened list of every daemon type for every known service type.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self) -> None:
        self.in_progress = False  # Is an upgrade underway?
        self.target_image: Optional[str] = None
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
        self.message = ""  # Freeform description
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator that converts a ``TypeError`` raised by ``method`` (typically a
    ``from_json``-style classmethod fed a malformed dict) into an
    :class:`OrchestratorValidationError` naming the class.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return cast(FuncT, inner)
class DaemonDescriptionStatus(enum.IntEnum):
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """Human-readable name for ``status``; ``None`` maps to 'unknown'."""
        if status is None:
            status = DaemonDescriptionStatus.unknown
        return {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }.get(status, '<unknown>')
803 class DaemonDescription(object):
805 For responding to queries about the status of a particular daemon,
806 stateful or stateless.
808 This is not about health or performance monitoring of daemons: it's
809 about letting the orchestrator tell Ceph whether and where a
810 daemon is scheduled in the cluster. When an orchestrator tells
811 Ceph "it's running on host123", that's not a promise that the process
812 is literally up this second, it's a description of where the orchestrator
813 has decided the daemon should run.
817 daemon_type
: Optional
[str] = None,
818 daemon_id
: Optional
[str] = None,
819 hostname
: Optional
[str] = None,
820 container_id
: Optional
[str] = None,
821 container_image_id
: Optional
[str] = None,
822 container_image_name
: Optional
[str] = None,
823 container_image_digests
: Optional
[List
[str]] = None,
824 version
: Optional
[str] = None,
825 status
: Optional
[DaemonDescriptionStatus
] = None,
826 status_desc
: Optional
[str] = None,
827 last_refresh
: Optional
[datetime
.datetime
] = None,
828 created
: Optional
[datetime
.datetime
] = None,
829 started
: Optional
[datetime
.datetime
] = None,
830 last_configured
: Optional
[datetime
.datetime
] = None,
831 osdspec_affinity
: Optional
[str] = None,
832 last_deployed
: Optional
[datetime
.datetime
] = None,
833 events
: Optional
[List
['OrchestratorEvent']] = None,
834 is_active
: bool = False,
835 memory_usage
: Optional
[int] = None,
836 memory_request
: Optional
[int] = None,
837 memory_limit
: Optional
[int] = None,
838 service_name
: Optional
[str] = None,
839 ports
: Optional
[List
[int]] = None,
840 ip
: Optional
[str] = None,
841 deployed_by
: Optional
[List
[str]] = None,
842 rank
: Optional
[int] = None,
843 rank_generation
: Optional
[int] = None,
844 extra_container_args
: Optional
[List
[str]] = None,
847 #: Host is at the same granularity as InventoryHost
848 self
.hostname
: Optional
[str] = hostname
850 # Not everyone runs in containers, but enough people do to
851 # justify having the container_id (runtime id) and container_image
853 self
.container_id
= container_id
# runtime id
854 self
.container_image_id
= container_image_id
# image id locally
855 self
.container_image_name
= container_image_name
# image friendly name
856 self
.container_image_digests
= container_image_digests
# reg hashes
858 #: The type of service (osd, mon, mgr, etc.)
859 self
.daemon_type
= daemon_type
861 #: The orchestrator will have picked some names for daemons,
862 #: typically either based on hostnames or on pod names.
863 #: This is the <foo> in mds.<foo>, the ID that will appear
864 #: in the FSMap/ServiceMap.
865 self
.daemon_id
: Optional
[str] = daemon_id
866 self
.daemon_name
= self
.name()
868 #: Some daemon types have a numeric rank assigned
869 self
.rank
: Optional
[int] = rank
870 self
.rank_generation
: Optional
[int] = rank_generation
872 self
._service
_name
: Optional
[str] = service_name
874 #: Service version that was deployed
875 self
.version
= version
877 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
878 self
._status
= status
880 #: Service status description when status == error.
881 self
.status_desc
= status_desc
883 #: datetime when this info was last refreshed
884 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
886 self
.created
: Optional
[datetime
.datetime
] = created
887 self
.started
: Optional
[datetime
.datetime
] = started
888 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
889 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
891 #: Affinity to a certain OSDSpec
892 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
894 self
.events
: List
[OrchestratorEvent
] = events
or []
896 self
.memory_usage
: Optional
[int] = memory_usage
897 self
.memory_request
: Optional
[int] = memory_request
898 self
.memory_limit
: Optional
[int] = memory_limit
900 self
.ports
: Optional
[List
[int]] = ports
901 self
.ip
: Optional
[str] = ip
903 self
.deployed_by
= deployed_by
905 self
.is_active
= is_active
907 self
.extra_container_args
= extra_container_args
910 def status(self
) -> Optional
[DaemonDescriptionStatus
]:
914 def status(self
, new
: DaemonDescriptionStatus
) -> None:
916 self
.status_desc
= DaemonDescriptionStatus
.to_str(new
)
918 def get_port_summary(self
) -> str:
921 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
923 def name(self
) -> str:
924 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
926 def matches_service(self
, service_name
: Optional
[str]) -> bool:
927 assert self
.daemon_id
is not None
928 assert self
.daemon_type
is not None
930 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
933 def service_id(self
) -> str:
934 assert self
.daemon_id
is not None
935 assert self
.daemon_type
is not None
937 if self
._service
_name
:
938 if '.' in self
._service
_name
:
939 return self
._service
_name
.split('.', 1)[1]
943 if self
.daemon_type
== 'osd':
944 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
945 return self
.osdspec_affinity
949 assert self
.daemon_id
is not None
950 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
951 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
953 if not self
.hostname
:
954 # TODO: can a DaemonDescription exist without a hostname?
957 # use the bare hostname, not the FQDN.
958 host
= self
.hostname
.split('.')[0]
960 if host
== self
.daemon_id
:
961 # daemon_id == "host"
962 return self
.daemon_id
964 elif host
in self
.daemon_id
:
965 # daemon_id == "service_id.host"
966 # daemon_id == "service_id.host.random"
967 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
968 if not pre
.endswith('.'):
969 # '.' sep missing at front of host
971 elif post
and not post
.startswith('.'):
972 # '.' sep missing at end of host
976 # daemon_id == "service_id.random"
977 if self
.daemon_type
== 'rgw':
978 v
= self
.daemon_id
.split('.')
980 return '.'.join(v
[0:2])
982 if self
.daemon_type
== 'iscsi':
983 v
= self
.daemon_id
.split('.')
984 return '.'.join(v
[0:-1])
986 # daemon_id == "service_id"
987 return self
.daemon_id
989 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
992 return self
.daemon_id
994 def service_name(self
) -> str:
995 if self
._service
_name
:
996 return self
._service
_name
997 assert self
.daemon_type
is not None
998 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
999 return f
'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1000 return daemon_type_to_service(self
.daemon_type
)
1002 def __repr__(self
) -> str:
1003 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
1006 def __str__(self
) -> str:
1007 return f
"{self.name()} in status {self.status_desc} on {self.hostname}"
1009 def to_json(self
) -> dict:
1010 out
: Dict
[str, Any
] = OrderedDict()
1011 out
['daemon_type'] = self
.daemon_type
1012 out
['daemon_id'] = self
.daemon_id
1013 out
['service_name'] = self
._service
_name
1014 out
['daemon_name'] = self
.name()
1015 out
['hostname'] = self
.hostname
1016 out
['container_id'] = self
.container_id
1017 out
['container_image_id'] = self
.container_image_id
1018 out
['container_image_name'] = self
.container_image_name
1019 out
['container_image_digests'] = self
.container_image_digests
1020 out
['memory_usage'] = self
.memory_usage
1021 out
['memory_request'] = self
.memory_request
1022 out
['memory_limit'] = self
.memory_limit
1023 out
['version'] = self
.version
1024 out
['status'] = self
.status
.value
if self
.status
is not None else None
1025 out
['status_desc'] = self
.status_desc
1026 if self
.daemon_type
== 'osd':
1027 out
['osdspec_affinity'] = self
.osdspec_affinity
1028 out
['is_active'] = self
.is_active
1029 out
['ports'] = self
.ports
1031 out
['rank'] = self
.rank
1032 out
['rank_generation'] = self
.rank_generation
1034 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1036 if getattr(self
, k
):
1037 out
[k
] = datetime_to_str(getattr(self
, k
))
1040 out
['events'] = [e
.to_json() for e
in self
.events
]
1042 empty
= [k
for k
, v
in out
.items() if v
is None]
1047 def to_dict(self
) -> dict:
1048 out
: Dict
[str, Any
] = OrderedDict()
1049 out
['daemon_type'] = self
.daemon_type
1050 out
['daemon_id'] = self
.daemon_id
1051 out
['daemon_name'] = self
.name()
1052 out
['hostname'] = self
.hostname
1053 out
['container_id'] = self
.container_id
1054 out
['container_image_id'] = self
.container_image_id
1055 out
['container_image_name'] = self
.container_image_name
1056 out
['container_image_digests'] = self
.container_image_digests
1057 out
['memory_usage'] = self
.memory_usage
1058 out
['memory_request'] = self
.memory_request
1059 out
['memory_limit'] = self
.memory_limit
1060 out
['version'] = self
.version
1061 out
['status'] = self
.status
.value
if self
.status
is not None else None
1062 out
['status_desc'] = self
.status_desc
1063 if self
.daemon_type
== 'osd':
1064 out
['osdspec_affinity'] = self
.osdspec_affinity
1065 out
['is_active'] = self
.is_active
1066 out
['ports'] = self
.ports
1069 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1071 if getattr(self
, k
):
1072 out
[k
] = datetime_to_str(getattr(self
, k
))
1075 out
['events'] = [e
.to_dict() for e
in self
.events
]
1077 empty
= [k
for k
, v
in out
.items() if v
is None]
1084 def from_json(cls
, data
: dict) -> 'DaemonDescription':
1086 event_strs
= c
.pop('events', [])
1087 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1090 c
[k
] = str_to_datetime(c
[k
])
1091 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1092 status_int
= c
.pop('status', None)
1093 if 'daemon_name' in c
:
1094 del c
['daemon_name']
1095 if 'service_name' in c
and c
['service_name'].startswith('osd.'):
1096 # if the service_name is a osd.NNN (numeric osd id) then
1097 # ignore it -- it is not a valid service_name and
1098 # (presumably) came from an older version of cephadm.
1100 int(c
['service_name'][4:])
1101 del c
['service_name']
1104 status
= DaemonDescriptionStatus(status_int
) if status_int
is not None else None
1105 return cls(events
=events
, status
=status
, **c
)
1107 def __copy__(self
) -> 'DaemonDescription':
1108 # feel free to change this:
1109 return DaemonDescription
.from_json(self
.to_json())
1112 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'DaemonDescription') -> Any
:
1113 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Register the custom representer so PyYAML can dump DaemonDescription objects.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1119 class ServiceDescription(object):
1121 For responding to queries about the status of a particular service,
1122 stateful or stateless.
1124 This is not about health or performance monitoring of services: it's
1125 about letting the orchestrator tell Ceph whether and where a
1126 service is scheduled in the cluster. When an orchestrator tells
1127 Ceph "it's running on host123", that's not a promise that the process
1128 is literally up this second, it's a description of where the orchestrator
1129 has decided the service should run.
1134 container_image_id
: Optional
[str] = None,
1135 container_image_name
: Optional
[str] = None,
1136 service_url
: Optional
[str] = None,
1137 last_refresh
: Optional
[datetime
.datetime
] = None,
1138 created
: Optional
[datetime
.datetime
] = None,
1139 deleted
: Optional
[datetime
.datetime
] = None,
1142 events
: Optional
[List
['OrchestratorEvent']] = None,
1143 virtual_ip
: Optional
[str] = None,
1144 ports
: List
[int] = []) -> None:
1145 # Not everyone runs in containers, but enough people do to
1146 # justify having the container_image_id (image hash) and container_image
1148 self
.container_image_id
= container_image_id
# image hash
1149 self
.container_image_name
= container_image_name
# image friendly name
1151 # If the service exposes REST-like API, this attribute should hold
1153 self
.service_url
= service_url
1158 # Number of daemons up
1159 self
.running
= running
1161 # datetime when this info was last refreshed
1162 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
1163 self
.created
: Optional
[datetime
.datetime
] = created
1164 self
.deleted
: Optional
[datetime
.datetime
] = deleted
1166 self
.spec
: ServiceSpec
= spec
1168 self
.events
: List
[OrchestratorEvent
] = events
or []
1170 self
.virtual_ip
= virtual_ip
1173 def service_type(self
) -> str:
1174 return self
.spec
.service_type
1176 def __repr__(self
) -> str:
1177 return f
"<ServiceDescription of {self.spec.one_line_str()}>"
1179 def get_port_summary(self
) -> str:
1182 return f
"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
1184 def to_json(self
) -> OrderedDict
:
1185 out
= self
.spec
.to_json()
1187 'container_image_id': self
.container_image_id
,
1188 'container_image_name': self
.container_image_name
,
1189 'service_url': self
.service_url
,
1191 'running': self
.running
,
1192 'last_refresh': self
.last_refresh
,
1193 'created': self
.created
,
1194 'virtual_ip': self
.virtual_ip
,
1195 'ports': self
.ports
if self
.ports
else None,
1197 for k
in ['last_refresh', 'created']:
1198 if getattr(self
, k
):
1199 status
[k
] = datetime_to_str(getattr(self
, k
))
1200 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1201 out
['status'] = status
1203 out
['events'] = [e
.to_json() for e
in self
.events
]
1206 def to_dict(self
) -> OrderedDict
:
1207 out
= self
.spec
.to_json()
1209 'container_image_id': self
.container_image_id
,
1210 'container_image_name': self
.container_image_name
,
1211 'service_url': self
.service_url
,
1213 'running': self
.running
,
1214 'last_refresh': self
.last_refresh
,
1215 'created': self
.created
,
1216 'virtual_ip': self
.virtual_ip
,
1217 'ports': self
.ports
if self
.ports
else None,
1219 for k
in ['last_refresh', 'created']:
1220 if getattr(self
, k
):
1221 status
[k
] = datetime_to_str(getattr(self
, k
))
1222 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1223 out
['status'] = status
1225 out
['events'] = [e
.to_dict() for e
in self
.events
]
1230 def from_json(cls
, data
: dict) -> 'ServiceDescription':
1232 status
= c
.pop('status', {})
1233 event_strs
= c
.pop('events', [])
1234 spec
= ServiceSpec
.from_json(c
)
1236 c_status
= status
.copy()
1237 for k
in ['last_refresh', 'created']:
1239 c_status
[k
] = str_to_datetime(c_status
[k
])
1240 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1241 return cls(spec
=spec
, events
=events
, **c_status
)
1244 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'ServiceDescription') -> Any
:
1245 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Register the custom representer so PyYAML can dump ServiceDescription objects.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1251 class InventoryFilter(object):
1253 When fetching inventory, use this filter to avoid unnecessarily
1254 scanning the whole estate.
1258 filter by host when presenting UI workflow for configuring
1259 a particular server.
1260 filter by label when not all of estate is Ceph servers,
1261 and we want to only learn about the Ceph servers.
1262 filter by label when we are interested particularly
1263 in e.g. OSD servers.
1266 def __init__(self
, labels
: Optional
[List
[str]] = None, hosts
: Optional
[List
[str]] = None) -> None:
1268 #: Optional: get info about hosts matching labels
1269 self
.labels
= labels
1271 #: Optional: get info about certain named hosts only
1275 class InventoryHost(object):
1277 When fetching inventory, all Devices are groups inside of an
1281 def __init__(self
, name
: str, devices
: Optional
[inventory
.Devices
] = None, labels
: Optional
[List
[str]] = None, addr
: Optional
[str] = None) -> None:
1283 devices
= inventory
.Devices([])
1286 assert isinstance(devices
, inventory
.Devices
)
1288 self
.name
= name
# unique within cluster. For example a hostname.
1289 self
.addr
= addr
or name
1290 self
.devices
= devices
1291 self
.labels
= labels
1293 def to_json(self
) -> dict:
1297 'devices': self
.devices
.to_json(),
1298 'labels': self
.labels
,
1302 def from_json(cls
, data
: dict) -> 'InventoryHost':
1304 _data
= copy
.deepcopy(data
)
1305 name
= _data
.pop('name')
1306 addr
= _data
.pop('addr', None) or name
1307 devices
= inventory
.Devices
.from_json(_data
.pop('devices'))
1308 labels
= _data
.pop('labels', list())
1310 error_msg
= 'Unknown key(s) in Inventory: {}'.format(','.join(_data
.keys()))
1311 raise OrchestratorValidationError(error_msg
)
1312 return cls(name
, devices
, labels
, addr
)
1313 except KeyError as e
:
1314 error_msg
= '{} is required for {}'.format(e
, cls
.__name
__)
1315 raise OrchestratorValidationError(error_msg
)
1316 except TypeError as e
:
1317 raise OrchestratorValidationError('Failed to read inventory: {}'.format(e
))
1320 def from_nested_items(cls
, hosts
: List
[dict]) -> List
['InventoryHost']:
1321 devs
= inventory
.Devices
.from_json
1322 return [cls(item
[0], devs(item
[1].data
)) for item
in hosts
]
1324 def __repr__(self
) -> str:
1325 return "<InventoryHost>({name})".format(name
=self
.name
)
1328 def get_host_names(hosts
: List
['InventoryHost']) -> List
[str]:
1329 return [host
.name
for host
in hosts
]
1331 def __eq__(self
, other
: Any
) -> bool:
1332 return self
.name
== other
.name
and self
.devices
== other
.devices
1335 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1337 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1340 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1342 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1343 See ``ceph osd metadata | jq '.[].device_ids'``
1348 class OrchestratorEvent
:
1350 Similar to K8s Events.
1352 Some form of "important" log message attached to something.
    # Parses the v1 one-line string form produced by to_json():
    #   '<created> <kind>:<subject> [<level>] "<message>"'
    # (?:.|\n)* lets the quoted message span multiple lines.
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
1358 def __init__(self
, created
: Union
[str, datetime
.datetime
], kind
: str,
1359 subject
: str, level
: str, message
: str) -> None:
1360 if isinstance(created
, str):
1361 created
= str_to_datetime(created
)
1362 self
.created
: datetime
.datetime
= created
1364 assert kind
in "service daemon".split()
1365 self
.kind
: str = kind
1367 # service name, or daemon name, or something
1368 self
.subject
: str = subject
1370 # Events are not meant for debugging. debugs should end in the log.
1371 assert level
in "INFO ERROR".split()
1374 self
.message
: str = message
    # Fixed attribute set: avoids a per-instance __dict__ for the (potentially
    # many) event objects kept in memory.
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')
1378 def kind_subject(self
) -> str:
1379 return f
'{self.kind}:{self.subject}'
1381 def to_json(self
) -> str:
1382 # Make a long list of events readable.
1383 created
= datetime_to_str(self
.created
)
1384 return f
'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
1386 def to_dict(self
) -> dict:
1387 # Convert events data to dict.
1389 'created': datetime_to_str(self
.created
),
1390 'subject': self
.kind_subject(),
1391 'level': self
.level
,
1392 'message': self
.message
1397 def from_json(cls
, data
: str) -> "OrchestratorEvent":
1399 >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
1400 '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
1405 match
= cls
.regex_v1
.match(data
)
1407 return cls(*match
.groups())
1408 raise ValueError(f
'Unable to match: "{data}"')
1410 def __eq__(self
, other
: Any
) -> bool:
1411 if not isinstance(other
, OrchestratorEvent
):
1414 return self
.created
== other
.created
and self
.kind
== other
.kind \
1415 and self
.subject
== other
.subject
and self
.message
== other
.message
1417 def __repr__(self
) -> str:
1418 return f
'OrchestratorEvent.from_json({self.to_json()!r})'
1421 def _mk_orch_methods(cls
: Any
) -> Any
:
1422 # Needs to be defined outside of for.
1423 # Otherwise meth is always bound to last key
1424 def shim(method_name
: str) -> Callable
:
1425 def inner(self
: Any
, *args
: Any
, **kwargs
: Any
) -> Any
:
1426 completion
= self
._oremote
(method_name
, args
, kwargs
)
1430 for name
, method
in Orchestrator
.__dict
__.items():
1431 if not name
.startswith('_') and name
not in ['is_orchestrator_module']:
1432 remote_call
= update_wrapper(shim(name
), method
)
1433 setattr(cls
, name
, remote_call
)
1438 class OrchestratorClientMixin(Orchestrator
):
1440 A module that inherits from `OrchestratorClientMixin` can directly call
1441 all :class:`Orchestrator` methods without manually calling remote.
1443 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1444 calls :func:`OrchestratorClientMixin._oremote`
1446 >>> class MyModule(OrchestratorClientMixin):
1448 ... completion = self.add_host('somehost') # calls `_oremote()`
1449 ... self.log.debug(completion.result)
1451 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1452 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1453 "real" implementation of the orchestrator.
1456 >>> import mgr_module
1458 ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
1459 ... def __init__(self, ...):
1460 ... self.orch_client = OrchestratorClientMixin()
1461 ... self.orch_client.set_mgr(self.mgr)
1464 def set_mgr(self
, mgr
: MgrModule
) -> None:
1466 Usable in the Dashboard that uses a global ``mgr``
1469 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1471 def __get_mgr(self
) -> Any
:
1474 except AttributeError:
1477 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1479 Helper for invoking `remote` on whichever orchestrator is enabled
1481 :raises RuntimeError: If the remote method failed.
1482 :raises OrchestratorError: orchestrator failed to perform
1483 :raises ImportError: no `orchestrator` module or backend not found.
1485 mgr
= self
.__get
_mgr
()
1488 o
= mgr
._select
_orchestrator
()
1489 except AttributeError:
1490 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1493 raise NoOrchestrator()
1495 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1497 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1498 except Exception as e
:
1499 if meth
== 'get_feature_set':
1500 raise # self.get_feature_set() calls self._oremote()
1501 f_set
= self
.get_feature_set()
1502 if meth
not in f_set
or not f_set
[meth
]['available']:
1503 raise NotImplementedError(f
'{o} does not implement {meth}') from e