"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
import datetime
import enum
import errno
import logging
import os
import pickle

from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from functools import wraps, reduce, update_wrapper

from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
    Sequence, Dict, cast, Mapping

try:
    from typing import Protocol  # Protocol was added in Python 3.8
except ImportError:
    class Protocol:  # type: ignore
        pass

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime

from mgr_module import MgrModule, CLICommand, HandleCommandResult
42 logger
= logging
.getLogger(__name__
)
45 FuncT
= TypeVar('FuncT', bound
=Callable
[..., Any
])
48 class OrchestratorError(Exception):
50 General orchestrator specific error.
52 Used for deployment, configuration or user errors.
54 It's not intended for programming errors or orchestrator internal errors.
59 errno
: int = -errno
.EINVAL
,
60 event_kind_subject
: Optional
[Tuple
[str, str]] = None) -> None:
61 super(Exception, self
).__init
__(msg
)
63 # See OrchestratorEvent.subject
64 self
.event_subject
= event_kind_subject
67 class NoOrchestrator(OrchestratorError
):
69 No orchestrator in configured.
72 def __init__(self
, msg
: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
73 super(NoOrchestrator
, self
).__init
__(msg
, errno
=-errno
.ENOENT
)
76 class OrchestratorValidationError(OrchestratorError
):
78 Raised when an orchestrator doesn't support a specific feature.
83 def set_exception_subject(kind
: str, subject
: str, overwrite
: bool = False) -> Iterator
[None]:
86 except OrchestratorError
as e
:
87 if overwrite
or hasattr(e
, 'event_subject'):
88 e
.event_subject
= (kind
, subject
)
92 def handle_exception(prefix
: str, perm
: str, func
: FuncT
) -> FuncT
:
94 def wrapper(*args
: Any
, **kwargs
: Any
) -> Any
:
96 return func(*args
, **kwargs
)
97 except (OrchestratorError
, SpecValidationError
) as e
:
98 # Do not print Traceback for expected errors.
99 return HandleCommandResult(e
.errno
, stderr
=str(e
))
100 except ImportError as e
:
101 return HandleCommandResult(-errno
.ENOENT
, stderr
=str(e
))
102 except NotImplementedError:
103 msg
= 'This Orchestrator does not support `{}`'.format(prefix
)
104 return HandleCommandResult(-errno
.ENOENT
, stderr
=msg
)
106 # misuse lambda to copy `wrapper`
107 wrapper_copy
= lambda *l_args
, **l_kwargs
: wrapper(*l_args
, **l_kwargs
) # noqa: E731
108 wrapper_copy
._prefix
= prefix
# type: ignore
109 wrapper_copy
._cli
_command
= CLICommand(prefix
, perm
) # type: ignore
110 wrapper_copy
._cli
_command
.store_func_metadata(func
) # type: ignore
111 wrapper_copy
._cli
_command
.func
= wrapper_copy
# type: ignore
113 return cast(FuncT
, wrapper_copy
)
116 def handle_orch_error(f
: Callable
[..., T
]) -> Callable
[..., 'OrchResult[T]']:
118 Decorator to make Orchestrator methods return
123 def wrapper(*args
: Any
, **kwargs
: Any
) -> OrchResult
[T
]:
125 return OrchResult(f(*args
, **kwargs
))
126 except Exception as e
:
129 if 'UNITTEST' in os
.environ
:
130 raise # This makes debugging of Tracebacks from unittests a bit easier
131 return OrchResult(None, exception
=e
)
133 return cast(Callable
[..., OrchResult
[T
]], wrapper
)
136 class InnerCliCommandCallable(Protocol
):
137 def __call__(self
, prefix
: str) -> Callable
[[FuncT
], FuncT
]:
141 def _cli_command(perm
: str) -> InnerCliCommandCallable
:
142 def inner_cli_command(prefix
: str) -> Callable
[[FuncT
], FuncT
]:
143 return lambda func
: handle_exception(prefix
, perm
, func
)
144 return inner_cli_command
147 _cli_read_command
= _cli_command('r')
148 _cli_write_command
= _cli_command('rw')
151 class CLICommandMeta(type):
153 This is a workaround for the use of a global variable CLICommand.COMMANDS which
154 prevents modules from importing any other module.
156 We make use of CLICommand, except for the use of the global variable.
158 def __init__(cls
, name
: str, bases
: Any
, dct
: Any
) -> None:
159 super(CLICommandMeta
, cls
).__init
__(name
, bases
, dct
)
160 dispatch
: Dict
[str, CLICommand
] = {}
161 for v
in dct
.values():
163 dispatch
[v
._prefix
] = v
._cli
_command
164 except AttributeError:
167 def handle_command(self
: Any
, inbuf
: Optional
[str], cmd
: dict) -> Any
:
168 if cmd
['prefix'] not in dispatch
:
169 return self
.handle_command(inbuf
, cmd
)
171 return dispatch
[cmd
['prefix']].call(self
, cmd
, inbuf
)
173 cls
.COMMANDS
= [cmd
.dump_cmd() for cmd
in dispatch
.values()]
174 cls
.handle_command
= handle_command
177 class OrchResult(Generic
[T
]):
179 Stores a result and an exception. Mainly to circumvent the
180 MgrModule.remote() method that hides all exceptions and for
181 handling different sub-interpreters.
184 def __init__(self
, result
: Optional
[T
], exception
: Optional
[Exception] = None) -> None:
186 self
.serialized_exception
: Optional
[bytes
] = None
187 self
.exception_str
: str = ''
188 self
.set_exception(exception
)
190 __slots__
= 'result', 'serialized_exception', 'exception_str'
192 def set_exception(self
, e
: Optional
[Exception]) -> None:
194 self
.serialized_exception
= None
195 self
.exception_str
= ''
198 self
.exception_str
= f
'{type(e)}: {str(e)}'
200 self
.serialized_exception
= pickle
.dumps(e
)
201 except pickle
.PicklingError
:
202 logger
.error(f
"failed to pickle {e}")
203 if isinstance(e
, Exception):
204 e
= Exception(*e
.args
)
206 e
= Exception(str(e
))
207 # degenerate to a plain Exception
208 self
.serialized_exception
= pickle
.dumps(e
)
210 def result_str(self
) -> str:
211 """Force a string."""
212 if self
.result
is None:
214 if isinstance(self
.result
, list):
215 return '\n'.join(str(x
) for x
in self
.result
)
216 return str(self
.result
)
219 def raise_if_exception(c
: OrchResult
[T
]) -> T
:
221 Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
223 if c
.serialized_exception
is not None:
225 e
= pickle
.loads(c
.serialized_exception
)
226 except (KeyError, AttributeError):
227 raise Exception(c
.exception_str
)
229 assert c
.result
is not None, 'OrchResult should either have an exception or a result'
233 def _hide_in_features(f
: FuncT
) -> FuncT
:
234 f
._hide
_in
_features
= True # type: ignore
238 class Orchestrator(object):
240 Calls in this class may do long running remote operations, with time
241 periods ranging from network latencies to package install latencies and large
242 internet downloads. For that reason, all are asynchronous, and return
243 ``Completion`` objects.
245 Methods should only return the completion and not directly execute
246 anything, like network calls. Otherwise the purpose of
247 those completions is defeated.
249 Implementations are not required to start work on an operation until
250 the caller waits on the relevant Completion objects. Callers making
251 multiple updates should not wait on Completions until they're done
252 sending operations: this enables implementations to batch up a series
253 of updates when wait() is called on a set of Completion objects.
255 Implementations are encouraged to keep reasonably fresh caches of
256 the status of the system: it is better to serve a stale-but-recent
257 result read of e.g. device inventory than it is to keep the caller waiting
258 while you scan hosts every time.
262 def is_orchestrator_module(self
) -> bool:
264 Enable other modules to interrogate this module to discover
265 whether it's usable as an orchestrator module.
267 Subclasses do not need to override this.
272 def available(self
) -> Tuple
[bool, str, Dict
[str, Any
]]:
274 Report whether we can talk to the orchestrator. This is the
275 place to give the user a meaningful message if the orchestrator
276 isn't running or can't be contacted.
278 This method may be called frequently (e.g. every page load
279 to conditionally display a warning banner), so make sure it's
280 not too expensive. It's okay to give a slightly stale status
281 (e.g. based on a periodic background ping of the orchestrator)
282 if that's necessary to make this method fast.
285 `True` doesn't mean that the desired functionality
286 is actually available in the orchestrator. I.e. this
287 won't work as expected::
290 ... if OrchestratorClientMixin().available()[0]: # wrong.
291 ... OrchestratorClientMixin().get_hosts()
293 :return: boolean representing whether the module is available/usable
294 :return: string describing any error
295 :return: dict containing any module specific information
297 raise NotImplementedError()
300 def get_feature_set(self
) -> Dict
[str, dict]:
301 """Describes which methods this orchestrator implements
304 `True` doesn't mean that the desired functionality
305 is actually possible in the orchestrator. I.e. this
306 won't work as expected::
309 ... api = OrchestratorClientMixin()
310 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
313 It's better to ask for forgiveness instead::
317 ... OrchestratorClientMixin().get_hosts()
318 ... except (OrchestratorError, NotImplementedError):
321 :returns: Dict of API method names to ``{'available': True or False}``
323 module
= self
.__class
__
324 features
= {a
: {'available': getattr(Orchestrator
, a
, None) != getattr(module
, a
)}
325 for a
in Orchestrator
.__dict
__
326 if not a
.startswith('_') and not getattr(getattr(Orchestrator
, a
), '_hide_in_features', False)
330 def cancel_completions(self
) -> None:
332 Cancels ongoing completions. Unstuck the mgr.
334 raise NotImplementedError()
336 def pause(self
) -> None:
337 raise NotImplementedError()
339 def resume(self
) -> None:
340 raise NotImplementedError()
342 def add_host(self
, host_spec
: HostSpec
) -> OrchResult
[str]:
344 Add a host to the orchestrator inventory.
346 :param host: hostname
348 raise NotImplementedError()
350 def remove_host(self
, host
: str, force
: bool, offline
: bool) -> OrchResult
[str]:
352 Remove a host from the orchestrator inventory.
354 :param host: hostname
356 raise NotImplementedError()
358 def drain_host(self
, hostname
: str, force
: bool = False) -> OrchResult
[str]:
360 drain all daemons from a host
362 :param hostname: hostname
364 raise NotImplementedError()
366 def update_host_addr(self
, host
: str, addr
: str) -> OrchResult
[str]:
368 Update a host's address
370 :param host: hostname
371 :param addr: address (dns name or IP)
373 raise NotImplementedError()
375 def get_hosts(self
) -> OrchResult
[List
[HostSpec
]]:
377 Report the hosts in the cluster.
379 :return: list of HostSpec
381 raise NotImplementedError()
383 def get_facts(self
, hostname
: Optional
[str] = None) -> OrchResult
[List
[Dict
[str, Any
]]]:
385 Return hosts metadata(gather_facts).
387 raise NotImplementedError()
389 def add_host_label(self
, host
: str, label
: str) -> OrchResult
[str]:
393 raise NotImplementedError()
395 def remove_host_label(self
, host
: str, label
: str, force
: bool = False) -> OrchResult
[str]:
399 raise NotImplementedError()
401 def host_ok_to_stop(self
, hostname
: str) -> OrchResult
:
403 Check if the specified host can be safely stopped without reducing availability
405 :param host: hostname
407 raise NotImplementedError()
409 def enter_host_maintenance(self
, hostname
: str, force
: bool = False) -> OrchResult
:
411 Place a host in maintenance, stopping daemons and disabling it's systemd target
413 raise NotImplementedError()
415 def exit_host_maintenance(self
, hostname
: str) -> OrchResult
:
417 Return a host from maintenance, restarting the clusters systemd target
419 raise NotImplementedError()
421 def rescan_host(self
, hostname
: str) -> OrchResult
:
422 """Use cephadm to issue a disk rescan on each HBA
424 Some HBAs and external enclosures don't automatically register
425 device insertion with the kernel, so for these scenarios we need
428 :param hostname: (str) host name
430 raise NotImplementedError()
432 def get_inventory(self
, host_filter
: Optional
['InventoryFilter'] = None, refresh
: bool = False) -> OrchResult
[List
['InventoryHost']]:
434 Returns something that was created by `ceph-volume inventory`.
436 :return: list of InventoryHost
438 raise NotImplementedError()
440 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None, refresh
: bool = False) -> OrchResult
[List
['ServiceDescription']]:
442 Describe a service (of any kind) that is already configured in
443 the orchestrator. For example, when viewing an OSD in the dashboard
444 we might like to also display information about the orchestrator's
445 view of the service (like the kubernetes pod ID).
447 When viewing a CephFS filesystem in the dashboard, we would use this
448 to display the pods being currently run for MDS daemons.
450 :return: list of ServiceDescription objects.
452 raise NotImplementedError()
454 def list_daemons(self
, service_name
: Optional
[str] = None, daemon_type
: Optional
[str] = None, daemon_id
: Optional
[str] = None, host
: Optional
[str] = None, refresh
: bool = False) -> OrchResult
[List
['DaemonDescription']]:
456 Describe a daemon (of any kind) that is already configured in
459 :return: list of DaemonDescription objects.
461 raise NotImplementedError()
464 def apply(self
, specs
: Sequence
["GenericSpec"], no_overwrite
: bool = False) -> List
[str]:
468 fns
: Dict
[str, Callable
[..., OrchResult
[str]]] = {
469 'alertmanager': self
.apply_alertmanager
,
470 'crash': self
.apply_crash
,
471 'grafana': self
.apply_grafana
,
472 'iscsi': self
.apply_iscsi
,
473 'mds': self
.apply_mds
,
474 'mgr': self
.apply_mgr
,
475 'mon': self
.apply_mon
,
476 'nfs': self
.apply_nfs
,
477 'node-exporter': self
.apply_node_exporter
,
478 'ceph-exporter': self
.apply_ceph_exporter
,
479 'osd': lambda dg
: self
.apply_drivegroups([dg
]), # type: ignore
480 'prometheus': self
.apply_prometheus
,
481 'loki': self
.apply_loki
,
482 'promtail': self
.apply_promtail
,
483 'rbd-mirror': self
.apply_rbd_mirror
,
484 'rgw': self
.apply_rgw
,
485 'ingress': self
.apply_ingress
,
486 'snmp-gateway': self
.apply_snmp_gateway
,
487 'host': self
.add_host
,
490 def merge(l
: OrchResult
[List
[str]], r
: OrchResult
[str]) -> OrchResult
[List
[str]]: # noqa: E741
491 l_res
= raise_if_exception(l
)
492 r_res
= raise_if_exception(r
)
494 return OrchResult(l_res
)
495 return raise_if_exception(reduce(merge
, [fns
[spec
.service_type
](spec
) for spec
in specs
], OrchResult([])))
497 def plan(self
, spec
: Sequence
["GenericSpec"]) -> OrchResult
[List
]:
499 Plan (Dry-run, Preview) a List of Specs.
501 raise NotImplementedError()
503 def remove_daemons(self
, names
: List
[str]) -> OrchResult
[List
[str]]:
505 Remove specific daemon(s).
509 raise NotImplementedError()
511 def remove_service(self
, service_name
: str, force
: bool = False) -> OrchResult
[str]:
513 Remove a service (a collection of daemons).
517 raise NotImplementedError()
519 def service_action(self
, action
: str, service_name
: str) -> OrchResult
[List
[str]]:
521 Perform an action (start/stop/reload) on a service (i.e., all daemons
522 providing the logical service).
524 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
525 :param service_name: service_type + '.' + service_id
526 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
529 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
530 raise NotImplementedError()
532 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str] = None) -> OrchResult
[str]:
534 Perform an action (start/stop/reload) on a daemon.
536 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
537 :param daemon_name: name of daemon
538 :param image: Container image when redeploying that daemon
541 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
542 raise NotImplementedError()
544 def create_osds(self
, drive_group
: DriveGroupSpec
) -> OrchResult
[str]:
546 Create one or more OSDs within a single Drive Group.
548 The principal argument here is the drive_group member
549 of OsdSpec: other fields are advisory/extensible for any
550 finer-grained OSD feature enablement (choice of backing store,
551 compression/encryption, etc).
553 raise NotImplementedError()
555 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]) -> OrchResult
[List
[str]]:
556 """ Update OSD cluster """
557 raise NotImplementedError()
559 def set_unmanaged_flag(self
,
560 unmanaged_flag
: bool,
561 service_type
: str = 'osd',
562 service_name
: Optional
[str] = None
563 ) -> HandleCommandResult
:
564 raise NotImplementedError()
566 def preview_osdspecs(self
,
567 osdspec_name
: Optional
[str] = 'osd',
568 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
569 ) -> OrchResult
[str]:
570 """ Get a preview for OSD deployments """
571 raise NotImplementedError()
573 def remove_osds(self
, osd_ids
: List
[str],
574 replace
: bool = False,
576 zap
: bool = False) -> OrchResult
[str]:
578 :param osd_ids: list of OSD IDs
579 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
580 :param force: Forces the OSD removal process without waiting for the data to be drained first.
581 :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
584 .. note:: this can only remove OSDs that were successfully
585 created (i.e. got an OSD ID).
587 raise NotImplementedError()
589 def stop_remove_osds(self
, osd_ids
: List
[str]) -> OrchResult
:
593 raise NotImplementedError()
595 def remove_osds_status(self
) -> OrchResult
:
597 Returns a status of the ongoing OSD removal operations.
599 raise NotImplementedError()
601 def blink_device_light(self
, ident_fault
: str, on
: bool, locations
: List
['DeviceLightLoc']) -> OrchResult
[List
[str]]:
603 Instructs the orchestrator to enable or disable either the ident or the fault LED.
605 :param ident_fault: either ``"ident"`` or ``"fault"``
606 :param on: ``True`` = on.
607 :param locations: See :class:`orchestrator.DeviceLightLoc`
609 raise NotImplementedError()
611 def zap_device(self
, host
: str, path
: str) -> OrchResult
[str]:
612 """Zap/Erase a device (DESTROYS DATA)"""
613 raise NotImplementedError()
615 def add_daemon(self
, spec
: ServiceSpec
) -> OrchResult
[List
[str]]:
616 """Create daemons daemon(s) for unmanaged services"""
617 raise NotImplementedError()
619 def apply_mon(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
620 """Update mon cluster"""
621 raise NotImplementedError()
623 def apply_mgr(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
624 """Update mgr cluster"""
625 raise NotImplementedError()
627 def apply_mds(self
, spec
: MDSSpec
) -> OrchResult
[str]:
628 """Update MDS cluster"""
629 raise NotImplementedError()
631 def apply_rgw(self
, spec
: RGWSpec
) -> OrchResult
[str]:
632 """Update RGW cluster"""
633 raise NotImplementedError()
635 def apply_ingress(self
, spec
: IngressSpec
) -> OrchResult
[str]:
636 """Update ingress daemons"""
637 raise NotImplementedError()
639 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
640 """Update rbd-mirror cluster"""
641 raise NotImplementedError()
643 def apply_nfs(self
, spec
: NFSServiceSpec
) -> OrchResult
[str]:
644 """Update NFS cluster"""
645 raise NotImplementedError()
647 def apply_iscsi(self
, spec
: IscsiServiceSpec
) -> OrchResult
[str]:
648 """Update iscsi cluster"""
649 raise NotImplementedError()
651 def apply_prometheus(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
652 """Update prometheus cluster"""
653 raise NotImplementedError()
655 def apply_node_exporter(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
656 """Update existing a Node-Exporter daemon(s)"""
657 raise NotImplementedError()
659 def apply_ceph_exporter(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
660 """Update existing a ceph exporter daemon(s)"""
661 raise NotImplementedError()
663 def apply_loki(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
664 """Update existing a Loki daemon(s)"""
665 raise NotImplementedError()
667 def apply_promtail(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
668 """Update existing a Promtail daemon(s)"""
669 raise NotImplementedError()
671 def apply_crash(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
672 """Update existing a crash daemon(s)"""
673 raise NotImplementedError()
675 def apply_grafana(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
676 """Update existing a grafana service"""
677 raise NotImplementedError()
679 def apply_alertmanager(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
680 """Update an existing AlertManager daemon(s)"""
681 raise NotImplementedError()
683 def apply_snmp_gateway(self
, spec
: SNMPGatewaySpec
) -> OrchResult
[str]:
684 """Update an existing snmp gateway service"""
685 raise NotImplementedError()
687 def apply_tuned_profiles(self
, specs
: List
[TunedProfileSpec
], no_overwrite
: bool) -> OrchResult
[str]:
688 """Add or update an existing tuned profile"""
689 raise NotImplementedError()
691 def rm_tuned_profile(self
, profile_name
: str) -> OrchResult
[str]:
692 """Remove a tuned profile"""
693 raise NotImplementedError()
695 def tuned_profile_ls(self
) -> OrchResult
[List
[TunedProfileSpec
]]:
696 """See current tuned profiles"""
697 raise NotImplementedError()
699 def tuned_profile_add_setting(self
, profile_name
: str, setting
: str, value
: str) -> OrchResult
[str]:
700 """Change/Add a specific setting for a tuned profile"""
701 raise NotImplementedError()
703 def tuned_profile_rm_setting(self
, profile_name
: str, setting
: str) -> OrchResult
[str]:
704 """Remove a specific setting for a tuned profile"""
705 raise NotImplementedError()
707 def upgrade_check(self
, image
: Optional
[str], version
: Optional
[str]) -> OrchResult
[str]:
708 raise NotImplementedError()
710 def upgrade_ls(self
, image
: Optional
[str], tags
: bool, show_all_versions
: Optional
[bool] = False) -> OrchResult
[Dict
[Any
, Any
]]:
711 raise NotImplementedError()
713 def upgrade_start(self
, image
: Optional
[str], version
: Optional
[str], daemon_types
: Optional
[List
[str]],
714 hosts
: Optional
[str], services
: Optional
[List
[str]], limit
: Optional
[int]) -> OrchResult
[str]:
715 raise NotImplementedError()
717 def upgrade_pause(self
) -> OrchResult
[str]:
718 raise NotImplementedError()
720 def upgrade_resume(self
) -> OrchResult
[str]:
721 raise NotImplementedError()
723 def upgrade_stop(self
) -> OrchResult
[str]:
724 raise NotImplementedError()
726 def upgrade_status(self
) -> OrchResult
['UpgradeStatusSpec']:
728 If an upgrade is currently underway, report on where
729 we are in the process, or if some error has occurred.
731 :return: UpgradeStatusSpec instance
733 raise NotImplementedError()
736 def upgrade_available(self
) -> OrchResult
:
738 Report on what versions are available to upgrade to
740 :return: List of strings
742 raise NotImplementedError()
745 GenericSpec
= Union
[ServiceSpec
, HostSpec
]
748 def json_to_generic_spec(spec
: dict) -> GenericSpec
:
749 if 'service_type' in spec
and spec
['service_type'] == 'host':
750 return HostSpec
.from_json(spec
)
752 return ServiceSpec
.from_json(spec
)
755 def daemon_type_to_service(dtype
: str) -> str:
762 'haproxy': 'ingress',
763 'keepalived': 'ingress',
765 'rbd-mirror': 'rbd-mirror',
766 'cephfs-mirror': 'cephfs-mirror',
768 'grafana': 'grafana',
769 'alertmanager': 'alertmanager',
770 'prometheus': 'prometheus',
771 'node-exporter': 'node-exporter',
772 'ceph-exporter': 'ceph-exporter',
774 'promtail': 'promtail',
776 'crashcollector': 'crash', # Specific Rook Daemon
777 'container': 'container',
779 'snmp-gateway': 'snmp-gateway',
781 return mapping
[dtype
]
784 def service_to_daemon_types(stype
: str) -> List
[str]:
791 'ingress': ['haproxy', 'keepalived'],
793 'rbd-mirror': ['rbd-mirror'],
794 'cephfs-mirror': ['cephfs-mirror'],
796 'grafana': ['grafana'],
797 'alertmanager': ['alertmanager'],
798 'prometheus': ['prometheus'],
800 'promtail': ['promtail'],
801 'node-exporter': ['node-exporter'],
802 'ceph-exporter': ['ceph-exporter'],
804 'container': ['container'],
806 'snmp-gateway': ['snmp-gateway'],
808 return mapping
[stype
]
811 KNOWN_DAEMON_TYPES
: List
[str] = list(
812 sum((service_to_daemon_types(t
) for t
in ServiceSpec
.KNOWN_SERVICE_TYPES
), []))
815 class UpgradeStatusSpec(object):
816 # Orchestrator's report on what's going on with any ongoing upgrade
817 def __init__(self
) -> None:
818 self
.in_progress
= False # Is an upgrade underway?
819 self
.target_image
: Optional
[str] = None
820 self
.services_complete
: List
[str] = [] # Which daemon types are fully updated?
821 self
.which
: str = '<unknown>' # for if user specified daemon types, services or hosts
822 self
.progress
: Optional
[str] = None # How many of the daemons have we upgraded
823 self
.message
= "" # Freeform description
824 self
.is_paused
: bool = False # Is the upgrade paused?
827 def handle_type_error(method
: FuncT
) -> FuncT
:
829 def inner(cls
: Any
, *args
: Any
, **kwargs
: Any
) -> Any
:
831 return method(cls
, *args
, **kwargs
)
832 except TypeError as e
:
833 error_msg
= '{}: {}'.format(cls
.__name
__, e
)
834 raise OrchestratorValidationError(error_msg
)
835 return cast(FuncT
, inner
)
838 class DaemonDescriptionStatus(enum
.IntEnum
):
843 starting
= 2 #: Daemon is deployed, but not yet running
846 def to_str(status
: Optional
['DaemonDescriptionStatus']) -> str:
848 status
= DaemonDescriptionStatus
.unknown
850 DaemonDescriptionStatus
.unknown
: 'unknown',
851 DaemonDescriptionStatus
.error
: 'error',
852 DaemonDescriptionStatus
.stopped
: 'stopped',
853 DaemonDescriptionStatus
.running
: 'running',
854 DaemonDescriptionStatus
.starting
: 'starting',
855 }.get(status
, '<unknown>')
858 class DaemonDescription(object):
860 For responding to queries about the status of a particular daemon,
861 stateful or stateless.
863 This is not about health or performance monitoring of daemons: it's
864 about letting the orchestrator tell Ceph whether and where a
865 daemon is scheduled in the cluster. When an orchestrator tells
866 Ceph "it's running on host123", that's not a promise that the process
867 is literally up this second, it's a description of where the orchestrator
868 has decided the daemon should run.
872 daemon_type
: Optional
[str] = None,
873 daemon_id
: Optional
[str] = None,
874 hostname
: Optional
[str] = None,
875 container_id
: Optional
[str] = None,
876 container_image_id
: Optional
[str] = None,
877 container_image_name
: Optional
[str] = None,
878 container_image_digests
: Optional
[List
[str]] = None,
879 version
: Optional
[str] = None,
880 status
: Optional
[DaemonDescriptionStatus
] = None,
881 status_desc
: Optional
[str] = None,
882 last_refresh
: Optional
[datetime
.datetime
] = None,
883 created
: Optional
[datetime
.datetime
] = None,
884 started
: Optional
[datetime
.datetime
] = None,
885 last_configured
: Optional
[datetime
.datetime
] = None,
886 osdspec_affinity
: Optional
[str] = None,
887 last_deployed
: Optional
[datetime
.datetime
] = None,
888 events
: Optional
[List
['OrchestratorEvent']] = None,
889 is_active
: bool = False,
890 memory_usage
: Optional
[int] = None,
891 memory_request
: Optional
[int] = None,
892 memory_limit
: Optional
[int] = None,
893 cpu_percentage
: Optional
[str] = None,
894 service_name
: Optional
[str] = None,
895 ports
: Optional
[List
[int]] = None,
896 ip
: Optional
[str] = None,
897 deployed_by
: Optional
[List
[str]] = None,
898 rank
: Optional
[int] = None,
899 rank_generation
: Optional
[int] = None,
900 extra_container_args
: Optional
[List
[str]] = None,
901 extra_entrypoint_args
: Optional
[List
[str]] = None,
904 #: Host is at the same granularity as InventoryHost
905 self
.hostname
: Optional
[str] = hostname
907 # Not everyone runs in containers, but enough people do to
908 # justify having the container_id (runtime id) and container_image
910 self
.container_id
= container_id
# runtime id
911 self
.container_image_id
= container_image_id
# image id locally
912 self
.container_image_name
= container_image_name
# image friendly name
913 self
.container_image_digests
= container_image_digests
# reg hashes
915 #: The type of service (osd, mon, mgr, etc.)
916 self
.daemon_type
= daemon_type
918 #: The orchestrator will have picked some names for daemons,
919 #: typically either based on hostnames or on pod names.
920 #: This is the <foo> in mds.<foo>, the ID that will appear
921 #: in the FSMap/ServiceMap.
922 self
.daemon_id
: Optional
[str] = daemon_id
923 self
.daemon_name
= self
.name()
925 #: Some daemon types have a numeric rank assigned
926 self
.rank
: Optional
[int] = rank
927 self
.rank_generation
: Optional
[int] = rank_generation
929 self
._service
_name
: Optional
[str] = service_name
931 #: Service version that was deployed
932 self
.version
= version
934 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
935 self
._status
= status
937 #: Service status description when status == error.
938 self
.status_desc
= status_desc
940 #: datetime when this info was last refreshed
941 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
943 self
.created
: Optional
[datetime
.datetime
] = created
944 self
.started
: Optional
[datetime
.datetime
] = started
945 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
946 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
948 #: Affinity to a certain OSDSpec
949 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
951 self
.events
: List
[OrchestratorEvent
] = events
or []
953 self
.memory_usage
: Optional
[int] = memory_usage
954 self
.memory_request
: Optional
[int] = memory_request
955 self
.memory_limit
: Optional
[int] = memory_limit
957 self
.cpu_percentage
: Optional
[str] = cpu_percentage
959 self
.ports
: Optional
[List
[int]] = ports
960 self
.ip
: Optional
[str] = ip
962 self
.deployed_by
= deployed_by
964 self
.is_active
= is_active
966 self
.extra_container_args
= extra_container_args
967 self
.extra_entrypoint_args
= extra_entrypoint_args
970 def status(self
) -> Optional
[DaemonDescriptionStatus
]:
974 def status(self
, new
: DaemonDescriptionStatus
) -> None:
976 self
.status_desc
= DaemonDescriptionStatus
.to_str(new
)
978 def get_port_summary(self
) -> str:
981 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
983 def name(self
) -> str:
984 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
986 def matches_service(self
, service_name
: Optional
[str]) -> bool:
987 assert self
.daemon_id
is not None
988 assert self
.daemon_type
is not None
990 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
993 def service_id(self
) -> str:
994 assert self
.daemon_id
is not None
995 assert self
.daemon_type
is not None
997 if self
._service
_name
:
998 if '.' in self
._service
_name
:
999 return self
._service
_name
.split('.', 1)[1]
1003 if self
.daemon_type
== 'osd':
1004 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
1005 return self
.osdspec_affinity
1008 def _match() -> str:
1009 assert self
.daemon_id
is not None
1010 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
1011 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
1013 if not self
.hostname
:
1014 # TODO: can a DaemonDescription exist without a hostname?
1017 # use the bare hostname, not the FQDN.
1018 host
= self
.hostname
.split('.')[0]
1020 if host
== self
.daemon_id
:
1021 # daemon_id == "host"
1022 return self
.daemon_id
1024 elif host
in self
.daemon_id
:
1025 # daemon_id == "service_id.host"
1026 # daemon_id == "service_id.host.random"
1027 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
1028 if not pre
.endswith('.'):
1029 # '.' sep missing at front of host
1031 elif post
and not post
.startswith('.'):
1032 # '.' sep missing at end of host
1036 # daemon_id == "service_id.random"
1037 if self
.daemon_type
== 'rgw':
1038 v
= self
.daemon_id
.split('.')
1039 if len(v
) in [3, 4]:
1040 return '.'.join(v
[0:2])
1042 if self
.daemon_type
== 'iscsi':
1043 v
= self
.daemon_id
.split('.')
1044 return '.'.join(v
[0:-1])
1046 # daemon_id == "service_id"
1047 return self
.daemon_id
1049 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
1052 return self
.daemon_id
1054 def service_name(self
) -> str:
1055 if self
._service
_name
:
1056 return self
._service
_name
1057 assert self
.daemon_type
is not None
1058 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
1059 return f
'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1060 return daemon_type_to_service(self
.daemon_type
)
1062 def __repr__(self
) -> str:
1063 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
1066 def __str__(self
) -> str:
1067 return f
"{self.name()} in status {self.status_desc} on {self.hostname}"
1069 def to_json(self
) -> dict:
1070 out
: Dict
[str, Any
] = OrderedDict()
1071 out
['daemon_type'] = self
.daemon_type
1072 out
['daemon_id'] = self
.daemon_id
1073 out
['service_name'] = self
._service
_name
1074 out
['daemon_name'] = self
.name()
1075 out
['hostname'] = self
.hostname
1076 out
['container_id'] = self
.container_id
1077 out
['container_image_id'] = self
.container_image_id
1078 out
['container_image_name'] = self
.container_image_name
1079 out
['container_image_digests'] = self
.container_image_digests
1080 out
['memory_usage'] = self
.memory_usage
1081 out
['memory_request'] = self
.memory_request
1082 out
['memory_limit'] = self
.memory_limit
1083 out
['cpu_percentage'] = self
.cpu_percentage
1084 out
['version'] = self
.version
1085 out
['status'] = self
.status
.value
if self
.status
is not None else None
1086 out
['status_desc'] = self
.status_desc
1087 if self
.daemon_type
== 'osd':
1088 out
['osdspec_affinity'] = self
.osdspec_affinity
1089 out
['is_active'] = self
.is_active
1090 out
['ports'] = self
.ports
1092 out
['rank'] = self
.rank
1093 out
['rank_generation'] = self
.rank_generation
1095 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1097 if getattr(self
, k
):
1098 out
[k
] = datetime_to_str(getattr(self
, k
))
1101 out
['events'] = [e
.to_json() for e
in self
.events
]
1103 empty
= [k
for k
, v
in out
.items() if v
is None]
1108 def to_dict(self
) -> dict:
1109 out
: Dict
[str, Any
] = OrderedDict()
1110 out
['daemon_type'] = self
.daemon_type
1111 out
['daemon_id'] = self
.daemon_id
1112 out
['daemon_name'] = self
.name()
1113 out
['hostname'] = self
.hostname
1114 out
['container_id'] = self
.container_id
1115 out
['container_image_id'] = self
.container_image_id
1116 out
['container_image_name'] = self
.container_image_name
1117 out
['container_image_digests'] = self
.container_image_digests
1118 out
['memory_usage'] = self
.memory_usage
1119 out
['memory_request'] = self
.memory_request
1120 out
['memory_limit'] = self
.memory_limit
1121 out
['cpu_percentage'] = self
.cpu_percentage
1122 out
['version'] = self
.version
1123 out
['status'] = self
.status
.value
if self
.status
is not None else None
1124 out
['status_desc'] = self
.status_desc
1125 if self
.daemon_type
== 'osd':
1126 out
['osdspec_affinity'] = self
.osdspec_affinity
1127 out
['is_active'] = self
.is_active
1128 out
['ports'] = self
.ports
1131 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1133 if getattr(self
, k
):
1134 out
[k
] = datetime_to_str(getattr(self
, k
))
1137 out
['events'] = [e
.to_dict() for e
in self
.events
]
1139 empty
= [k
for k
, v
in out
.items() if v
is None]
1146 def from_json(cls
, data
: dict) -> 'DaemonDescription':
1148 event_strs
= c
.pop('events', [])
1149 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1152 c
[k
] = str_to_datetime(c
[k
])
1153 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1154 status_int
= c
.pop('status', None)
1155 if 'daemon_name' in c
:
1156 del c
['daemon_name']
1157 if 'service_name' in c
and c
['service_name'].startswith('osd.'):
1158 # if the service_name is a osd.NNN (numeric osd id) then
1159 # ignore it -- it is not a valid service_name and
1160 # (presumably) came from an older version of cephadm.
1162 int(c
['service_name'][4:])
1163 del c
['service_name']
1166 status
= DaemonDescriptionStatus(status_int
) if status_int
is not None else None
1167 return cls(events
=events
, status
=status
, **c
)
1169 def __copy__(self
) -> 'DaemonDescription':
1170 # feel free to change this:
1171 return DaemonDescription
.from_json(self
.to_json())
1174 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'DaemonDescription') -> Any
:
1175 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Teach PyYAML to dump DaemonDescription objects via yaml_representer.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1181 class ServiceDescription(object):
1183 For responding to queries about the status of a particular service,
1184 stateful or stateless.
1186 This is not about health or performance monitoring of services: it's
1187 about letting the orchestrator tell Ceph whether and where a
1188 service is scheduled in the cluster. When an orchestrator tells
1189 Ceph "it's running on host123", that's not a promise that the process
1190 is literally up this second, it's a description of where the orchestrator
1191 has decided the service should run.
1196 container_image_id
: Optional
[str] = None,
1197 container_image_name
: Optional
[str] = None,
1198 service_url
: Optional
[str] = None,
1199 last_refresh
: Optional
[datetime
.datetime
] = None,
1200 created
: Optional
[datetime
.datetime
] = None,
1201 deleted
: Optional
[datetime
.datetime
] = None,
1204 events
: Optional
[List
['OrchestratorEvent']] = None,
1205 virtual_ip
: Optional
[str] = None,
1206 ports
: List
[int] = []) -> None:
1207 # Not everyone runs in containers, but enough people do to
1208 # justify having the container_image_id (image hash) and container_image
1210 self
.container_image_id
= container_image_id
# image hash
1211 self
.container_image_name
= container_image_name
# image friendly name
1213 # If the service exposes REST-like API, this attribute should hold
1215 self
.service_url
= service_url
1220 # Number of daemons up
1221 self
.running
= running
1223 # datetime when this info was last refreshed
1224 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
1225 self
.created
: Optional
[datetime
.datetime
] = created
1226 self
.deleted
: Optional
[datetime
.datetime
] = deleted
1228 self
.spec
: ServiceSpec
= spec
1230 self
.events
: List
[OrchestratorEvent
] = events
or []
1232 self
.virtual_ip
= virtual_ip
    def service_type(self) -> str:
        """Return the service type of the underlying spec."""
        return self.spec.service_type
1238 def __repr__(self
) -> str:
1239 return f
"<ServiceDescription of {self.spec.one_line_str()}>"
1241 def get_port_summary(self
) -> str:
1244 return f
"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
1246 def to_json(self
) -> OrderedDict
:
1247 out
= self
.spec
.to_json()
1249 'container_image_id': self
.container_image_id
,
1250 'container_image_name': self
.container_image_name
,
1251 'service_url': self
.service_url
,
1253 'running': self
.running
,
1254 'last_refresh': self
.last_refresh
,
1255 'created': self
.created
,
1256 'virtual_ip': self
.virtual_ip
,
1257 'ports': self
.ports
if self
.ports
else None,
1259 for k
in ['last_refresh', 'created']:
1260 if getattr(self
, k
):
1261 status
[k
] = datetime_to_str(getattr(self
, k
))
1262 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1263 out
['status'] = status
1265 out
['events'] = [e
.to_json() for e
in self
.events
]
1268 def to_dict(self
) -> OrderedDict
:
1269 out
= self
.spec
.to_json()
1271 'container_image_id': self
.container_image_id
,
1272 'container_image_name': self
.container_image_name
,
1273 'service_url': self
.service_url
,
1275 'running': self
.running
,
1276 'last_refresh': self
.last_refresh
,
1277 'created': self
.created
,
1278 'virtual_ip': self
.virtual_ip
,
1279 'ports': self
.ports
if self
.ports
else None,
1281 for k
in ['last_refresh', 'created']:
1282 if getattr(self
, k
):
1283 status
[k
] = datetime_to_str(getattr(self
, k
))
1284 status
= {k
: v
for (k
, v
) in status
.items() if v
is not None}
1285 out
['status'] = status
1287 out
['events'] = [e
.to_dict() for e
in self
.events
]
1292 def from_json(cls
, data
: dict) -> 'ServiceDescription':
1294 status
= c
.pop('status', {})
1295 event_strs
= c
.pop('events', [])
1296 spec
= ServiceSpec
.from_json(c
)
1298 c_status
= status
.copy()
1299 for k
in ['last_refresh', 'created']:
1301 c_status
[k
] = str_to_datetime(c_status
[k
])
1302 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1303 return cls(spec
=spec
, events
=events
, **c_status
)
1306 def yaml_representer(dumper
: 'yaml.SafeDumper', data
: 'ServiceDescription') -> Any
:
1307 return dumper
.represent_dict(cast(Mapping
, data
.to_json().items()))
# Teach PyYAML to dump ServiceDescription objects via yaml_representer.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1313 class InventoryFilter(object):
1315 When fetching inventory, use this filter to avoid unnecessarily
1316 scanning the whole estate.
1320 filter by host when presenting the UI workflow for configuring
1321 a particular server.
1322 filter by label when not all of estate is Ceph servers,
1323 and we want to only learn about the Ceph servers.
1324 filter by label when we are interested particularly
1325 in e.g. OSD servers.
1328 def __init__(self
, labels
: Optional
[List
[str]] = None, hosts
: Optional
[List
[str]] = None) -> None:
1330 #: Optional: get info about hosts matching labels
1331 self
.labels
= labels
1333 #: Optional: get info about certain named hosts only
1337 class InventoryHost(object):
1339 When fetching inventory, all Devices are grouped inside of an
1343 def __init__(self
, name
: str, devices
: Optional
[inventory
.Devices
] = None, labels
: Optional
[List
[str]] = None, addr
: Optional
[str] = None) -> None:
1345 devices
= inventory
.Devices([])
1348 assert isinstance(devices
, inventory
.Devices
)
1350 self
.name
= name
# unique within cluster. For example a hostname.
1351 self
.addr
= addr
or name
1352 self
.devices
= devices
1353 self
.labels
= labels
1355 def to_json(self
) -> dict:
1359 'devices': self
.devices
.to_json(),
1360 'labels': self
.labels
,
1364 def from_json(cls
, data
: dict) -> 'InventoryHost':
1366 _data
= copy
.deepcopy(data
)
1367 name
= _data
.pop('name')
1368 addr
= _data
.pop('addr', None) or name
1369 devices
= inventory
.Devices
.from_json(_data
.pop('devices'))
1370 labels
= _data
.pop('labels', list())
1372 error_msg
= 'Unknown key(s) in Inventory: {}'.format(','.join(_data
.keys()))
1373 raise OrchestratorValidationError(error_msg
)
1374 return cls(name
, devices
, labels
, addr
)
1375 except KeyError as e
:
1376 error_msg
= '{} is required for {}'.format(e
, cls
.__name
__)
1377 raise OrchestratorValidationError(error_msg
)
1378 except TypeError as e
:
1379 raise OrchestratorValidationError('Failed to read inventory: {}'.format(e
))
1382 def from_nested_items(cls
, hosts
: List
[dict]) -> List
['InventoryHost']:
1383 devs
= inventory
.Devices
.from_json
1384 return [cls(item
[0], devs(item
[1].data
)) for item
in hosts
]
1386 def __repr__(self
) -> str:
1387 return "<InventoryHost>({name})".format(name
=self
.name
)
1390 def get_host_names(hosts
: List
['InventoryHost']) -> List
[str]:
1391 return [host
.name
for host
in hosts
]
1393 def __eq__(self
, other
: Any
) -> bool:
1394 return self
.name
== other
.name
and self
.devices
== other
.devices
1397 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1399 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1402 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1404 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1405 See ``ceph osd metadata | jq '.[].device_ids'``
1410 class OrchestratorEvent
:
1412 Similar to K8s Events.
1414 Some form of "important" log message attached to something.
1418 regex_v1
= re
.compile(r
'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re
.MULTILINE
)
1420 def __init__(self
, created
: Union
[str, datetime
.datetime
], kind
: str,
1421 subject
: str, level
: str, message
: str) -> None:
1422 if isinstance(created
, str):
1423 created
= str_to_datetime(created
)
1424 self
.created
: datetime
.datetime
= created
1426 assert kind
in "service daemon".split()
1427 self
.kind
: str = kind
1429 # service name, or daemon name, or something
1430 self
.subject
: str = subject
1432 # Events are not meant for debugging. debugs should end in the log.
1433 assert level
in "INFO ERROR".split()
1436 self
.message
: str = message
1438 __slots__
= ('created', 'kind', 'subject', 'level', 'message')
1440 def kind_subject(self
) -> str:
1441 return f
'{self.kind}:{self.subject}'
1443 def to_json(self
) -> str:
1444 # Make a long list of events readable.
1445 created
= datetime_to_str(self
.created
)
1446 return f
'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
1448 def to_dict(self
) -> dict:
1449 # Convert events data to dict.
1451 'created': datetime_to_str(self
.created
),
1452 'subject': self
.kind_subject(),
1453 'level': self
.level
,
1454 'message': self
.message
1459 def from_json(cls
, data
: str) -> "OrchestratorEvent":
1461 >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
1462 '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
1467 match
= cls
.regex_v1
.match(data
)
1469 return cls(*match
.groups())
1470 raise ValueError(f
'Unable to match: "{data}"')
1472 def __eq__(self
, other
: Any
) -> bool:
1473 if not isinstance(other
, OrchestratorEvent
):
1476 return self
.created
== other
.created
and self
.kind
== other
.kind \
1477 and self
.subject
== other
.subject
and self
.message
== other
.message
1479 def __repr__(self
) -> str:
1480 return f
'OrchestratorEvent.from_json({self.to_json()!r})'
1483 def _mk_orch_methods(cls
: Any
) -> Any
:
1484 # Needs to be defined outside of for.
1485 # Otherwise meth is always bound to last key
1486 def shim(method_name
: str) -> Callable
:
1487 def inner(self
: Any
, *args
: Any
, **kwargs
: Any
) -> Any
:
1488 completion
= self
._oremote
(method_name
, args
, kwargs
)
1492 for name
, method
in Orchestrator
.__dict
__.items():
1493 if not name
.startswith('_') and name
not in ['is_orchestrator_module']:
1494 remote_call
= update_wrapper(shim(name
), method
)
1495 setattr(cls
, name
, remote_call
)
1500 class OrchestratorClientMixin(Orchestrator
):
1502 A module that inherits from `OrchestratorClientMixin` can directly call
1503 all :class:`Orchestrator` methods without manually calling remote.
1505 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1506 calls :func:`OrchestratorClientMixin._oremote`
1508 >>> class MyModule(OrchestratorClientMixin):
1510 ... completion = self.add_host('somehost') # calls `_oremote()`
1511 ... self.log.debug(completion.result)
1513 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1514 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1515 "real" implementation of the orchestrator.
1518 >>> import mgr_module
1520 ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
1521 ... def __init__(self, ...):
1522 ... self.orch_client = OrchestratorClientMixin()
1523 ... self.orch_client.set_mgr(self.mgr)
1526 def set_mgr(self
, mgr
: MgrModule
) -> None:
1528 Useable in the Dashbord that uses a global ``mgr``
1531 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1533 def __get_mgr(self
) -> Any
:
1536 except AttributeError:
1539 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1541 Helper for invoking `remote` on whichever orchestrator is enabled
1543 :raises RuntimeError: If the remote method failed.
1544 :raises OrchestratorError: orchestrator failed to perform
1545 :raises ImportError: no `orchestrator` module or backend not found.
1547 mgr
= self
.__get
_mgr
()
1550 o
= mgr
._select
_orchestrator
()
1551 except AttributeError:
1552 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1555 raise NoOrchestrator()
1557 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1559 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1560 except Exception as e
:
1561 if meth
== 'get_feature_set':
1562 raise # self.get_feature_set() calls self._oremote()
1563 f_set
= self
.get_feature_set()
1564 if meth
not in f_set
or not f_set
[meth
]['available']:
1565 raise NotImplementedError(f
'{o} does not implement {meth}') from e