]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import quincy 17.2.0
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce, update_wrapper
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast, Mapping
22
try:
    from typing import Protocol  # Protocol was added in Python 3.8
except ImportError:
    # Fallback for Python < 3.8: Protocol is only used as a base class for
    # structural typing below, so an empty placeholder is sufficient.
    class Protocol:  # type: ignore
        pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
# Module-level logger for the orchestrator interface.
logger = logging.getLogger(__name__)

# Generic payload type carried by OrchResult[T].
T = TypeVar('T')
# Any callable; used to type the decorators in this module.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Cooperative super() so classes inserted between this one and
        # Exception in the MRO are not skipped (the previous
        # `super(Exception, self).__init__` jumped straight to BaseException).
        super().__init__(msg)
        # Negative errno reported back to the CLI (see handle_exception()).
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
66
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # ENOENT: there is nothing to talk to.
        super().__init__(msg, errno=-errno.ENOENT)
74
75
class OrchestratorValidationError(OrchestratorError):
    """Raised when an orchestrator does not support a requested feature."""
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """Tag any OrchestratorError raised inside the block with an event subject."""
    try:
        yield
    except OrchestratorError as exc:
        # NOTE(review): every OrchestratorError has an `event_subject`
        # attribute, so this condition is effectively always true — confirm
        # whether `overwrite` was meant to gate replacing a non-None subject.
        if overwrite or hasattr(exc, 'event_subject'):
            exc.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """Wrap *func* so that expected errors become CLI results instead of
    tracebacks, and attach the CLICommand metadata consumed by
    CLICommandMeta."""
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # A plain pass-through function (instead of the @wraps-decorated
    # `wrapper`) so the CLI metadata below lives on a separate object.
    def wrapper_copy(*l_args: Any, **l_kwargs: Any) -> Any:
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
114
115
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator wrapping an Orchestrator method's return value — or any
    exception it raises — in an OrchResult.
    """

    @wraps(f)
    def inner(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as exc:
            logger.exception(exc)
            import os
            if 'UNITTEST' in os.environ:
                # Let unittests see the original traceback directly.
                raise
            return OrchResult(None, exception=exc)

    return cast(Callable[..., OrchResult[T]], inner)
134
135
class InnerCliCommandCallable(Protocol):
    """Structural type of the factories produced by `_cli_command`."""

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
139
140
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Return a CLI-command decorator factory bound to permission *perm*."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
145
146
# Decorator factories for read-only ('r') and read-write ('rw') CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
149
150
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super().__init__(name, bases, dct)
        # Collect every attribute that handle_exception() tagged with CLI
        # metadata, keyed by its command prefix.
        dispatch: Dict[str, CLICommand] = {
            v._prefix: v._cli_command
            for v in dct.values()
            if hasattr(v, '_prefix') and hasattr(v, '_cli_command')
        }

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the class's own handler.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)
            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
175
176
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record *e* in pickled form so it can cross sub-interpreters."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            # Degenerate to a plain Exception, which is always picklable.
            fallback = Exception(*e.args) if isinstance(e, Exception) else Exception(str(e))
            self.serialized_exception = pickle.dumps(fallback)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(map(str, self.result))
        return str(self.result)
217
218
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result
    try:
        exc = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # Exception type cannot be reconstructed here; degrade to a
        # generic Exception carrying the original message.
        raise Exception(c.exception_str)
    raise exc
231
232
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark *f* so Orchestrator.get_feature_set() skips it."""
    setattr(f, '_hide_in_features', True)
    return f
