]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
b0ccf73570b7772f4d2957efea31d50e69c6886d
"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
import datetime
import enum
import errno
import logging
import os
import pickle

from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from functools import wraps, reduce, update_wrapper

from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
    Sequence, Dict, cast, Mapping

try:
    from typing import Protocol  # Protocol was added in Python 3.8
except ImportError:
    class Protocol:  # type: ignore
        pass

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime

from mgr_module import MgrModule, CLICommand, HandleCommandResult
# Module-level logger for this orchestrator interface module.
logger = logging.getLogger(__name__)

# Generic result type carried by OrchResult.
T = TypeVar('T')
# Any callable; used by decorators that must preserve the wrapped signature.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # NOTE: deliberately skips OrchestratorError in the MRO and
        # initialises the bare Exception with just the message.
        super(Exception, self).__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any escaping OrchestratorError with an
    event subject, then re-raises it.

    :param kind: event subject kind (see OrchestratorEvent.subject)
    :param subject: event subject value
    :param overwrite: replace an already-set subject on the exception
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap an orchestrator CLI handler: translate expected exceptions into
    ``HandleCommandResult`` errors and attach a ``CLICommand`` to the wrapper.

    :param prefix: CLI command prefix (e.g. ``orch host ls``)
    :param perm: permission string ('r' or 'rw')
    :param func: the handler being wrapped
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an ``OrchResult`` instead of raising.

    Any exception is captured into the result object, except under unit
    tests where it is re-raised to keep Tracebacks debuggable.
    """
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            if 'UNITTEST' in os.environ:
                raise  # This makes debugging of Tracebacks from unittests a bit easier
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    # Structural type for the inner decorator returned by _cli_command:
    # given a CLI prefix, yields a decorator over handler functions.
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Build a prefix->decorator factory bound to the given permission."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        return lambda func: handle_exception(prefix, perm, func)
    return inner_cli_command


# Ready-made decorator factories for read-only and read-write commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        # Collect every attribute that handle_exception() decorated
        # (those carry _prefix and _cli_command).
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            if cmd['prefix'] not in dispatch:
                # Fall back to the class's own handler for unknown prefixes.
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        # Store the exception both pickled (for crossing interpreter
        # boundaries) and as a plain string fallback.
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    Re-raises the stored exception if any; otherwise returns the result.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # unpickling may fail across interpreters; fall back to the string
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark a method so get_feature_set() skips it."""
    f._hide_in_features = True  # type: ignore
    return f
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    @_hide_in_features
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param host: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling it's systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'loki': self.apply_loki,
            'promtail': self.apply_promtail,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)

        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemons daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Loki daemon(s)"""
        raise NotImplementedError()

    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Promtail daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# A spec handled by `orch apply`: either a service or a host declaration.
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into a HostSpec or ServiceSpec, keyed on service_type."""
    if 'service_type' in spec and spec['service_type'] == 'host':
        return HostSpec.from_json(spec)
    else:
        return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    Raises KeyError for unknown daemon types.
    """
    # NOTE(review): several entries were reconstructed from the service
    # names referenced elsewhere in this file — verify against upstream.
    mapping = {
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'loki': 'loki',
        'promtail': 'promtail',
        'crash': 'crash',
        'crashcollector': 'crash',  # Specific Rook Daemon
        'container': 'container',
        'snmp-gateway': 'snmp-gateway',
    }
    return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types it deploys.

    Raises KeyError for unknown service types.
    """
    # NOTE(review): entries not visible in the extract were reconstructed
    # as the identity mapping — verify against upstream.
    mapping = {
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        'ingress': ['haproxy', 'keepalived'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'loki': ['loki'],
        'promtail': ['promtail'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'snmp-gateway': ['snmp-gateway'],
    }
    return mapping[stype]
# Flat list of every daemon type any known service may deploy.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self) -> None:
        self.in_progress = False  # Is an upgrade underway?
        self.target_image: Optional[str] = None
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.which: str = '<unknown>'  # for if user specified daemon types, services or hosts
        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
        self.message = ""  # Freeform description
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator that converts a TypeError raised by the wrapped classmethod
    into an OrchestratorValidationError naming the class.
    """
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return cast(FuncT, inner)
class DaemonDescriptionStatus(enum.IntEnum):
    # NOTE(review): numeric values for unknown/error/stopped/running were
    # reconstructed from the "-2 unknown, -1 error, 0 stopped, 1 running,
    # 2 starting" legend elsewhere in this file — verify against upstream.
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        if status is None:
            status = DaemonDescriptionStatus.unknown
        return {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }.get(status, '<unknown>')
819 class DaemonDescription(object):
821 For responding to queries about the status of a particular daemon,
822 stateful or stateless.
824 This is not about health or performance monitoring of daemons: it's
825 about letting the orchestrator tell Ceph whether and where a
826 daemon is scheduled in the cluster. When an orchestrator tells
827 Ceph "it's running on host123", that's not a promise that the process
828 is literally up this second, it's a description of where the orchestrator
829 has decided the daemon should run.
833 daemon_type
: Optional
[str] = None,
834 daemon_id
: Optional
[str] = None,
835 hostname
: Optional
[str] = None,
836 container_id
: Optional
[str] = None,
837 container_image_id
: Optional
[str] = None,
838 container_image_name
: Optional
[str] = None,
839 container_image_digests
: Optional
[List
[str]] = None,
840 version
: Optional
[str] = None,
841 status
: Optional
[DaemonDescriptionStatus
] = None,
842 status_desc
: Optional
[str] = None,
843 last_refresh
: Optional
[datetime
.datetime
] = None,
844 created
: Optional
[datetime
.datetime
] = None,
845 started
: Optional
[datetime
.datetime
] = None,
846 last_configured
: Optional
[datetime
.datetime
] = None,
847 osdspec_affinity
: Optional
[str] = None,
848 last_deployed
: Optional
[datetime
.datetime
] = None,
849 events
: Optional
[List
['OrchestratorEvent']] = None,
850 is_active
: bool = False,
851 memory_usage
: Optional
[int] = None,
852 memory_request
: Optional
[int] = None,
853 memory_limit
: Optional
[int] = None,
854 cpu_percentage
: Optional
[str] = None,
855 service_name
: Optional
[str] = None,
856 ports
: Optional
[List
[int]] = None,
857 ip
: Optional
[str] = None,
858 deployed_by
: Optional
[List
[str]] = None,
859 rank
: Optional
[int] = None,
860 rank_generation
: Optional
[int] = None,
861 extra_container_args
: Optional
[List
[str]] = None,
864 #: Host is at the same granularity as InventoryHost
865 self
.hostname
: Optional
[str] = hostname
867 # Not everyone runs in containers, but enough people do to
868 # justify having the container_id (runtime id) and container_image
870 self
.container_id
= container_id
# runtime id
871 self
.container_image_id
= container_image_id
# image id locally
872 self
.container_image_name
= container_image_name
# image friendly name
873 self
.container_image_digests
= container_image_digests
# reg hashes
875 #: The type of service (osd, mon, mgr, etc.)
876 self
.daemon_type
= daemon_type
878 #: The orchestrator will have picked some names for daemons,
879 #: typically either based on hostnames or on pod names.
880 #: This is the <foo> in mds.<foo>, the ID that will appear
881 #: in the FSMap/ServiceMap.
882 self
.daemon_id
: Optional
[str] = daemon_id
883 self
.daemon_name
= self
.name()
885 #: Some daemon types have a numeric rank assigned
886 self
.rank
: Optional
[int] = rank
887 self
.rank_generation
: Optional
[int] = rank_generation
889 self
._service
_name
: Optional
[str] = service_name
891 #: Service version that was deployed
892 self
.version
= version
894 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
895 self
._status
= status
897 #: Service status description when status == error.
898 self
.status_desc
= status_desc
900 #: datetime when this info was last refreshed
901 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
903 self
.created
: Optional
[datetime
.datetime
] = created
904 self
.started
: Optional
[datetime
.datetime
] = started
905 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
906 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
908 #: Affinity to a certain OSDSpec
909 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
911 self
.events
: List
[OrchestratorEvent
] = events
or []
913 self
.memory_usage
: Optional
[int] = memory_usage
914 self
.memory_request
: Optional
[int] = memory_request
915 self
.memory_limit
: Optional
[int] = memory_limit
917 self
.cpu_percentage
: Optional
[str] = cpu_percentage
919 self
.ports
: Optional
[List
[int]] = ports
920 self
.ip
: Optional
[str] = ip
922 self
.deployed_by
= deployed_by
924 self
.is_active
= is_active
926 self
.extra_container_args
= extra_container_args
929 def status(self
) -> Optional
[DaemonDescriptionStatus
]:
933 def status(self
, new
: DaemonDescriptionStatus
) -> None:
935 self
.status_desc
= DaemonDescriptionStatus
.to_str(new
)
937 def get_port_summary(self
) -> str:
940 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
942 def name(self
) -> str:
943 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
945 def matches_service(self
, service_name
: Optional
[str]) -> bool:
946 assert self
.daemon_id
is not None
947 assert self
.daemon_type
is not None
949 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
952 def service_id(self
) -> str:
953 assert self
.daemon_id
is not None
954 assert self
.daemon_type
is not None
956 if self
._service
_name
:
957 if '.' in self
._service
_name
:
958 return self
._service
_name
.split('.', 1)[1]
962 if self
.daemon_type
== 'osd':
963 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
964 return self
.osdspec_affinity
968 assert self
.daemon_id
is not None
969 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
970 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
972 if not self
.hostname
:
973 # TODO: can a DaemonDescription exist without a hostname?
976 # use the bare hostname, not the FQDN.
977 host
= self
.hostname
.split('.')[0]
979 if host
== self
.daemon_id
:
980 # daemon_id == "host"
981 return self
.daemon_id
983 elif host
in self
.daemon_id
:
984 # daemon_id == "service_id.host"
985 # daemon_id == "service_id.host.random"
986 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
987 if not pre
.endswith('.'):
988 # '.' sep missing at front of host
990 elif post
and not post
.startswith('.'):
991 # '.' sep missing at end of host
995 # daemon_id == "service_id.random"
996 if self
.daemon_type
== 'rgw':
997 v
= self
.daemon_id
.split('.')
999 return '.'.join(v
[0:2])
1001 if self
.daemon_type
== 'iscsi':
1002 v
= self
.daemon_id
.split('.')
1003 return '.'.join(v
[0:-1])
1005 # daemon_id == "service_id"
1006 return self
.daemon_id
1008 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
1011 return self
.daemon_id
1013 def service_name(self
) -> str:
1014 if self
._service
_name
:
1015 return self
._service
_name
1016 assert self
.daemon_type
is not None
1017 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
1018 return f
'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1019 return daemon_type_to_service(self
.daemon_type
)
1021 def __repr__(self
) -> str:
1022 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
1025 def __str__(self
) -> str:
1026 return f
"{self.name()} in status {self.status_desc} on {self.hostname}"
1028 def to_json(self
) -> dict:
1029 out
: Dict
[str, Any
] = OrderedDict()
1030 out
['daemon_type'] = self
.daemon_type
1031 out
['daemon_id'] = self
.daemon_id
1032 out
['service_name'] = self
._service
_name
1033 out
['daemon_name'] = self
.name()
1034 out
['hostname'] = self
.hostname
1035 out
['container_id'] = self
.container_id
1036 out
['container_image_id'] = self
.container_image_id
1037 out
['container_image_name'] = self
.container_image_name
1038 out
['container_image_digests'] = self
.container_image_digests
1039 out
['memory_usage'] = self
.memory_usage
1040 out
['memory_request'] = self
.memory_request
1041 out
['memory_limit'] = self
.memory_limit
1042 out
['cpu_percentage'] = self
.cpu_percentage
1043 out
['version'] = self
.version
1044 out
['status'] = self
.status
.value
if self
.status
is not None else None
1045 out
['status_desc'] = self
.status_desc
1046 if self
.daemon_type
== 'osd':
1047 out
['osdspec_affinity'] = self
.osdspec_affinity
1048 out
['is_active'] = self
.is_active
1049 out
['ports'] = self
.ports
1051 out
['rank'] = self
.rank
1052 out
['rank_generation'] = self
.rank_generation
1054 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1056 if getattr(self
, k
):
1057 out
[k
] = datetime_to_str(getattr(self
, k
))
1060 out
['events'] = [e
.to_json() for e
in self
.events
]
1062 empty
= [k
for k
, v
in out
.items() if v
is None]
1067 def to_dict(self
) -> dict:
1068 out
: Dict
[str, Any
] = OrderedDict()
1069 out
['daemon_type'] = self
.daemon_type
1070 out
['daemon_id'] = self
.daemon_id
1071 out
['daemon_name'] = self
.name()
1072 out
['hostname'] = self
.hostname
1073 out
['container_id'] = self
.container_id
1074 out
['container_image_id'] = self
.container_image_id
1075 out
['container_image_name'] = self
.container_image_name
1076 out
['container_image_digests'] = self
.container_image_digests
1077 out
['memory_usage'] = self
.memory_usage
1078 out
['memory_request'] = self
.memory_request
1079 out
['memory_limit'] = self
.memory_limit
1080 out
['cpu_percentage'] = self
.cpu_percentage
1081 out
['version'] = self
.version
1082 out
['status'] = self
.status
.value
if self
.status
is not None else None
1083 out
['status_desc'] = self
.status_desc
1084 if self
.daemon_type
== 'osd':
1085 out
['osdspec_affinity'] = self
.osdspec_affinity
1086 out
['is_active'] = self
.is_active
1087 out
['ports'] = self
.ports
1090 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1092 if getattr(self
, k
):
1093 out
[k
] = datetime_to_str(getattr(self
, k
))
1096 out
['events'] = [e
.to_dict() for e
in self
.events
]
1098 empty
= [k
for k
, v
in out
.items() if v
is None]
1105 def from_json(cls
, data
: dict) -> 'DaemonDescription':
1107 event_strs
= c
.pop('events', [])
1108 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1111 c
[k
] = str_to_datetime(c
[k
])
1112 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1113 status_int
= c
.pop('status', None)
1114 if 'daemon_name' in c
:
1115 del c
['daemon_name']
1116 if 'service_name' in c
and c
['service_name'].startswith('osd.'):
1117 # if the service_name is a osd.NNN (numeric osd id) then
1118 # ignore it -- it is not a valid service_name and
1119 # (presumably) came from an older version of cephadm.
1121 int(c
['service_name'][4:])
1122 del c
['service_name']
1125 status
= DaemonDescriptionStatus(status_int
) if status_int
is not None else None
1126 return cls(events
=events
, status
=status
, **c
)
1128 def __copy__(self
) -> 'DaemonDescription':
1129 # feel free to change this:
1130 return DaemonDescription
.from_json(self
.to_json())
1133 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'DaemonDescription') -> Any
:
1134 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Teach PyYAML how to dump DaemonDescription objects.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: List[int] = []) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and
        # container_image_name (friendly name) attributes.
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons the service is expected to have
        # (NOTE(review): reconstructed field -- confirm against upstream).
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        self.ports = ports

    def service_type(self) -> str:
        """Shortcut for the service type recorded in the spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Render ``<virtual-ip>:<port,port,...>`` or '' when neither is set."""
        if not self.ports and not self.virtual_ip:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def to_json(self) -> OrderedDict:
        """Serialize: the spec's JSON plus a ``status`` sub-dict and events."""
        payload = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for field in ['last_refresh', 'created']:
            if getattr(self, field):
                status[field] = datetime_to_str(getattr(self, field))
        # Drop None-valued status entries.
        status = {k: v for (k, v) in status.items() if v is not None}
        payload['status'] = status
        if self.events:
            payload['events'] = [ev.to_json() for ev in self.events]
        return payload

    def to_dict(self) -> OrderedDict:
        """Like ``to_json``, but nested events use ``to_dict``."""
        payload = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for field in ['last_refresh', 'created']:
            if getattr(self, field):
                status[field] = datetime_to_str(getattr(self, field))
        # Drop None-valued status entries.
        status = {k: v for (k, v) in status.items() if v is not None}
        payload['status'] = status
        if self.events:
            payload['events'] = [ev.to_dict() for ev in self.events]
        return payload

    @classmethod
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Rebuild a ServiceDescription from its ``to_json`` form."""
        payload = data.copy()
        status = payload.pop('status', {})
        event_strs = payload.pop('events', [])
        spec = ServiceSpec.from_json(payload)

        status_args = status.copy()
        for field in ['last_refresh', 'created']:
            if field in status_args:
                status_args[field] = str_to_datetime(status_args[field])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **status_args)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Represent a ServiceDescription as a plain mapping for PyYAML."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
# Teach PyYAML how to dump ServiceDescription objects.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize host name, address, devices and labels to a dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Rebuild an InventoryHost from a dict, validating its keys.

        :raises OrchestratorValidationError: on unknown/missing keys or
            malformed input.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Anything left over is a key we do not understand.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """Build hosts from ``(name, devices)`` pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Extract just the host names from a list of InventoryHosts."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # Equality by name and devices; labels/addr are intentionally ignored.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """

    # Matches the string form produced by to_json():
    #   "<created> <kind>:<subject> [<level>] \"<message>\""
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Combined ``kind:subject`` key, e.g. ``daemon:crash.host``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """Parse the one-line string form produced by ``to_json``.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :raises ValueError: if the string does not match ``regex_v1``.
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE: level is intentionally not part of the comparison.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message

    def __repr__(self) -> str:
        return f'OrchestratorEvent.from_json({self.to_json()!r})'
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: attach a remote-forwarding stub to *cls* for every
    public method of ``Orchestrator``, each delegating to ``_oremote``."""

    # shim() needs to be defined outside of the for loop below.
    # Otherwise the closure would always be bound to the last name.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for name, method in Orchestrator.__dict__.items():
        if not name.startswith('_') and name not in ['is_orchestrator_module']:
            # update_wrapper copies name/doc so the stub looks like the original.
            remote_call = update_wrapper(shim(name), method)
            setattr(cls, name, remote_call)
    return cls
1459 class OrchestratorClientMixin(Orchestrator
):
1461 A module that inherents from `OrchestratorClientMixin` can directly call
1462 all :class:`Orchestrator` methods without manually calling remote.
1464 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1465 calls :func:`OrchestratorClientMixin._oremote`
1467 >>> class MyModule(OrchestratorClientMixin):
1469 ... completion = self.add_host('somehost') # calls `_oremote()`
1470 ... self.log.debug(completion.result)
1472 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1473 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1474 "real" implementation of the orchestrator.
1477 >>> import mgr_module
1479 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1480 ... def __init__(self, ...):
1481 ... self.orch_client = OrchestratorClientMixin()
1482 ... self.orch_client.set_mgr(self.mgr))
1485 def set_mgr(self
, mgr
: MgrModule
) -> None:
1487 Useable in the Dashbord that uses a global ``mgr``
1490 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1492 def __get_mgr(self
) -> Any
:
1495 except AttributeError:
1498 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1500 Helper for invoking `remote` on whichever orchestrator is enabled
1502 :raises RuntimeError: If the remote method failed.
1503 :raises OrchestratorError: orchestrator failed to perform
1504 :raises ImportError: no `orchestrator` module or backend not found.
1506 mgr
= self
.__get
_mgr
()
1509 o
= mgr
._select
_orchestrator
()
1510 except AttributeError:
1511 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1514 raise NoOrchestrator()
1516 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1518 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1519 except Exception as e
:
1520 if meth
== 'get_feature_set':
1521 raise # self.get_feature_set() calls self._oremote()
1522 f_set
= self
.get_feature_set()
1523 if meth
not in f_set
or not f_set
[meth
]['available']:
1524 raise NotImplementedError(f
'{o} does not implement {meth}') from e