]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph 16.2.7
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast, Mapping
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
42 logger = logging.getLogger(__name__)
43
44 T = TypeVar('T')
45 FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Raised for deployment, configuration or user errors.

    Not intended for programming errors or orchestrator-internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        super().__init__(msg)
        self.errno = errno
        # Optional (kind, subject) pair; see OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
66
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any ``OrchestratorError`` escaping its body
    with the event subject ``(kind, subject)`` and then re-raises it.

    :param kind: event subject kind (see ``OrchestratorEvent.subject``)
    :param subject: event subject identifier
    :param overwrite: when True, replace a subject already set on the error
    """
    try:
        yield
    except OrchestratorError as e:
        # NOTE(review): ``hasattr(e, 'event_subject')`` is always True for an
        # OrchestratorError (the attribute is set in __init__), so the subject
        # is overwritten even when ``overwrite`` is False — confirm intended.
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so expected failures become clean command results.

    Expected errors (orchestrator/spec validation, missing imports, features
    the backend does not implement) are converted to ``HandleCommandResult``
    without a traceback; everything else propagates.

    :param prefix: CLI command prefix (e.g. "orch host add")
    :param perm: permission string for the ``CLICommand`` ('r' or 'rw')
    :param func: the handler being wrapped
    :return: a copy of the wrapper, carrying CLICommand metadata
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # A distinct function object is required so the CLICommand metadata set
    # below does not leak onto `wrapper` itself. The original used a lambda
    # assignment (flagged E731); a plain `def` is the idiomatic equivalent.
    def wrapper_copy(*l_args: Any, **l_kwargs: Any) -> Any:
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
114
115
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator turning an Orchestrator method's return value (or any raised
    exception) into an ``OrchResult``.
    """

    @wraps(f)
    def _as_orch_result(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], _as_orch_result)
131
132
class InnerCliCommandCallable(Protocol):
    """Structural type: maps a command prefix to a decorator for CLI handlers."""

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
136
137
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Build a decorator factory that binds *perm* ('r' or 'rw') to handle_exception."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
142
143
# Shorthand decorator factories for read-only ('r') and read-write ('rw')
# orchestrator CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
146
147
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every attribute that was decorated via handle_exception():
        # those carry both `_prefix` and `_cli_command` (see handle_exception).
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Plain attribute/method without CLI metadata — skip it.
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the instance's own
            # handle_command implementation.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Publish the collected command descriptors and the dispatcher on the
        # class being created, in the shape MgrModule expects.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