236
237
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as 'available' iff the subclass overrides the base
        # stub; attributes marked with @_hide_in_features are skipped.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        # Pause background orchestrator activity.
        raise NotImplementedError()

    def resume(self) -> None:
        # Resume background orchestrator activity after pause().
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: the HostSpec to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec by dispatching to the per-service-type apply method.

        Raises the first stored exception encountered while merging results.
        """
        # Dispatch table: service_type -> bound apply method.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # Unwrap both sides (re-raising any stored exception) and
            # accumulate the individual result strings into one list.
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        # Toggle the 'unmanaged' flag on a service spec.
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)


        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        # Check whether an upgrade to `image`/`version` is possible.
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        # List images/tags available for upgrade.
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        # Begin an upgrade to `image`/`version`.
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        # Pause an in-progress upgrade.
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        # Resume a paused upgrade.
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        # Abort an in-progress upgrade.
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
696
697
# Anything Orchestrator.apply() accepts: either a service spec or a bare host.
GenericSpec = Union[ServiceSpec, HostSpec]
699
700
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize *spec*, picking HostSpec or ServiceSpec by service_type."""
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
706
707
def daemon_type_to_service(dtype: str) -> str:
    """Map a daemon type to the service type that owns it.

    Raises KeyError for unknown daemon types.
    """
    # Most daemon types belong to the service of the same name.
    same_named = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'agent', 'snmp-gateway',
    )
    mapping = {t: t for t in same_named}
    mapping.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return mapping[dtype]
732
733
def service_to_daemon_types(stype: str) -> List[str]:
    """Return the daemon types deployed for service type *stype*.

    Raises KeyError for unknown service types.
    """
    # ingress is the only service implemented by more than one daemon type.
    if stype == 'ingress':
        return ['haproxy', 'keepalived']
    single = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'agent', 'snmp-gateway',
    )
    return {t: [t] for t in single}[stype]
756
757
# All daemon types any known service may deploy.  A flattening comprehension
# replaces the former `sum(lists, [])`, which concatenates quadratically.
KNOWN_DAEMON_TYPES: List[str] = [
    daemon_type
    for service_type in ServiceSpec.KNOWN_SERVICE_TYPES
    for daemon_type in service_to_daemon_types(service_type)
]
760
761
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        # Image being upgraded to (None when idle).
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
770
771
def handle_type_error(method: FuncT) -> FuncT:
    """Convert TypeErrors raised by *method* into OrchestratorValidationError."""
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return cast(FuncT, inner)
781
782
class DaemonDescriptionStatus(enum.IntEnum):
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """Human-readable name for *status*; ``None`` maps to 'unknown'."""
        if status is None:
            return 'unknown'
        names = {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }
        return names.get(status, '<unknown>')
801
802
803 class DaemonDescription(object):
804 """
805 For responding to queries about the status of a particular daemon,
806 stateful or stateless.
807
808 This is not about health or performance monitoring of daemons: it's
809 about letting the orchestrator tell Ceph whether and where a
810 daemon is scheduled in the cluster. When an orchestrator tells
811 Ceph "it's running on host123", that's not a promise that the process
812 is literally up this second, it's a description of where the orchestrator
813 has decided the daemon should run.
814 """
815
    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None,
                 ) -> None:
        """Plain attribute wiring; every argument is optional so partially
        known daemons (e.g. from a stale cache) can still be described."""

        #: Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes

        #: The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        #: The orchestrator will have picked some names for daemons,
        #: typically either based on hostnames or on pod names.
        #: This is the <foo> in mds.<foo>, the ID that will appear
        #: in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id
        # NOTE: computed from daemon_type/daemon_id above, so it must stay
        # after those assignments.
        self.daemon_name = self.name()

        #: Some daemon types have a numeric rank assigned
        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self._service_name: Optional[str] = service_name

        #: Service version that was deployed
        self.version = version

        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
        self._status = status

        #: Service status description when status == error.
        self.status_desc = status_desc

        #: datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        #: Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        # Observed/requested/limit memory, presumably in bytes — TODO confirm
        # against the producer of these values.
        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        self.ports: Optional[List[int]] = ports
        self.ip: Optional[str] = ip

        self.deployed_by = deployed_by

        self.is_active = is_active

        self.extra_container_args = extra_container_args
908
    @property
    def status(self) -> Optional[DaemonDescriptionStatus]:
        # Last known state of the daemon (see DaemonDescriptionStatus).
        return self._status

    @status.setter
    def status(self, new: DaemonDescriptionStatus) -> None:
        # Keep the textual description in sync with the numeric status.
        self._status = new
        self.status_desc = DaemonDescriptionStatus.to_str(new)
917
918 def get_port_summary(self) -> str:
919 if not self.ports:
920 return ''
921 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
922
923 def name(self) -> str:
924 return '%s.%s' % (self.daemon_type, self.daemon_id)
925
926 def matches_service(self, service_name: Optional[str]) -> bool:
927 assert self.daemon_id is not None
928 assert self.daemon_type is not None
929 if service_name:
930 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
931 return False
932
    def service_id(self) -> str:
        """Best-effort derivation of the service id (the part after the first
        '.' in a service name) from what is known about this daemon.

        Precedence: explicit service_name > OSD spec affinity > parsing the
        daemon_id against the hostname (only for services that require an id).
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            if '.' in self._service_name:
                return self._service_name.split('.', 1)[1]
            else:
                # service name has no id component (e.g. plain 'mon')
                return ''

        if self.daemon_type == 'osd':
            # NOTE: 'None' (the string) can appear here from serialization;
            # treat it like no affinity.
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return ''

        def _match() -> str:
            # Heuristically strip the (bare) hostname out of the daemon_id to
            # recover the service id; raises if the daemon_id doesn't follow
            # any known naming pattern.
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    # rgw ids look like "realm.zone.host[.random]"
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id
993
994 def service_name(self) -> str:
995 if self._service_name:
996 return self._service_name
997 assert self.daemon_type is not None
998 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
999 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1000 return daemon_type_to_service(self.daemon_type)
1001
1002 def __repr__(self) -> str:
1003 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
1004 id=self.daemon_id)
1005
1006 def __str__(self) -> str:
1007 return f"{self.name()} in status {self.status_desc} on {self.hostname}"
1008
1009 def to_json(self) -> dict:
1010 out: Dict[str, Any] = OrderedDict()
1011 out['daemon_type'] = self.daemon_type
1012 out['daemon_id'] = self.daemon_id
1013 out['service_name'] = self._service_name
1014 out['daemon_name'] = self.name()
1015 out['hostname'] = self.hostname
1016 out['container_id'] = self.container_id
1017 out['container_image_id'] = self.container_image_id
1018 out['container_image_name'] = self.container_image_name
1019 out['container_image_digests'] = self.container_image_digests
1020 out['memory_usage'] = self.memory_usage
1021 out['memory_request'] = self.memory_request
1022 out['memory_limit'] = self.memory_limit
1023 out['version'] = self.version
1024 out['status'] = self.status.value if self.status is not None else None
1025 out['status_desc'] = self.status_desc
1026 if self.daemon_type == 'osd':
1027 out['osdspec_affinity'] = self.osdspec_affinity
1028 out['is_active'] = self.is_active
1029 out['ports'] = self.ports
1030 out['ip'] = self.ip
1031 out['rank'] = self.rank
1032 out['rank_generation'] = self.rank_generation
1033
1034 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1035 'last_configured']:
1036 if getattr(self, k):
1037 out[k] = datetime_to_str(getattr(self, k))
1038
1039 if self.events:
1040 out['events'] = [e.to_json() for e in self.events]
1041
1042 empty = [k for k, v in out.items() if v is None]
1043 for e in empty:
1044 del out[e]
1045 return out
1046
1047 def to_dict(self) -> dict:
1048 out: Dict[str, Any] = OrderedDict()
1049 out['daemon_type'] = self.daemon_type
1050 out['daemon_id'] = self.daemon_id
1051 out['daemon_name'] = self.name()
1052 out['hostname'] = self.hostname
1053 out['container_id'] = self.container_id
1054 out['container_image_id'] = self.container_image_id
1055 out['container_image_name'] = self.container_image_name
1056 out['container_image_digests'] = self.container_image_digests
1057 out['memory_usage'] = self.memory_usage
1058 out['memory_request'] = self.memory_request
1059 out['memory_limit'] = self.memory_limit
1060 out['version'] = self.version
1061 out['status'] = self.status.value if self.status is not None else None
1062 out['status_desc'] = self.status_desc
1063 if self.daemon_type == 'osd':
1064 out['osdspec_affinity'] = self.osdspec_affinity
1065 out['is_active'] = self.is_active
1066 out['ports'] = self.ports
1067 out['ip'] = self.ip
1068
1069 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1070 'last_configured']:
1071 if getattr(self, k):
1072 out[k] = datetime_to_str(getattr(self, k))
1073
1074 if self.events:
1075 out['events'] = [e.to_dict() for e in self.events]
1076
1077 empty = [k for k, v in out.items() if v is None]
1078 for e in empty:
1079 del out[e]
1080 return out
1081
1082 @classmethod
1083 @handle_type_error
1084 def from_json(cls, data: dict) -> 'DaemonDescription':
1085 c = data.copy()
1086 event_strs = c.pop('events', [])
1087 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1088 'last_configured']:
1089 if k in c:
1090 c[k] = str_to_datetime(c[k])
1091 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1092 status_int = c.pop('status', None)
1093 if 'daemon_name' in c:
1094 del c['daemon_name']
1095 if 'service_name' in c and c['service_name'].startswith('osd.'):
1096 # if the service_name is a osd.NNN (numeric osd id) then
1097 # ignore it -- it is not a valid service_name and
1098 # (presumably) came from an older version of cephadm.
1099 try:
1100 int(c['service_name'][4:])
1101 del c['service_name']
1102 except ValueError:
1103 pass
1104 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1105 return cls(events=events, status=status, **c)
1106
    def __copy__(self) -> 'DaemonDescription':
        # feel free to change this:
        # Copy via a JSON round-trip -- simple, and detaches mutable state.
        return DaemonDescription.from_json(self.to_json())
1110
1111 @staticmethod
1112 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
1113 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1114
1115
# Register with PyYAML so ``yaml.dump`` can serialize DaemonDescription.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1117
1118
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        """
        :param spec: the service's spec.
        :param container_image_id: image hash.
        :param container_image_name: image friendly name.
        :param service_url: URL of the service's REST-like API, if any.
        :param last_refresh: when this info was last refreshed.
        :param created: when the service was created.
        :param deleted: when the service was deleted, if it was.
        :param size: total number of daemons.
        :param running: number of daemons that are up.
        :param events: important log messages attached to the service.
        :param virtual_ip: virtual IP of the service, if any.
        :param ports: ports the service listens on; defaults to empty.
        """
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # The default used to be a shared mutable ``[]``; build a fresh list
        # per instance so a caller mutating ``ports`` cannot leak state into
        # other ServiceDescription instances.
        self.ports = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type (taken from the spec)."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Render ``<virtual-ip>:<comma-separated ports>``; '' when no ports."""
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_json(self) -> Dict[str, Any]:
        """Build the 'status' sub-dict shared by to_json() and to_dict()."""
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        # Timestamps are serialized as strings; unset ones are dropped below.
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """Serialize spec plus status; events serialized via their to_json()."""
        out = self.spec.to_json()
        out['status'] = self._status_json()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Serialize spec plus status; events serialized via their to_dict()."""
        out = self.spec.to_json()
        out['status'] = self._status_json()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Rebuild a ServiceDescription from its to_json() representation."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Teach ``yaml.dump`` to render a ServiceDescription as a mapping."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1246
1247
# Register with PyYAML so ``yaml.dump`` can serialize ServiceDescription.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1249
1250
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring
      a particular server.
    * filter by label when not all of the estate is Ceph servers,
      and we want to only learn about the Ceph servers.
    * filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels
        #: Optional: get info about certain named hosts only
        self.hosts = hosts
1273
1274
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within cluster, e.g. a hostname.
        :param devices: the host's storage devices; defaults to empty.
        :param labels: the host's labels; defaults to empty.
        :param addr: address used to reach the host; defaults to ``name``.
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a JSON-friendly dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Rebuild an InventoryHost from its to_json() form.

        :raises OrchestratorValidationError: on missing, unknown or
            malformed keys.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            # Chain the original error to keep the root cause visible.
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """Build hosts from (name, payload) pairs, where ``payload.data``
        holds the Devices JSON -- TODO confirm shape against callers."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> Any:
        # Identity is (name, devices); labels and addr are ignored.
        # Guard against arbitrary operand types: previously comparing with a
        # non-InventoryHost raised AttributeError; now Python falls back to
        # the other operand / default comparison.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1333
1334
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
    See ``ceph osd metadata | jq '.[].device_ids'``

    ``path``: presumably the device node path on the host -- confirm with
    callers.
    """
    # No per-instance __dict__; all fields come from the namedtuple base.
    __slots__ = ()
1346
1347
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to a service or daemon.
    """

    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        """
        :param created: creation time, as a datetime or an ISO string.
        :param kind: 'service' or 'daemon'.
        :param subject: name of the service/daemon the event is about.
        :param level: 'INFO' or 'ERROR'.
        :param message: the event text.
        """
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # the service name or the daemon name
        self.subject: str = subject

        # Events are not meant for debugging; debug output belongs in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return '<kind>:<subject>', e.g. 'daemon:crash.ubuntu'."""
        return self.kind + ':' + self.subject

    def to_json(self) -> str:
        """Serialize as a single human-readable line (the v1 format)."""
        timestamp = datetime_to_str(self.created)
        return '{} {} [{}] "{}"'.format(timestamp, self.kind_subject(),
                                        self.level, self.message)

    def to_dict(self) -> dict:
        """Serialize as a dict, for structured output."""
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        Parse the single-line format produced by :meth:`to_json`.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: one serialized event line.
        :return: the parsed event.
        """
        parsed = cls.regex_v1.match(data)
        if parsed is None:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False
        # Note: ``level`` is not part of the comparison.
        return (self.created, self.kind, self.subject, self.message) \
            == (other.created, other.kind, other.subject, other.message)

    def __repr__(self) -> str:
        return 'OrchestratorEvent.from_json({!r})'.format(self.to_json())
1419
1420
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: add a remote-forwarding stub for every public
    Orchestrator method."""
    def make_shim(method_name: str) -> Callable:
        # A factory function is required so each stub captures its own
        # method name; a closure created directly in the loop would bind
        # only the last name.
        def stub(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return stub

    for name, method in Orchestrator.__dict__.items():
        if name.startswith('_') or name in ['is_orchestrator_module']:
            continue
        setattr(cls, name, update_wrapper(make_shim(name), method))
    return cls
1435
1436
1437 @_mk_orch_methods
1438 class OrchestratorClientMixin(Orchestrator):
1439 """
1440 A module that inherents from `OrchestratorClientMixin` can directly call
1441 all :class:`Orchestrator` methods without manually calling remote.
1442
1443 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1444 calls :func:`OrchestratorClientMixin._oremote`
1445
1446 >>> class MyModule(OrchestratorClientMixin):
1447 ... def func(self):
1448 ... completion = self.add_host('somehost') # calls `_oremote()`
1449 ... self.log.debug(completion.result)
1450
1451 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1452 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1453 "real" implementation of the orchestrator.
1454
1455
1456 >>> import mgr_module
1457 >>> #doctest: +SKIP
1458 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1459 ... def __init__(self, ...):
1460 ... self.orch_client = OrchestratorClientMixin()
1461 ... self.orch_client.set_mgr(self.mgr))
1462 """
1463
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Set the ``MgrModule`` instance used for remote calls.

        Usable e.g. in the Dashboard, which uses a global ``mgr`` object.
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
1470
    def __get_mgr(self) -> Any:
        """Return the mgr set via :meth:`set_mgr`, falling back to ``self``."""
        try:
            return self.__mgr
        except AttributeError:
            # set_mgr() was never called; assume self can act as the mgr.
            return self
1476
    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            # Running inside the orchestrator module itself?
            o = mgr._select_orchestrator()
        except AttributeError:
            # No: ask the orchestrator module for the active backend.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Distinguish "backend doesn't implement this method" from a
            # genuine failure of an implemented method.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise