"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
16 from collections
import namedtuple
, OrderedDict
17 from contextlib
import contextmanager
18 from functools
import wraps
, reduce, update_wrapper
20 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
21 Sequence
, Dict
, cast
, Mapping
24 from typing
import Protocol
# Protocol was added in Python 3.8
26 class Protocol
: # type: ignore
32 from ceph
.deployment
import inventory
33 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
34 IscsiServiceSpec
, IngressSpec
, SNMPGatewaySpec
, MDSSpec
, TunedProfileSpec
35 from ceph
.deployment
.drive_group
import DriveGroupSpec
36 from ceph
.deployment
.hostspec
import HostSpec
, SpecValidationError
37 from ceph
.utils
import datetime_to_str
, str_to_datetime
39 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
# Module-level logger for the orchestrator interface.
logger = logging.getLogger(__name__)

# Type variable bound to "any callable"; used by decorators below so that
# mypy preserves the wrapped function's signature.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # NOTE: deliberately skips any intermediate __init__ and calls
        # Exception.__init__ directly, so only `msg` lands in e.args.
        super(Exception, self).__init__(msg)
        # Negative errno convention, as returned to the ceph CLI.
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # Report ENOENT: there is no backend to talk to.
        super().__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any ``OrchestratorError`` escaping the body
    with an event subject ``(kind, subject)``, then re-raises it.

    :param kind: event subject kind (see ``OrchestratorEvent.subject``)
    :param subject: event subject identifier
    :param overwrite: when True, replace a subject that is already set
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so that expected failures become a
    ``HandleCommandResult`` instead of a mgr traceback, and attach the
    ``CLICommand`` metadata (picked up later by ``CLICommandMeta``).

    :param prefix: the CLI command prefix (e.g. ``orch host ls``)
    :param perm: permission string, ``'r'`` or ``'rw'``
    :param func: the handler to wrap
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`: a distinct function object is needed
    # so the CLICommand attributes below don't leak onto the shared wrapper.
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an ``OrchResult``: the wrapped call's return value is packed into an
    ``OrchResult``; any exception is captured in one instead of escaping.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            import os
            if 'UNITTEST' in os.environ:
                raise  # This makes debugging of Tracebacks from unittests a bit easier
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    # Structural type for the callable returned by _cli_command:
    # given a command prefix, yields a decorator for CLI handlers.
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a two-stage decorator factory: first bind the permission string,
    then the command prefix, finally wrap the handler via handle_exception.

    :param perm: permission string, ``'r'`` or ``'rw'``
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        return lambda func: handle_exception(prefix, perm, func)
    return inner_cli_command
# Ready-made decorator factories for read-only and read-write CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every attribute that handle_exception decorated
        # (identified by its _prefix / _cli_command markers).
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Not a decorated CLI handler; skip it.
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the class's own handler.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Publish per-class command metadata and the dispatching handler.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        # Exception is carried pickled so it survives crossing
        # sub-interpreter boundaries; see raise_if_exception().
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                # Rebuild with the same args but the base Exception type,
                # which is always picklable.
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Raise the exception stored in *c*, if any; otherwise return its result.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # Unpickling can fail when the exception class isn't importable
            # here; fall back to the stored string representation.
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark *f* so Orchestrator.get_feature_set() omits it from its report."""
    f._hide_in_features = True  # type: ignore
    return f
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the subclass overrides the
        # Orchestrator default; private names and @_hide_in_features-marked
        # attributes are excluded from the report.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    @_hide_in_features
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        # Pause the orchestrator's background activity.
        raise NotImplementedError()

    def resume(self) -> None:
        # Resume background activity previously paused with pause().
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param host: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling it's systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def rescan_host(self, hostname: str) -> OrchResult:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan

        :param hostname: (str) host name
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec
        """
        # Dispatch table: service_type -> method that applies that spec kind.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'loki': self.apply_loki,
            'promtail': self.apply_promtail,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # Accumulate per-spec results, surfacing the first failure.
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: list of results
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)

        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        Stop the removal process for the given OSD ids.
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemons daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Loki daemon(s)"""
        raise NotImplementedError()

    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Promtail daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]:
        """Add or update an existing tuned profile"""
        raise NotImplementedError()

    def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
        """Remove a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
        """See current tuned profiles"""
        raise NotImplementedError()

    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
        """Change/Add a specific setting for a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
        """Remove a specific setting for a tuned profile"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# A spec the orchestrator can apply: either a service or a host description.
GenericSpec = Union[ServiceSpec, HostSpec]
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into a ``HostSpec`` or a ``ServiceSpec``.

    A dict whose ``service_type`` is ``'host'`` describes a host; any
    other value (or a missing key) is treated as a service spec.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    :raises KeyError: if *dtype* is not a known daemon type.
    """
    mapping = {
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        # Both ingress daemon flavours roll up to the 'ingress' service.
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'loki': 'loki',
        'promtail': 'promtail',
        'crash': 'crash',
        'crashcollector': 'crash',  # Specific Rook Daemon
        'container': 'container',
        'agent': 'agent',
        'snmp-gateway': 'snmp-gateway',
    }
    return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types it deploys
    (inverse of daemon_type_to_service).

    :raises KeyError: if *stype* is not a known service type.
    """
    mapping = {
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        # 'ingress' is the one service made of two daemon types.
        'ingress': ['haproxy', 'keepalived'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'loki': ['loki'],
        'promtail': ['promtail'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'agent': ['agent'],
        'snmp-gateway': ['snmp-gateway'],
    }
    return mapping[stype]
# Flat list of every daemon type that any known service type can deploy.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        # Container image being upgraded to, if any.
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # for if user specified daemon types, services or hosts
        self.which: str = '<unknown>'
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
        # Is the upgrade paused?
        self.is_paused: bool = False
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator (for classmethods such as from_json constructors) that
    converts a TypeError into an OrchestratorValidationError, prefixed
    with the class name for context.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return cast(FuncT, inner)
class DaemonDescriptionStatus(enum.IntEnum):
    """Lifecycle state of a single daemon, as seen by the orchestrator."""
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """Render *status* as a human-readable word; None maps to 'unknown'."""
        if status is None:
            status = DaemonDescriptionStatus.unknown
        names = {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }
        return names.get(status, '<unknown>')
851 class DaemonDescription(object):
853 For responding to queries about the status of a particular daemon,
854 stateful or stateless.
856 This is not about health or performance monitoring of daemons: it's
857 about letting the orchestrator tell Ceph whether and where a
858 daemon is scheduled in the cluster. When an orchestrator tells
859 Ceph "it's running on host123", that's not a promise that the process
860 is literally up this second, it's a description of where the orchestrator
861 has decided the daemon should run.
865 daemon_type
: Optional
[str] = None,
866 daemon_id
: Optional
[str] = None,
867 hostname
: Optional
[str] = None,
868 container_id
: Optional
[str] = None,
869 container_image_id
: Optional
[str] = None,
870 container_image_name
: Optional
[str] = None,
871 container_image_digests
: Optional
[List
[str]] = None,
872 version
: Optional
[str] = None,
873 status
: Optional
[DaemonDescriptionStatus
] = None,
874 status_desc
: Optional
[str] = None,
875 last_refresh
: Optional
[datetime
.datetime
] = None,
876 created
: Optional
[datetime
.datetime
] = None,
877 started
: Optional
[datetime
.datetime
] = None,
878 last_configured
: Optional
[datetime
.datetime
] = None,
879 osdspec_affinity
: Optional
[str] = None,
880 last_deployed
: Optional
[datetime
.datetime
] = None,
881 events
: Optional
[List
['OrchestratorEvent']] = None,
882 is_active
: bool = False,
883 memory_usage
: Optional
[int] = None,
884 memory_request
: Optional
[int] = None,
885 memory_limit
: Optional
[int] = None,
886 cpu_percentage
: Optional
[str] = None,
887 service_name
: Optional
[str] = None,
888 ports
: Optional
[List
[int]] = None,
889 ip
: Optional
[str] = None,
890 deployed_by
: Optional
[List
[str]] = None,
891 rank
: Optional
[int] = None,
892 rank_generation
: Optional
[int] = None,
893 extra_container_args
: Optional
[List
[str]] = None,
896 #: Host is at the same granularity as InventoryHost
897 self
.hostname
: Optional
[str] = hostname
899 # Not everyone runs in containers, but enough people do to
900 # justify having the container_id (runtime id) and container_image
902 self
.container_id
= container_id
# runtime id
903 self
.container_image_id
= container_image_id
# image id locally
904 self
.container_image_name
= container_image_name
# image friendly name
905 self
.container_image_digests
= container_image_digests
# reg hashes
907 #: The type of service (osd, mon, mgr, etc.)
908 self
.daemon_type
= daemon_type
910 #: The orchestrator will have picked some names for daemons,
911 #: typically either based on hostnames or on pod names.
912 #: This is the <foo> in mds.<foo>, the ID that will appear
913 #: in the FSMap/ServiceMap.
914 self
.daemon_id
: Optional
[str] = daemon_id
915 self
.daemon_name
= self
.name()
917 #: Some daemon types have a numeric rank assigned
918 self
.rank
: Optional
[int] = rank
919 self
.rank_generation
: Optional
[int] = rank_generation
921 self
._service
_name
: Optional
[str] = service_name
923 #: Service version that was deployed
924 self
.version
= version
926 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
927 self
._status
= status
929 #: Service status description when status == error.
930 self
.status_desc
= status_desc
932 #: datetime when this info was last refreshed
933 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
935 self
.created
: Optional
[datetime
.datetime
] = created
936 self
.started
: Optional
[datetime
.datetime
] = started
937 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
938 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
940 #: Affinity to a certain OSDSpec
941 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
943 self
.events
: List
[OrchestratorEvent
] = events
or []
945 self
.memory_usage
: Optional
[int] = memory_usage
946 self
.memory_request
: Optional
[int] = memory_request
947 self
.memory_limit
: Optional
[int] = memory_limit
949 self
.cpu_percentage
: Optional
[str] = cpu_percentage
951 self
.ports
: Optional
[List
[int]] = ports
952 self
.ip
: Optional
[str] = ip
954 self
.deployed_by
= deployed_by
956 self
.is_active
= is_active
958 self
.extra_container_args
= extra_container_args
961 def status(self
) -> Optional
[DaemonDescriptionStatus
]:
965 def status(self
, new
: DaemonDescriptionStatus
) -> None:
967 self
.status_desc
= DaemonDescriptionStatus
.to_str(new
)
969 def get_port_summary(self
) -> str:
972 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
974 def name(self
) -> str:
975 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
977 def matches_service(self
, service_name
: Optional
[str]) -> bool:
978 assert self
.daemon_id
is not None
979 assert self
.daemon_type
is not None
981 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
984 def service_id(self
) -> str:
985 assert self
.daemon_id
is not None
986 assert self
.daemon_type
is not None
988 if self
._service
_name
:
989 if '.' in self
._service
_name
:
990 return self
._service
_name
.split('.', 1)[1]
994 if self
.daemon_type
== 'osd':
995 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
996 return self
.osdspec_affinity
1000 assert self
.daemon_id
is not None
1001 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
1002 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
1004 if not self
.hostname
:
1005 # TODO: can a DaemonDescription exist without a hostname?
1008 # use the bare hostname, not the FQDN.
1009 host
= self
.hostname
.split('.')[0]
1011 if host
== self
.daemon_id
:
1012 # daemon_id == "host"
1013 return self
.daemon_id
1015 elif host
in self
.daemon_id
:
1016 # daemon_id == "service_id.host"
1017 # daemon_id == "service_id.host.random"
1018 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
1019 if not pre
.endswith('.'):
1020 # '.' sep missing at front of host
1022 elif post
and not post
.startswith('.'):
1023 # '.' sep missing at end of host
1027 # daemon_id == "service_id.random"
1028 if self
.daemon_type
== 'rgw':
1029 v
= self
.daemon_id
.split('.')
1030 if len(v
) in [3, 4]:
1031 return '.'.join(v
[0:2])
1033 if self
.daemon_type
== 'iscsi':
1034 v
= self
.daemon_id
.split('.')
1035 return '.'.join(v
[0:-1])
1037 # daemon_id == "service_id"
1038 return self
.daemon_id
1040 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
1043 return self
.daemon_id
def service_name(self) -> str:
    """Return the name of the service this daemon belongs to.

    Prefers the explicitly recorded service name when present;
    otherwise derives it from the daemon type (appending the service
    id for service types that require one).
    """
    if self._service_name:
        return self._service_name
    assert self.daemon_type is not None
    service = daemon_type_to_service(self.daemon_type)
    if service in ServiceSpec.REQUIRES_SERVICE_ID:
        return f'{service}.{self.service_id()}'
    return service
1053 def __repr__(self
) -> str:
1054 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
def __str__(self) -> str:
    """Human-readable one-line summary of this daemon."""
    summary = '{} in status {} on {}'.format(
        self.name(), self.status_desc, self.hostname)
    return summary
1060 def to_json(self
) -> dict:
1061 out
: Dict
[str, Any
] = OrderedDict()
1062 out
['daemon_type'] = self
.daemon_type
1063 out
['daemon_id'] = self
.daemon_id
1064 out
['service_name'] = self
._service
_name
1065 out
['daemon_name'] = self
.name()
1066 out
['hostname'] = self
.hostname
1067 out
['container_id'] = self
.container_id
1068 out
['container_image_id'] = self
.container_image_id
1069 out
['container_image_name'] = self
.container_image_name
1070 out
['container_image_digests'] = self
.container_image_digests
1071 out
['memory_usage'] = self
.memory_usage
1072 out
['memory_request'] = self
.memory_request
1073 out
['memory_limit'] = self
.memory_limit
1074 out
['cpu_percentage'] = self
.cpu_percentage
1075 out
['version'] = self
.version
1076 out
['status'] = self
.status
.value
if self
.status
is not None else None
1077 out
['status_desc'] = self
.status_desc
1078 if self
.daemon_type
== 'osd':
1079 out
['osdspec_affinity'] = self
.osdspec_affinity
1080 out
['is_active'] = self
.is_active
1081 out
['ports'] = self
.ports
1083 out
['rank'] = self
.rank
1084 out
['rank_generation'] = self
.rank_generation
1086 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1088 if getattr(self
, k
):
1089 out
[k
] = datetime_to_str(getattr(self
, k
))
1092 out
['events'] = [e
.to_json() for e
in self
.events
]
1094 empty
= [k
for k
, v
in out
.items() if v
is None]
1099 def to_dict(self
) -> dict:
1100 out
: Dict
[str, Any
] = OrderedDict()
1101 out
['daemon_type'] = self
.daemon_type
1102 out
['daemon_id'] = self
.daemon_id
1103 out
['daemon_name'] = self
.name()
1104 out
['hostname'] = self
.hostname
1105 out
['container_id'] = self
.container_id
1106 out
['container_image_id'] = self
.container_image_id
1107 out
['container_image_name'] = self
.container_image_name
1108 out
['container_image_digests'] = self
.container_image_digests
1109 out
['memory_usage'] = self
.memory_usage
1110 out
['memory_request'] = self
.memory_request
1111 out
['memory_limit'] = self
.memory_limit
1112 out
['cpu_percentage'] = self
.cpu_percentage
1113 out
['version'] = self
.version
1114 out
['status'] = self
.status
.value
if self
.status
is not None else None
1115 out
['status_desc'] = self
.status_desc
1116 if self
.daemon_type
== 'osd':
1117 out
['osdspec_affinity'] = self
.osdspec_affinity
1118 out
['is_active'] = self
.is_active
1119 out
['ports'] = self
.ports
1122 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1124 if getattr(self
, k
):
1125 out
[k
] = datetime_to_str(getattr(self
, k
))
1128 out
['events'] = [e
.to_dict() for e
in self
.events
]
1130 empty
= [k
for k
, v
in out
.items() if v
is None]
1137 def from_json(cls
, data
: dict) -> 'DaemonDescription':
1139 event_strs
= c
.pop('events', [])
1140 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1143 c
[k
] = str_to_datetime(c
[k
])
1144 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1145 status_int
= c
.pop('status', None)
1146 if 'daemon_name' in c
:
1147 del c
['daemon_name']
1148 if 'service_name' in c
and c
['service_name'].startswith('osd.'):
1149 # if the service_name is a osd.NNN (numeric osd id) then
1150 # ignore it -- it is not a valid service_name and
1151 # (presumably) came from an older version of cephadm.
1153 int(c
['service_name'][4:])
1154 del c
['service_name']
1157 status
= DaemonDescriptionStatus(status_int
) if status_int
is not None else None
1158 return cls(events
=events
, status
=status
, **c
)
def __copy__(self) -> 'DaemonDescription':
    # Copy via a JSON round-trip; feel free to change this.
    serialized = self.to_json()
    return DaemonDescription.from_json(serialized)
1165 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'DaemonDescription') -> Any
:
1166 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Register a representer so yaml.dump() renders DaemonDescription
# objects via DaemonDescription.yaml_representer (a mapping built
# from to_json()).
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1172 class ServiceDescription(object):
1174 For responding to queries about the status of a particular service,
1175 stateful or stateless.
1177 This is not about health or performance monitoring of services: it's
1178 about letting the orchestrator tell Ceph whether and where a
1179 service is scheduled in the cluster. When an orchestrator tells
1180 Ceph "it's running on host123", that's not a promise that the process
1181 is literally up this second, it's a description of where the orchestrator
1182 has decided the service should run.
1187 container_image_id
: Optional
[str] = None,
1188 container_image_name
: Optional
[str] = None,
1189 service_url
: Optional
[str] = None,
1190 last_refresh
: Optional
[datetime
.datetime
] = None,
1191 created
: Optional
[datetime
.datetime
] = None,
1192 deleted
: Optional
[datetime
.datetime
] = None,
1195 events
: Optional
[List
['OrchestratorEvent']] = None,
1196 virtual_ip
: Optional
[str] = None,
1197 ports
: List
[int] = []) -> None:
1198 # Not everyone runs in containers, but enough people do to
1199 # justify having the container_image_id (image hash) and container_image
1201 self
.container_image_id
= container_image_id
# image hash
1202 self
.container_image_name
= container_image_name
# image friendly name
1204 # If the service exposes REST-like API, this attribute should hold
1206 self
.service_url
= service_url
1211 # Number of daemons up
1212 self
.running
= running
1214 # datetime when this info was last refreshed
1215 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
1216 self
.created
: Optional
[datetime
.datetime
] = created
1217 self
.deleted
: Optional
[datetime
.datetime
] = deleted
1219 self
.spec
: ServiceSpec
= spec
1221 self
.events
: List
[OrchestratorEvent
] = events
or []
1223 self
.virtual_ip
= virtual_ip
1226 def service_type(self
) -> str:
1227 return self
.spec
.service_type
def __repr__(self) -> str:
    """Debug representation naming the underlying service spec."""
    return '<ServiceDescription of {}>'.format(self.spec.one_line_str())
1232 def get_port_summary(self
) -> str:
1235 return f
"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
1237 def to_json(self
) -> OrderedDict
:
1238 out
= self
.spec
.to_json()
1240 'container_image_id': self
.container_image_id
,
1241 'container_image_name': self
.container_image_name
,
1242 'service_url': self
.service_url
,
1244 'running': self
.running
,
1245 'last_refresh': self
.last_refresh
,
1246 'created': self
.created
,
1247 'virtual_ip': self
.virtual_ip
,
1248 'ports': self
.ports
if self
.ports
else None,
1250 for k
in ['last_refresh', 'created']:
1251 if getattr(self
, k
):
1252 status
[k
] = datetime_to_str(getattr(self
, k
))
1253 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1254 out
['status'] = status
1256 out
['events'] = [e
.to_json() for e
in self
.events
]
1259 def to_dict(self
) -> OrderedDict
:
1260 out
= self
.spec
.to_json()
1262 'container_image_id': self
.container_image_id
,
1263 'container_image_name': self
.container_image_name
,
1264 'service_url': self
.service_url
,
1266 'running': self
.running
,
1267 'last_refresh': self
.last_refresh
,
1268 'created': self
.created
,
1269 'virtual_ip': self
.virtual_ip
,
1270 'ports': self
.ports
if self
.ports
else None,
1272 for k
in ['last_refresh', 'created']:
1273 if getattr(self
, k
):
1274 status
[k
] = datetime_to_str(getattr(self
, k
))
1275 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1276 out
['status'] = status
1278 out
['events'] = [e
.to_dict() for e
in self
.events
]
1283 def from_json(cls
, data
: dict) -> 'ServiceDescription':
1285 status
= c
.pop('status', {})
1286 event_strs
= c
.pop('events', [])
1287 spec
= ServiceSpec
.from_json(c
)
1289 c_status
= status
.copy()
1290 for k
in ['last_refresh', 'created']:
1292 c_status
[k
] = str_to_datetime(c_status
[k
])
1293 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1294 return cls(spec
=spec
, events
=events
, **c_status
)
1297 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'ServiceDescription') -> Any
:
1298 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Register a representer so yaml.dump() renders ServiceDescription
# objects via ServiceDescription.yaml_representer (a mapping built
# from to_json()).
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1304 class InventoryFilter(object):
1306 When fetching inventory, use this filter to avoid unnecessarily
1307 scanning the whole estate.
1311 filter by host when presentig UI workflow for configuring
1312 a particular server.
1313 filter by label when not all of estate is Ceph servers,
1314 and we want to only learn about the Ceph servers.
1315 filter by label when we are interested particularly
1316 in e.g. OSD servers.
1319 def __init__(self
, labels
: Optional
[List
[str]] = None, hosts
: Optional
[List
[str]] = None) -> None:
1321 #: Optional: get info about hosts matching labels
1322 self
.labels
= labels
1324 #: Optional: get info about certain named hosts only
1328 class InventoryHost(object):
1330 When fetching inventory, all Devices are groups inside of an
1334 def __init__(self
, name
: str, devices
: Optional
[inventory
.Devices
] = None, labels
: Optional
[List
[str]] = None, addr
: Optional
[str] = None) -> None:
1336 devices
= inventory
.Devices([])
1339 assert isinstance(devices
, inventory
.Devices
)
1341 self
.name
= name
# unique within cluster. For example a hostname.
1342 self
.addr
= addr
or name
1343 self
.devices
= devices
1344 self
.labels
= labels
1346 def to_json(self
) -> dict:
1350 'devices': self
.devices
.to_json(),
1351 'labels': self
.labels
,
1355 def from_json(cls
, data
: dict) -> 'InventoryHost':
1357 _data
= copy
.deepcopy(data
)
1358 name
= _data
.pop('name')
1359 addr
= _data
.pop('addr', None) or name
1360 devices
= inventory
.Devices
.from_json(_data
.pop('devices'))
1361 labels
= _data
.pop('labels', list())
1363 error_msg
= 'Unknown key(s) in Inventory: {}'.format(','.join(_data
.keys()))
1364 raise OrchestratorValidationError(error_msg
)
1365 return cls(name
, devices
, labels
, addr
)
1366 except KeyError as e
:
1367 error_msg
= '{} is required for {}'.format(e
, cls
.__name
__)
1368 raise OrchestratorValidationError(error_msg
)
1369 except TypeError as e
:
1370 raise OrchestratorValidationError('Failed to read inventory: {}'.format(e
))
1373 def from_nested_items(cls
, hosts
: List
[dict]) -> List
['InventoryHost']:
1374 devs
= inventory
.Devices
.from_json
1375 return [cls(item
[0], devs(item
[1].data
)) for item
in hosts
]
def __repr__(self) -> str:
    """Debug representation showing only the host name."""
    return f'<InventoryHost>({self.name})'
1381 def get_host_names(hosts
: List
['InventoryHost']) -> List
[str]:
1382 return [host
.name
for host
in hosts
]
def __eq__(self, other: Any) -> bool:
    """Hosts are equal when both their name and device list match.

    Comparing against a non-InventoryHost (including None) returns
    False instead of raising AttributeError, matching the isinstance
    guard used by OrchestratorEvent.__eq__.
    """
    # NOTE(review): defining __eq__ without __hash__ leaves this class
    # unhashable; confirm instances are never used as dict/set keys.
    if not isinstance(other, InventoryHost):
        return False
    return self.name == other.name and self.devices == other.devices
1388 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1390 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1393 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1395 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1396 See ``ceph osd metadata | jq '.[].device_ids'``
1401 class OrchestratorEvent
:
1403 Similar to K8s Events.
1405 Some form of "important" log message attached to something.
# Matches the one-line string form produced by to_json():
#   '<created> <kind>:<subject> [<level>] "<message>"'
# Groups: created, kind, subject, level, message. The message group
# permits embedded newlines; used by from_json() to parse events back.
regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
1411 def __init__(self
, created
: Union
[str, datetime
.datetime
], kind
: str,
1412 subject
: str, level
: str, message
: str) -> None:
1413 if isinstance(created
, str):
1414 created
= str_to_datetime(created
)
1415 self
.created
: datetime
.datetime
= created
1417 assert kind
in "service daemon".split()
1418 self
.kind
: str = kind
1420 # service name, or daemon danem or something
1421 self
.subject
: str = subject
1423 # Events are not meant for debugging. debugs should end in the log.
1424 assert level
in "INFO ERROR".split()
1427 self
.message
: str = message
# __slots__ suppresses the per-instance __dict__, keeping event
# objects small; clusters can accumulate many of them.
__slots__ = ('created', 'kind', 'subject', 'level', 'message')
def kind_subject(self) -> str:
    """Return this event's identity as ``"<kind>:<subject>"``."""
    return '{}:{}'.format(self.kind, self.subject)
def to_json(self) -> str:
    """Render this event as a single line (the form regex_v1 parses)."""
    # Make a long list of events readable.
    timestamp = datetime_to_str(self.created)
    return '{} {} [{}] "{}"'.format(
        timestamp, self.kind_subject(), self.level, self.message)
1439 def to_dict(self
) -> dict:
1440 # Convert events data to dict.
1442 'created': datetime_to_str(self
.created
),
1443 'subject': self
.kind_subject(),
1444 'level': self
.level
,
1445 'message': self
.message
1450 def from_json(cls
, data
: str) -> "OrchestratorEvent":
1452 >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
1453 '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
1458 match
= cls
.regex_v1
.match(data
)
1460 return cls(*match
.groups())
1461 raise ValueError(f
'Unable to match: "{data}"')
1463 def __eq__(self
, other
: Any
) -> bool:
1464 if not isinstance(other
, OrchestratorEvent
):
1467 return self
.created
== other
.created
and self
.kind
== other
.kind \
1468 and self
.subject
== other
.subject
and self
.message
== other
.message
def __repr__(self) -> str:
    """Round-trippable representation based on the JSON string form."""
    return 'OrchestratorEvent.from_json({!r})'.format(self.to_json())
1474 def _mk_orch_methods(cls
: Any
) -> Any
:
1475 # Needs to be defined outside of for.
1476 # Otherwise meth is always bound to last key
1477 def shim(method_name
: str) -> Callable
:
1478 def inner(self
: Any
, *args
: Any
, **kwargs
: Any
) -> Any
:
1479 completion
= self
._oremote
(method_name
, args
, kwargs
)
1483 for name
, method
in Orchestrator
.__dict
__.items():
1484 if not name
.startswith('_') and name
not in ['is_orchestrator_module']:
1485 remote_call
= update_wrapper(shim(name
), method
)
1486 setattr(cls
, name
, remote_call
)
1491 class OrchestratorClientMixin(Orchestrator
):
1493 A module that inherents from `OrchestratorClientMixin` can directly call
1494 all :class:`Orchestrator` methods without manually calling remote.
1496 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1497 calls :func:`OrchestratorClientMixin._oremote`
1499 >>> class MyModule(OrchestratorClientMixin):
1501 ... completion = self.add_host('somehost') # calls `_oremote()`
1502 ... self.log.debug(completion.result)
1504 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1505 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1506 "real" implementation of the orchestrator.
1509 >>> import mgr_module
1511 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1512 ... def __init__(self, ...):
1513 ... self.orch_client = OrchestratorClientMixin()
1514 ... self.orch_client.set_mgr(self.mgr))
1517 def set_mgr(self
, mgr
: MgrModule
) -> None:
1519 Useable in the Dashbord that uses a global ``mgr``
1522 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1524 def __get_mgr(self
) -> Any
:
1527 except AttributeError:
1530 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1532 Helper for invoking `remote` on whichever orchestrator is enabled
1534 :raises RuntimeError: If the remote method failed.
1535 :raises OrchestratorError: orchestrator failed to perform
1536 :raises ImportError: no `orchestrator` module or backend not found.
1538 mgr
= self
.__get
_mgr
()
1541 o
= mgr
._select
_orchestrator
()
1542 except AttributeError:
1543 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1546 raise NoOrchestrator()
1548 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1550 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1551 except Exception as e
:
1552 if meth
== 'get_feature_set':
1553 raise # self.get_feature_set() calls self._oremote()
1554 f_set
= self
.get_feature_set()
1555 if meth
not in f_set
or not f_set
[meth
]['available']:
1556 raise NotImplementedError(f
'{o} does not implement {meth}') from e