172
173
class OrchResult(Generic[T]):
    """
    Holds either a result or a (pickled) exception.

    Exists mainly to circumvent the MgrModule.remote() method that hides all
    exceptions, and to survive crossing sub-interpreter boundaries.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record *e*; fall back to a plain Exception if it won't pickle."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            # Degenerate to a plain Exception that is guaranteed to pickle.
            if isinstance(e, Exception):
                fallback = Exception(*e.args)
            else:
                fallback = Exception(str(e))
            self.serialized_exception = pickle.dumps(fallback)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(item) for item in self.result)
        return str(self.result)
214
215
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Unwrap *c*: re-raise its stored exception, otherwise return its result.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result
    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # Unpickling can fail across interpreters; degrade to a generic error.
        raise Exception(c.exception_str)
    raise e
228
229
230 def _hide_in_features(f: FuncT) -> FuncT:
231 f._hide_in_features = True # type: ignore
232 return f
233
234
class Orchestrator(object):
    """
    Abstract interface implemented by every orchestrator backend.

    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the concrete class overrides the
        # base-class attribute; helpers tagged via @_hide_in_features are skipped.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        """Pause the orchestrator's background work."""
        raise NotImplementedError()

    def resume(self) -> None:
        """Resume the orchestrator's background work."""
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host specification (hostname, addr, labels, ...)
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        :param force: proceed even when daemons are still deployed there
        :param offline: the host is known to be unreachable
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Dispatches each spec to the matching ``apply_*`` method by its
        ``service_type`` and collects the per-spec result strings.
        """
        # Dispatch table: service_type -> apply method.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # Fold a single-result OrchResult into the accumulated list,
            # surfacing any stored exception immediately.
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        """Toggle the 'unmanaged' flag on a service (default: osd)."""
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Check whether an upgrade to *image*/*version* would be possible."""
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        """List images/tags available to upgrade to."""
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Begin an upgrade to *image*/*version*."""
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        """Pause an ongoing upgrade."""
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        """Resume a paused upgrade."""
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        """Abort an ongoing upgrade."""
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
691
692
693 GenericSpec = Union[ServiceSpec, HostSpec]
694
695
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize *spec*: a HostSpec when service_type is 'host', else a ServiceSpec."""
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
701
702
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    Most daemon types belong to the identically-named service; the
    exceptions are the ingress daemons (haproxy/keepalived) and the
    Rook-specific 'crashcollector'.

    :raises KeyError: if *dtype* is not a known daemon type.
    """
    special = {
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    }
    same_named = ('mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
                  'cephfs-mirror', 'nfs', 'grafana', 'alertmanager',
                  'prometheus', 'node-exporter', 'crash', 'container',
                  'cephadm-exporter')
    if dtype in special:
        return special[dtype]
    if dtype in same_named:
        return dtype
    raise KeyError(dtype)
726
727
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types it deploys.

    Every service deploys a single identically-named daemon type except
    'ingress', which deploys haproxy plus keepalived.

    :raises KeyError: if *stype* is not a known service type.
    """
    if stype == 'ingress':
        return ['haproxy', 'keepalived']
    single = ('mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
              'cephfs-mirror', 'nfs', 'grafana', 'alertmanager',
              'prometheus', 'node-exporter', 'crash', 'container',
              'cephadm-exporter')
    if stype in single:
        return [stype]
    raise KeyError(stype)
749
750
# Flat list of every daemon type deployable by any known service type.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
753
754
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        # Image we are upgrading to, if any.
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded.
        self.progress: Optional[str] = None
        # Freeform description.
        self.message = ""
763
764
def handle_type_error(method: FuncT) -> FuncT:
    """Decorator converting TypeErrors from *method* into OrchestratorValidationErrors."""
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return cast(FuncT, inner)
774
775
class DaemonDescriptionStatus(enum.IntEnum):
    """Running state of a daemon as reported by the orchestrator."""
    error = -1    # daemon is in an error state
    stopped = 0   # daemon is deployed but not running
    running = 1   # daemon is up
780
781
782 class DaemonDescription(object):
783 """
784 For responding to queries about the status of a particular daemon,
785 stateful or stateless.
786
787 This is not about health or performance monitoring of daemons: it's
788 about letting the orchestrator tell Ceph whether and where a
789 daemon is scheduled in the cluster. When an orchestrator tells
790 Ceph "it's running on host123", that's not a promise that the process
791 is literally up this second, it's a description of where the orchestrator
792 has decided the daemon should run.
793 """
794
    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 ) -> None:

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id
        # Cached '<daemon_type>.<daemon_id>' string (see name()).
        self.daemon_name = self.name()

        # Some daemon types have a numeric rank assigned
        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        # Explicit service name override; when unset, service_name() derives one.
        self._service_name: Optional[str] = service_name

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == error.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        # Event history attached to this daemon.
        self.events: List[OrchestratorEvent] = events or []

        # Observed/requested/limit memory figures, in bytes
        # (presumably — TODO confirm units against the callers).
        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        # Network endpoints the daemon listens on, if known.
        self.ports: Optional[List[int]] = ports
        self.ip: Optional[str] = ip

        # Which mgr(s) deployed this daemon, if known.
        self.deployed_by = deployed_by

        self.is_active = is_active
884
885 def get_port_summary(self) -> str:
886 if not self.ports:
887 return ''
888 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
889
890 def name(self) -> str:
891 return '%s.%s' % (self.daemon_type, self.daemon_id)
892
893 def matches_service(self, service_name: Optional[str]) -> bool:
894 assert self.daemon_id is not None
895 assert self.daemon_type is not None
896 if service_name:
897 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
898 return False
899
    def service_id(self) -> str:
        """
        Derive the service id this daemon belongs to.

        Precedence: explicit ``_service_name`` > OSD spec affinity > a
        heuristic that strips the hostname out of ``daemon_id``.
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            if '.' in self._service_name:
                # service_name is '<type>.<id>' — return the id part.
                return self._service_name.split('.', 1)[1]
            else:
                return ''

        if self.daemon_type == 'osd':
            # NOTE(review): the string 'None' is excluded here, presumably
            # because a None affinity can be serialized as 'None' — confirm.
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return 'unmanaged'

        def _match() -> str:
            # Recover service_id by removing the (bare) hostname embedded
            # in daemon_id, validating the '.' separators around it.
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    # rgw ids look like 'realm.zone[.host[.random]]'.
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id
960
961 def service_name(self) -> str:
962 if self._service_name:
963 return self._service_name
964 assert self.daemon_type is not None
965 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
966 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
967 return daemon_type_to_service(self.daemon_type)
968
969 def __repr__(self) -> str:
970 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
971 id=self.daemon_id)
972
973 def to_json(self) -> dict:
974 out: Dict[str, Any] = OrderedDict()
975 out['daemon_type'] = self.daemon_type
976 out['daemon_id'] = self.daemon_id
977 out['service_name'] = self._service_name
978 out['daemon_name'] = self.name()
979 out['hostname'] = self.hostname
980 out['container_id'] = self.container_id
981 out['container_image_id'] = self.container_image_id
982 out['container_image_name'] = self.container_image_name
983 out['container_image_digests'] = self.container_image_digests
984 out['memory_usage'] = self.memory_usage
985 out['memory_request'] = self.memory_request
986 out['memory_limit'] = self.memory_limit
987 out['version'] = self.version
988 out['status'] = self.status.value if self.status is not None else None
989 out['status_desc'] = self.status_desc
990 if self.daemon_type == 'osd':
991 out['osdspec_affinity'] = self.osdspec_affinity
992 out['is_active'] = self.is_active
993 out['ports'] = self.ports
994 out['ip'] = self.ip
995 out['rank'] = self.rank
996 out['rank_generation'] = self.rank_generation
997
998 for k in ['last_refresh', 'created', 'started', 'last_deployed',
999 'last_configured']:
1000 if getattr(self, k):
1001 out[k] = datetime_to_str(getattr(self, k))
1002
1003 if self.events:
1004 out['events'] = [e.to_json() for e in self.events]
1005
1006 empty = [k for k, v in out.items() if v is None]
1007 for e in empty:
1008 del out[e]
1009 return out
1010
1011 def to_dict(self) -> dict:
1012 out: Dict[str, Any] = OrderedDict()
1013 out['daemon_type'] = self.daemon_type
1014 out['daemon_id'] = self.daemon_id
1015 out['daemon_name'] = self.name()
1016 out['hostname'] = self.hostname
1017 out['container_id'] = self.container_id
1018 out['container_image_id'] = self.container_image_id
1019 out['container_image_name'] = self.container_image_name
1020 out['container_image_digests'] = self.container_image_digests
1021 out['memory_usage'] = self.memory_usage
1022 out['memory_request'] = self.memory_request
1023 out['memory_limit'] = self.memory_limit
1024 out['version'] = self.version
1025 out['status'] = self.status.value if self.status is not None else None
1026 out['status_desc'] = self.status_desc
1027 if self.daemon_type == 'osd':
1028 out['osdspec_affinity'] = self.osdspec_affinity
1029 out['is_active'] = self.is_active
1030 out['ports'] = self.ports
1031 out['ip'] = self.ip
1032
1033 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1034 'last_configured']:
1035 if getattr(self, k):
1036 out[k] = datetime_to_str(getattr(self, k))
1037
1038 if self.events:
1039 out['events'] = [e.to_dict() for e in self.events]
1040
1041 empty = [k for k, v in out.items() if v is None]
1042 for e in empty:
1043 del out[e]
1044 return out
1045
1046 @classmethod
1047 @handle_type_error
1048 def from_json(cls, data: dict) -> 'DaemonDescription':
1049 c = data.copy()
1050 event_strs = c.pop('events', [])
1051 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1052 'last_configured']:
1053 if k in c:
1054 c[k] = str_to_datetime(c[k])
1055 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1056 status_int = c.pop('status', None)
1057 if 'daemon_name' in c:
1058 del c['daemon_name']
1059 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1060 return cls(events=events, status=status, **c)
1061
1062 def __copy__(self) -> 'DaemonDescription':
1063 # feel free to change this:
1064 return DaemonDescription.from_json(self.to_json())
1065
1066 @staticmethod
1067 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
1068 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1069
1070
1071 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1072
1073
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # BUGFIX: `ports` previously used a mutable default argument (`= []`),
        # so every instance created without an explicit list shared a single
        # list object; a mutation on one leaked into all of them. A None
        # sentinel preserves the old observable default (an empty list).
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Shortcut for the service type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Human-readable ``ip:port1,port2`` summary, or '' when no ports."""
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_dict(self) -> Dict[str, Any]:
        """Build the 'status' sub-dict shared by to_json() and to_dict().

        Timestamps are rendered as strings and every None-valued key is
        dropped from the result.
        """
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """Serialize to a dict; events are rendered in their compact string form."""
        out = self.spec.to_json()
        out['status'] = self._status_dict()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Serialize to a dict; events are rendered as nested dicts."""
        out = self.spec.to_json()
        out['status'] = self._status_dict()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Inverse of to_json(): split 'status' and 'events' back out and
        parse the remainder as a ServiceSpec.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Render a ServiceDescription as a plain mapping in yaml output."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1201
1202
1203 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1204
1205
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring
      a particular server;
    * filter by label when not all of the estate consists of Ceph
      servers and we only want to learn about the Ceph ones;
    * filter by label when we are particularly interested in
      e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels
        #: Optional: get info about certain named hosts only
        self.hosts = hosts
1227
1228
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name  # fall back to the name when no address given
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Inverse of to_json().

        :raises OrchestratorValidationError: on missing or unknown keys, or
            malformed input.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """Build hosts from (name, devices-holder) pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # BUGFIX: comparing against a non-InventoryHost used to raise
        # AttributeError (missing .name/.devices); return NotImplemented so
        # Python falls back to the other operand / identity instead.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        # NOTE(review): addr and labels are excluded from equality — confirm
        # this is intentional.
        return self.name == other.name and self.devices == other.devices
1287
1288
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    Fields:

    * ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`
    * ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
      See ``ceph osd metadata | jq '.[].device_ids'``
    * ``path``: presumably the device's block-device path on the host —
      TODO confirm against callers.
    """
    # Keep instances tuple-sized: no per-instance __dict__.
    __slots__ = ()
1300
1301
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # v1 wire format: `<created> <kind>:<subject> [<level>] "<message>"`
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        # Timestamps may arrive as strings, e.g. straight out of from_json().
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # a service name, or a daemon name, or something similar
        self.subject: str = subject

        # Events are not meant for debugging; debug output belongs in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """E.g. ``daemon:crash.ubuntu``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """Serialize to the compact one-line v1 string format.

        Keeps a long list of events readable.
        """
        stamp = datetime_to_str(self.created)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """Serialize the event as a plain dict."""
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """Parse an event from its one-line v1 string format.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: one serialized event
        :return: the parsed event
        """
        match = cls.regex_v1.match(data)
        if match is None:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*match.groups())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False
        # NOTE(review): `level` is not part of equality — confirm intentional.
        return (self.created == other.created and self.kind == other.kind
                and self.subject == other.subject and self.message == other.message)
1370
1371
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: attach a remote-forwarding stub to ``cls`` for every
    public method declared on :class:`Orchestrator`.

    Each stub simply forwards its call through ``self._oremote``.
    """
    # A factory function is needed so each stub captures its own method name;
    # a closure created directly inside the loop would late-bind to the last
    # iteration's value.
    def make_stub(method_name: str) -> Callable:
        def stub(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return stub

    skip = {'is_orchestrator_module'}
    for name in Orchestrator.__dict__:
        if name.startswith('_') or name in skip:
            continue
        setattr(cls, name, make_stub(name))
    return cls
1385
1386
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        # Name-mangled (double underscore) to make sure we're not overwriting
        # any other `mgr` properties on subclasses.
        self.__mgr = mgr

    def __get_mgr(self) -> Any:
        # Fall back to `self` when set_mgr() was never called — the class
        # using this mixin may itself be the MgrModule.
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # Resolve the active backend: directly if `mgr` is the orchestrator
        # module itself, otherwise by asking it over the remote interface.
        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Distinguish "backend doesn't implement this method" from a
            # genuine failure by consulting the backend's feature set.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise