1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce, update_wrapper
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast, Mapping
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
42 logger = logging.getLogger(__name__)
43
44 T = TypeVar('T')
45 FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
48 class OrchestratorError(Exception):
49 """
50 General orchestrator specific error.
51
52 Used for deployment, configuration or user errors.
53
54 It's not intended for programming errors or orchestrator internal errors.
55 """
56
57 def __init__(self,
58 msg: str,
59 errno: int = -errno.EINVAL,
60 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
61 super(Exception, self).__init__(msg)
62 self.errno = errno
63 # See OrchestratorEvent.subject
64 self.event_subject = event_kind_subject
65
66
67 class NoOrchestrator(OrchestratorError):
68 """
69 No orchestrator is configured.
70 """
71
72 def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
73 super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
74
75
76 class OrchestratorValidationError(OrchestratorError):
77 """
78 Raised when an orchestrator doesn't support a specific feature.
79 """
80
81
82 @contextmanager
83 def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
84 try:
85 yield
86 except OrchestratorError as e:
87 if overwrite or hasattr(e, 'event_subject'):
88 e.event_subject = (kind, subject)
89 raise
90
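# Illustrative sketch (not part of the upstream file): how an orchestrator
# implementation might tag errors with an event subject.  The service name
# 'rgw.myrealm' is a made-up example.
def _example_set_exception_subject() -> None:
    with set_exception_subject('service', 'rgw.myrealm'):
        # Any OrchestratorError raised inside this block is annotated with
        # ('service', 'rgw.myrealm') before it is re-raised.
        raise OrchestratorError("deployment failed")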
91
92 def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
93 @wraps(func)
94 def wrapper(*args: Any, **kwargs: Any) -> Any:
95 try:
96 return func(*args, **kwargs)
97 except (OrchestratorError, SpecValidationError) as e:
98 # Do not print Traceback for expected errors.
99 return HandleCommandResult(e.errno, stderr=str(e))
100 except ImportError as e:
101 return HandleCommandResult(-errno.ENOENT, stderr=str(e))
102 except NotImplementedError:
103 msg = 'This Orchestrator does not support `{}`'.format(prefix)
104 return HandleCommandResult(-errno.ENOENT, stderr=msg)
105
106 # misuse lambda to copy `wrapper`
107 wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731
108 wrapper_copy._prefix = prefix # type: ignore
109 wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore
110 wrapper_copy._cli_command.store_func_metadata(func) # type: ignore
111 wrapper_copy._cli_command.func = wrapper_copy # type: ignore
112
113 return cast(FuncT, wrapper_copy)
114
115
116 def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
117 """
118 Decorator to make Orchestrator methods return
119 an OrchResult.
120 """
121
122 @wraps(f)
123 def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
124 try:
125 return OrchResult(f(*args, **kwargs))
126 except Exception as e:
127 logger.exception(e)
128 import os
129 if 'UNITTEST' in os.environ:
130 raise # This makes debugging of Tracebacks from unittests a bit easier
131 return OrchResult(None, exception=e)
132
133 return cast(Callable[..., OrchResult[T]], wrapper)
134
135
136 class InnerCliCommandCallable(Protocol):
137 def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
138 ...
139
140
141 def _cli_command(perm: str) -> InnerCliCommandCallable:
142 def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
143 return lambda func: handle_exception(prefix, perm, func)
144 return inner_cli_command
145
146
147 _cli_read_command = _cli_command('r')
148 _cli_write_command = _cli_command('rw')
149
150
151 class CLICommandMeta(type):
152 """
153 This is a workaround for the use of a global variable CLICommand.COMMANDS which
154 prevents modules from importing any other module.
155
156 We make use of CLICommand, except for the use of the global variable.
157 """
158 def __init__(cls, name: str, bases: Any, dct: Any) -> None:
159 super(CLICommandMeta, cls).__init__(name, bases, dct)
160 dispatch: Dict[str, CLICommand] = {}
161 for v in dct.values():
162 try:
163 dispatch[v._prefix] = v._cli_command
164 except AttributeError:
165 pass
166
167 def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
168 if cmd['prefix'] not in dispatch:
169 return self.handle_command(inbuf, cmd)
170
171 return dispatch[cmd['prefix']].call(self, cmd, inbuf)
172
173 cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
174 cls.handle_command = handle_command
175
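# Minimal sketch (assumption, not from the upstream file): how a mgr module
# could combine CLICommandMeta with the `_cli_read_command` decorator defined
# above.  The class name and command prefix are hypothetical.
class _ExampleCLIModule(metaclass=CLICommandMeta):
    @_cli_read_command('orch example status')
    def _status(self) -> HandleCommandResult:
        """Report a dummy status (illustration only)."""
        return HandleCommandResult(0, stdout='ok')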
176
177 class OrchResult(Generic[T]):
178 """
179 Stores a result and an exception. Mainly to circumvent the
180 MgrModule.remote() method that hides all exceptions and for
181 handling different sub-interpreters.
182 """
183
184 def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
185 self.result = result
186 self.serialized_exception: Optional[bytes] = None
187 self.exception_str: str = ''
188 self.set_exception(exception)
189
190 __slots__ = 'result', 'serialized_exception', 'exception_str'
191
192 def set_exception(self, e: Optional[Exception]) -> None:
193 if e is None:
194 self.serialized_exception = None
195 self.exception_str = ''
196 return
197
198 self.exception_str = f'{type(e)}: {str(e)}'
199 try:
200 self.serialized_exception = pickle.dumps(e)
201 except pickle.PicklingError:
202 logger.error(f"failed to pickle {e}")
203 if isinstance(e, Exception):
204 e = Exception(*e.args)
205 else:
206 e = Exception(str(e))
207 # degenerate to a plain Exception
208 self.serialized_exception = pickle.dumps(e)
209
210 def result_str(self) -> str:
211 """Force a string."""
212 if self.result is None:
213 return ''
214 if isinstance(self.result, list):
215 return '\n'.join(str(x) for x in self.result)
216 return str(self.result)
217
218
219 def raise_if_exception(c: OrchResult[T]) -> T:
220 """
221 Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
222 """
223 if c.serialized_exception is not None:
224 try:
225 e = pickle.loads(c.serialized_exception)
226 except (KeyError, AttributeError):
227 raise Exception(c.exception_str)
228 raise e
229 assert c.result is not None, 'OrchResult should either have an exception or a result'
230 return c.result
231
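# Illustrative sketch (not part of the upstream interface): a backend method
# decorated with `handle_orch_error` returns an OrchResult, which callers
# unwrap with `raise_if_exception`.  `_ExampleBackend` is a hypothetical name.
class _ExampleBackend:
    @handle_orch_error
    def get_names(self) -> List[str]:
        # a real backend would gather live data; this value is illustrative
        return ['host1', 'host2']


def _example_unwrap_result() -> List[str]:
    res = _ExampleBackend().get_names()  # an OrchResult[List[str]]
    return raise_if_exception(res)       # ['host1', 'host2'], or re-raises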
232
233 def _hide_in_features(f: FuncT) -> FuncT:
234 f._hide_in_features = True # type: ignore
235 return f
236
237
238 class Orchestrator(object):
239 """
240 Calls in this class may do long running remote operations, with time
241 periods ranging from network latencies to package install latencies and large
242 internet downloads. For that reason, all are asynchronous, and return
243 ``Completion`` objects.
244
245 Methods should only return the completion and not directly execute
246 anything, like network calls. Otherwise the purpose of
247 those completions is defeated.
248
249 Implementations are not required to start work on an operation until
250 the caller waits on the relevant Completion objects. Callers making
251 multiple updates should not wait on Completions until they're done
252 sending operations: this enables implementations to batch up a series
253 of updates when wait() is called on a set of Completion objects.
254
255 Implementations are encouraged to keep reasonably fresh caches of
256 the status of the system: it is better to serve a stale-but-recent
257 result read of e.g. device inventory than it is to keep the caller waiting
258 while you scan hosts every time.
259 """
260
261 @_hide_in_features
262 def is_orchestrator_module(self) -> bool:
263 """
264 Enable other modules to interrogate this module to discover
265 whether it's usable as an orchestrator module.
266
267 Subclasses do not need to override this.
268 """
269 return True
270
271 @_hide_in_features
272 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
273 """
274 Report whether we can talk to the orchestrator. This is the
275 place to give the user a meaningful message if the orchestrator
276 isn't running or can't be contacted.
277
278 This method may be called frequently (e.g. every page load
279 to conditionally display a warning banner), so make sure it's
280 not too expensive. It's okay to give a slightly stale status
281 (e.g. based on a periodic background ping of the orchestrator)
282 if that's necessary to make this method fast.
283
284 .. note::
285 `True` doesn't mean that the desired functionality
286 is actually available in the orchestrator. I.e. this
287 won't work as expected::
288
289 >>> #doctest: +SKIP
290 ... if OrchestratorClientMixin().available()[0]: # wrong.
291 ... OrchestratorClientMixin().get_hosts()
292
293 :return: boolean representing whether the module is available/usable
294 :return: string describing any error
295 :return: dict containing any module specific information
296 """
297 raise NotImplementedError()
298
299 @_hide_in_features
300 def get_feature_set(self) -> Dict[str, dict]:
301 """Describes which methods this orchestrator implements
302
303 .. note::
304 `True` doesn't mean that the desired functionality
305 is actually possible in the orchestrator. I.e. this
306 won't work as expected::
307
308 >>> #doctest: +SKIP
309 ... api = OrchestratorClientMixin()
310 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
311 ... api.get_hosts()
312
313 It's better to ask for forgiveness instead::
314
315 >>> #doctest: +SKIP
316 ... try:
317 ... OrchestratorClientMixin().get_hosts()
318 ... except (OrchestratorError, NotImplementedError):
319 ... ...
320
321 :returns: Dict of API method names to ``{'available': True or False}``
322 """
323 module = self.__class__
324 features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
325 for a in Orchestrator.__dict__
326 if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
327 }
328 return features
329
330 def cancel_completions(self) -> None:
331 """
332 Cancel ongoing completions to unstick the mgr.
333 """
334 raise NotImplementedError()
335
336 def pause(self) -> None:
337 raise NotImplementedError()
338
339 def resume(self) -> None:
340 raise NotImplementedError()
341
342 def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
343 """
344 Add a host to the orchestrator inventory.
345
346 :param host_spec: host specification (hostname, optional address and labels)
347 """
348 raise NotImplementedError()
349
350 def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
351 """
352 Remove a host from the orchestrator inventory.
353
354 :param host: hostname
355 """
356 raise NotImplementedError()
357
358 def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
359 """
360 Drain all daemons from a host.
361
362 :param hostname: hostname
363 """
364 raise NotImplementedError()
365
366 def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
367 """
368 Update a host's address
369
370 :param host: hostname
371 :param addr: address (dns name or IP)
372 """
373 raise NotImplementedError()
374
375 def get_hosts(self) -> OrchResult[List[HostSpec]]:
376 """
377 Report the hosts in the cluster.
378
379 :return: list of HostSpec
380 """
381 raise NotImplementedError()
382
383 def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
384 """
385 Return host metadata (gather_facts).
386 """
387 raise NotImplementedError()
388
389 def add_host_label(self, host: str, label: str) -> OrchResult[str]:
390 """
391 Add a host label
392 """
393 raise NotImplementedError()
394
395 def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
396 """
397 Remove a host label
398 """
399 raise NotImplementedError()
400
401 def host_ok_to_stop(self, hostname: str) -> OrchResult:
402 """
403 Check if the specified host can be safely stopped without reducing availability
404
405 :param hostname: hostname
406 """
407 raise NotImplementedError()
408
409 def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
410 """
411 Place a host in maintenance, stopping daemons and disabling its systemd target
412 """
413 raise NotImplementedError()
414
415 def exit_host_maintenance(self, hostname: str) -> OrchResult:
416 """
417 Return a host from maintenance, restarting the cluster's systemd target
418 """
419 raise NotImplementedError()
420
421 def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
422 """
423 Return the storage inventory as reported by `ceph-volume inventory`.
424
425 :return: list of InventoryHost
426 """
427 raise NotImplementedError()
428
429 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
430 """
431 Describe a service (of any kind) that is already configured in
432 the orchestrator. For example, when viewing an OSD in the dashboard
433 we might like to also display information about the orchestrator's
434 view of the service (like the kubernetes pod ID).
435
436 When viewing a CephFS filesystem in the dashboard, we would use this
437 to display the pods being currently run for MDS daemons.
438
439 :return: list of ServiceDescription objects.
440 """
441 raise NotImplementedError()
442
443 def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
444 """
445 Describe a daemon (of any kind) that is already configured in
446 the orchestrator.
447
448 :return: list of DaemonDescription objects.
449 """
450 raise NotImplementedError()
451
452 @handle_orch_error
453 def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
454 """
455 Applies any spec
456 """
457 fns: Dict[str, Callable[..., OrchResult[str]]] = {
458 'alertmanager': self.apply_alertmanager,
459 'crash': self.apply_crash,
460 'grafana': self.apply_grafana,
461 'iscsi': self.apply_iscsi,
462 'mds': self.apply_mds,
463 'mgr': self.apply_mgr,
464 'mon': self.apply_mon,
465 'nfs': self.apply_nfs,
466 'node-exporter': self.apply_node_exporter,
467 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
468 'prometheus': self.apply_prometheus,
469 'loki': self.apply_loki,
470 'promtail': self.apply_promtail,
471 'rbd-mirror': self.apply_rbd_mirror,
472 'rgw': self.apply_rgw,
473 'ingress': self.apply_ingress,
474 'snmp-gateway': self.apply_snmp_gateway,
475 'host': self.add_host,
476 }
477
478 def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
479 l_res = raise_if_exception(l)
480 r_res = raise_if_exception(r)
481 l_res.append(r_res)
482 return OrchResult(l_res)
483 return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
484
485 def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
486 """
487 Plan (Dry-run, Preview) a List of Specs.
488 """
489 raise NotImplementedError()
490
491 def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
492 """
493 Remove specific daemon(s).
494
495 :return: None
496 """
497 raise NotImplementedError()
498
499 def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
500 """
501 Remove a service (a collection of daemons).
502
503 :return: None
504 """
505 raise NotImplementedError()
506
507 def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
508 """
509 Perform an action (start/stop/reload) on a service (i.e., all daemons
510 providing the logical service).
511
512 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
513 :param service_name: service_type + '.' + service_id
514 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
515 :rtype: OrchResult
516 """
517 # assert action in ["start", "stop", "reload", "restart", "redeploy"]
518 raise NotImplementedError()
519
520 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
521 """
522 Perform an action (start/stop/reload) on a daemon.
523
524 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
525 :param daemon_name: name of daemon
526 :param image: Container image when redeploying that daemon
527 :rtype: OrchResult
528 """
529 # assert action in ["start", "stop", "reload", "restart", "redeploy"]
530 raise NotImplementedError()
531
532 def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
533 """
534 Create one or more OSDs within a single Drive Group.
535
536 The principal argument here is the drive_group member
537 of OsdSpec: other fields are advisory/extensible for any
538 finer-grained OSD feature enablement (choice of backing store,
539 compression/encryption, etc).
540 """
541 raise NotImplementedError()
542
543 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
544 """ Update OSD cluster """
545 raise NotImplementedError()
546
547 def set_unmanaged_flag(self,
548 unmanaged_flag: bool,
549 service_type: str = 'osd',
550 service_name: Optional[str] = None
551 ) -> HandleCommandResult:
552 raise NotImplementedError()
553
554 def preview_osdspecs(self,
555 osdspec_name: Optional[str] = 'osd',
556 osdspecs: Optional[List[DriveGroupSpec]] = None
557 ) -> OrchResult[str]:
558 """ Get a preview for OSD deployments """
559 raise NotImplementedError()
560
561 def remove_osds(self, osd_ids: List[str],
562 replace: bool = False,
563 force: bool = False,
564 zap: bool = False) -> OrchResult[str]:
565 """
566 :param osd_ids: list of OSD IDs
567 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
568 :param force: Forces the OSD removal process without waiting for the data to be drained first.
569 :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
570
571
572 .. note:: this can only remove OSDs that were successfully
573 created (i.e. got an OSD ID).
574 """
575 raise NotImplementedError()
576
577 def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
578 """
579 Stop the ongoing removal of the given OSDs.
580 """
581 raise NotImplementedError()
582
583 def remove_osds_status(self) -> OrchResult:
584 """
585 Returns a status of the ongoing OSD removal operations.
586 """
587 raise NotImplementedError()
588
589 def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
590 """
591 Instructs the orchestrator to enable or disable either the ident or the fault LED.
592
593 :param ident_fault: either ``"ident"`` or ``"fault"``
594 :param on: ``True`` = on.
595 :param locations: See :class:`orchestrator.DeviceLightLoc`
596 """
597 raise NotImplementedError()
598
599 def zap_device(self, host: str, path: str) -> OrchResult[str]:
600 """Zap/Erase a device (DESTROYS DATA)"""
601 raise NotImplementedError()
602
603 def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
604 """Create daemons daemon(s) for unmanaged services"""
605 raise NotImplementedError()
606
607 def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
608 """Update mon cluster"""
609 raise NotImplementedError()
610
611 def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
612 """Update mgr cluster"""
613 raise NotImplementedError()
614
615 def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
616 """Update MDS cluster"""
617 raise NotImplementedError()
618
619 def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
620 """Update RGW cluster"""
621 raise NotImplementedError()
622
623 def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
624 """Update ingress daemons"""
625 raise NotImplementedError()
626
627 def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
628 """Update rbd-mirror cluster"""
629 raise NotImplementedError()
630
631 def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
632 """Update NFS cluster"""
633 raise NotImplementedError()
634
635 def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
636 """Update iscsi cluster"""
637 raise NotImplementedError()
638
639 def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
640 """Update prometheus cluster"""
641 raise NotImplementedError()
642
643 def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
644 """Update existing a Node-Exporter daemon(s)"""
645 raise NotImplementedError()
646
647 def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
648 """Update existing a Loki daemon(s)"""
649 raise NotImplementedError()
650
651 def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
652 """Update existing a Promtail daemon(s)"""
653 raise NotImplementedError()
654
655 def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
656 """Update existing a crash daemon(s)"""
657 raise NotImplementedError()
658
659 def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
660 """Update existing a grafana service"""
661 raise NotImplementedError()
662
663 def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
664 """Update an existing AlertManager daemon(s)"""
665 raise NotImplementedError()
666
667 def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
668 """Update an existing snmp gateway service"""
669 raise NotImplementedError()
670
671 def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
672 raise NotImplementedError()
673
674 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
675 raise NotImplementedError()
676
677 def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
678 hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
679 raise NotImplementedError()
680
681 def upgrade_pause(self) -> OrchResult[str]:
682 raise NotImplementedError()
683
684 def upgrade_resume(self) -> OrchResult[str]:
685 raise NotImplementedError()
686
687 def upgrade_stop(self) -> OrchResult[str]:
688 raise NotImplementedError()
689
690 def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
691 """
692 If an upgrade is currently underway, report on where
693 we are in the process, or if some error has occurred.
694
695 :return: UpgradeStatusSpec instance
696 """
697 raise NotImplementedError()
698
699 @_hide_in_features
700 def upgrade_available(self) -> OrchResult:
701 """
702 Report on what versions are available to upgrade to
703
704 :return: List of strings
705 """
706 raise NotImplementedError()
707
708
709 GenericSpec = Union[ServiceSpec, HostSpec]
710
711
712 def json_to_generic_spec(spec: dict) -> GenericSpec:
713 if 'service_type' in spec and spec['service_type'] == 'host':
714 return HostSpec.from_json(spec)
715 else:
716 return ServiceSpec.from_json(spec)
717
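# Illustrative sketch: parsing service/host specs (as ``ceph orch apply -i``
# does) and handing them to Orchestrator.apply().  ``orch`` stands for any
# object implementing the interface above; the helper name is hypothetical.
def _example_apply_specs(orch: Orchestrator, inbuf: str) -> List[str]:
    specs: List[GenericSpec] = [json_to_generic_spec(s)
                                for s in yaml.safe_load_all(inbuf)]
    return raise_if_exception(orch.apply(specs))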
718
719 def daemon_type_to_service(dtype: str) -> str:
720 mapping = {
721 'mon': 'mon',
722 'mgr': 'mgr',
723 'mds': 'mds',
724 'rgw': 'rgw',
725 'osd': 'osd',
726 'haproxy': 'ingress',
727 'keepalived': 'ingress',
728 'iscsi': 'iscsi',
729 'rbd-mirror': 'rbd-mirror',
730 'cephfs-mirror': 'cephfs-mirror',
731 'nfs': 'nfs',
732 'grafana': 'grafana',
733 'alertmanager': 'alertmanager',
734 'prometheus': 'prometheus',
735 'node-exporter': 'node-exporter',
736 'loki': 'loki',
737 'promtail': 'promtail',
738 'crash': 'crash',
739 'crashcollector': 'crash', # Specific Rook Daemon
740 'container': 'container',
741 'agent': 'agent',
742 'snmp-gateway': 'snmp-gateway',
743 }
744 return mapping[dtype]
745
746
747 def service_to_daemon_types(stype: str) -> List[str]:
748 mapping = {
749 'mon': ['mon'],
750 'mgr': ['mgr'],
751 'mds': ['mds'],
752 'rgw': ['rgw'],
753 'osd': ['osd'],
754 'ingress': ['haproxy', 'keepalived'],
755 'iscsi': ['iscsi'],
756 'rbd-mirror': ['rbd-mirror'],
757 'cephfs-mirror': ['cephfs-mirror'],
758 'nfs': ['nfs'],
759 'grafana': ['grafana'],
760 'alertmanager': ['alertmanager'],
761 'prometheus': ['prometheus'],
762 'loki': ['loki'],
763 'promtail': ['promtail'],
764 'node-exporter': ['node-exporter'],
765 'crash': ['crash'],
766 'container': ['container'],
767 'agent': ['agent'],
768 'snmp-gateway': ['snmp-gateway'],
769 }
770 return mapping[stype]
771
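# Illustrative sketch: the two mappings above are inverses, except that some
# services are composed of several daemon types (e.g. 'ingress').
def _example_daemon_service_mapping() -> None:
    assert daemon_type_to_service('haproxy') == 'ingress'
    assert daemon_type_to_service('keepalived') == 'ingress'
    assert service_to_daemon_types('ingress') == ['haproxy', 'keepalived']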
772
773 KNOWN_DAEMON_TYPES: List[str] = list(
774 sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
775
776
777 class UpgradeStatusSpec(object):
778 # Orchestrator's report on what's going on with any ongoing upgrade
779 def __init__(self) -> None:
780 self.in_progress = False # Is an upgrade underway?
781 self.target_image: Optional[str] = None
782 self.services_complete: List[str] = [] # Which daemon types are fully updated?
783 self.which: str = '<unknown>'  # which daemon types, services or hosts the user specified (if any)
784 self.progress: Optional[str] = None  # how many of the daemons have been upgraded
785 self.message = "" # Freeform description
786
787
788 def handle_type_error(method: FuncT) -> FuncT:
789 @wraps(method)
790 def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
791 try:
792 return method(cls, *args, **kwargs)
793 except TypeError as e:
794 error_msg = '{}: {}'.format(cls.__name__, e)
795 raise OrchestratorValidationError(error_msg)
796 return cast(FuncT, inner)
797
798
799 class DaemonDescriptionStatus(enum.IntEnum):
800 unknown = -2
801 error = -1
802 stopped = 0
803 running = 1
804 starting = 2 #: Daemon is deployed, but not yet running
805
806 @staticmethod
807 def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
808 if status is None:
809 status = DaemonDescriptionStatus.unknown
810 return {
811 DaemonDescriptionStatus.unknown: 'unknown',
812 DaemonDescriptionStatus.error: 'error',
813 DaemonDescriptionStatus.stopped: 'stopped',
814 DaemonDescriptionStatus.running: 'running',
815 DaemonDescriptionStatus.starting: 'starting',
816 }.get(status, '<unknown>')
817
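# Illustrative sketch: DaemonDescriptionStatus converts to a human-readable
# string, with ``None`` treated as unknown.
def _example_status_to_str() -> None:
    assert DaemonDescriptionStatus.to_str(DaemonDescriptionStatus.running) == 'running'
    assert DaemonDescriptionStatus.to_str(None) == 'unknown'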
818
819 class DaemonDescription(object):
820 """
821 For responding to queries about the status of a particular daemon,
822 stateful or stateless.
823
824 This is not about health or performance monitoring of daemons: it's
825 about letting the orchestrator tell Ceph whether and where a
826 daemon is scheduled in the cluster. When an orchestrator tells
827 Ceph "it's running on host123", that's not a promise that the process
828 is literally up this second, it's a description of where the orchestrator
829 has decided the daemon should run.
830 """
831
832 def __init__(self,
833 daemon_type: Optional[str] = None,
834 daemon_id: Optional[str] = None,
835 hostname: Optional[str] = None,
836 container_id: Optional[str] = None,
837 container_image_id: Optional[str] = None,
838 container_image_name: Optional[str] = None,
839 container_image_digests: Optional[List[str]] = None,
840 version: Optional[str] = None,
841 status: Optional[DaemonDescriptionStatus] = None,
842 status_desc: Optional[str] = None,
843 last_refresh: Optional[datetime.datetime] = None,
844 created: Optional[datetime.datetime] = None,
845 started: Optional[datetime.datetime] = None,
846 last_configured: Optional[datetime.datetime] = None,
847 osdspec_affinity: Optional[str] = None,
848 last_deployed: Optional[datetime.datetime] = None,
849 events: Optional[List['OrchestratorEvent']] = None,
850 is_active: bool = False,
851 memory_usage: Optional[int] = None,
852 memory_request: Optional[int] = None,
853 memory_limit: Optional[int] = None,
854 cpu_percentage: Optional[str] = None,
855 service_name: Optional[str] = None,
856 ports: Optional[List[int]] = None,
857 ip: Optional[str] = None,
858 deployed_by: Optional[List[str]] = None,
859 rank: Optional[int] = None,
860 rank_generation: Optional[int] = None,
861 extra_container_args: Optional[List[str]] = None,
862 ) -> None:
863
864 #: Host is at the same granularity as InventoryHost
865 self.hostname: Optional[str] = hostname
866
867 # Not everyone runs in containers, but enough people do to
868 # justify having the container_id (runtime id) and container_image
869 # (image name)
870 self.container_id = container_id # runtime id
871 self.container_image_id = container_image_id # image id locally
872 self.container_image_name = container_image_name # image friendly name
873 self.container_image_digests = container_image_digests # reg hashes
874
875 #: The type of service (osd, mon, mgr, etc.)
876 self.daemon_type = daemon_type
877
878 #: The orchestrator will have picked some names for daemons,
879 #: typically either based on hostnames or on pod names.
880 #: This is the <foo> in mds.<foo>, the ID that will appear
881 #: in the FSMap/ServiceMap.
882 self.daemon_id: Optional[str] = daemon_id
883 self.daemon_name = self.name()
884
885 #: Some daemon types have a numeric rank assigned
886 self.rank: Optional[int] = rank
887 self.rank_generation: Optional[int] = rank_generation
888
889 self._service_name: Optional[str] = service_name
890
891 #: Service version that was deployed
892 self.version = version
893
894 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
895 self._status = status
896
897 #: Service status description when status == error.
898 self.status_desc = status_desc
899
900 #: datetime when this info was last refreshed
901 self.last_refresh: Optional[datetime.datetime] = last_refresh
902
903 self.created: Optional[datetime.datetime] = created
904 self.started: Optional[datetime.datetime] = started
905 self.last_configured: Optional[datetime.datetime] = last_configured
906 self.last_deployed: Optional[datetime.datetime] = last_deployed
907
908 #: Affinity to a certain OSDSpec
909 self.osdspec_affinity: Optional[str] = osdspec_affinity
910
911 self.events: List[OrchestratorEvent] = events or []
912
913 self.memory_usage: Optional[int] = memory_usage
914 self.memory_request: Optional[int] = memory_request
915 self.memory_limit: Optional[int] = memory_limit
916
917 self.cpu_percentage: Optional[str] = cpu_percentage
918
919 self.ports: Optional[List[int]] = ports
920 self.ip: Optional[str] = ip
921
922 self.deployed_by = deployed_by
923
924 self.is_active = is_active
925
926 self.extra_container_args = extra_container_args
927
928 @property
929 def status(self) -> Optional[DaemonDescriptionStatus]:
930 return self._status
931
932 @status.setter
933 def status(self, new: DaemonDescriptionStatus) -> None:
934 self._status = new
935 self.status_desc = DaemonDescriptionStatus.to_str(new)
936
937 def get_port_summary(self) -> str:
938 if not self.ports:
939 return ''
940 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
941
942 def name(self) -> str:
943 return '%s.%s' % (self.daemon_type, self.daemon_id)
944
945 def matches_service(self, service_name: Optional[str]) -> bool:
946 assert self.daemon_id is not None
947 assert self.daemon_type is not None
948 if service_name:
949 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
950 return False
951
952 def service_id(self) -> str:
953 assert self.daemon_id is not None
954 assert self.daemon_type is not None
955
956 if self._service_name:
957 if '.' in self._service_name:
958 return self._service_name.split('.', 1)[1]
959 else:
960 return ''
961
962 if self.daemon_type == 'osd':
963 if self.osdspec_affinity and self.osdspec_affinity != 'None':
964 return self.osdspec_affinity
965 return ''
966
967 def _match() -> str:
968 assert self.daemon_id is not None
969 err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
970 f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
971
972 if not self.hostname:
973 # TODO: can a DaemonDescription exist without a hostname?
974 raise err
975
976 # use the bare hostname, not the FQDN.
977 host = self.hostname.split('.')[0]
978
979 if host == self.daemon_id:
980 # daemon_id == "host"
981 return self.daemon_id
982
983 elif host in self.daemon_id:
984 # daemon_id == "service_id.host"
985 # daemon_id == "service_id.host.random"
986 pre, post = self.daemon_id.rsplit(host, 1)
987 if not pre.endswith('.'):
988 # '.' sep missing at front of host
989 raise err
990 elif post and not post.startswith('.'):
991 # '.' sep missing at end of host
992 raise err
993 return pre[:-1]
994
995 # daemon_id == "service_id.random"
996 if self.daemon_type == 'rgw':
997 v = self.daemon_id.split('.')
998 if len(v) in [3, 4]:
999 return '.'.join(v[0:2])
1000
1001 if self.daemon_type == 'iscsi':
1002 v = self.daemon_id.split('.')
1003 return '.'.join(v[0:-1])
1004
1005 # daemon_id == "service_id"
1006 return self.daemon_id
1007
1008 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
1009 return _match()
1010
1011 return self.daemon_id
1012
1013 def service_name(self) -> str:
1014 if self._service_name:
1015 return self._service_name
1016 assert self.daemon_type is not None
1017 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
1018 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1019 return daemon_type_to_service(self.daemon_type)
1020
1021 def __repr__(self) -> str:
1022 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
1023 id=self.daemon_id)
1024
1025 def __str__(self) -> str:
1026 return f"{self.name()} in status {self.status_desc} on {self.hostname}"
1027
1028 def to_json(self) -> dict:
1029 out: Dict[str, Any] = OrderedDict()
1030 out['daemon_type'] = self.daemon_type
1031 out['daemon_id'] = self.daemon_id
1032 out['service_name'] = self._service_name
1033 out['daemon_name'] = self.name()
1034 out['hostname'] = self.hostname
1035 out['container_id'] = self.container_id
1036 out['container_image_id'] = self.container_image_id
1037 out['container_image_name'] = self.container_image_name
1038 out['container_image_digests'] = self.container_image_digests
1039 out['memory_usage'] = self.memory_usage
1040 out['memory_request'] = self.memory_request
1041 out['memory_limit'] = self.memory_limit
1042 out['cpu_percentage'] = self.cpu_percentage
1043 out['version'] = self.version
1044 out['status'] = self.status.value if self.status is not None else None
1045 out['status_desc'] = self.status_desc
1046 if self.daemon_type == 'osd':
1047 out['osdspec_affinity'] = self.osdspec_affinity
1048 out['is_active'] = self.is_active
1049 out['ports'] = self.ports
1050 out['ip'] = self.ip
1051 out['rank'] = self.rank
1052 out['rank_generation'] = self.rank_generation
1053
1054 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1055 'last_configured']:
1056 if getattr(self, k):
1057 out[k] = datetime_to_str(getattr(self, k))
1058
1059 if self.events:
1060 out['events'] = [e.to_json() for e in self.events]
1061
1062 empty = [k for k, v in out.items() if v is None]
1063 for e in empty:
1064 del out[e]
1065 return out
1066
1067 def to_dict(self) -> dict:
1068 out: Dict[str, Any] = OrderedDict()
1069 out['daemon_type'] = self.daemon_type
1070 out['daemon_id'] = self.daemon_id
1071 out['daemon_name'] = self.name()
1072 out['hostname'] = self.hostname
1073 out['container_id'] = self.container_id
1074 out['container_image_id'] = self.container_image_id
1075 out['container_image_name'] = self.container_image_name
1076 out['container_image_digests'] = self.container_image_digests
1077 out['memory_usage'] = self.memory_usage
1078 out['memory_request'] = self.memory_request
1079 out['memory_limit'] = self.memory_limit
1080 out['cpu_percentage'] = self.cpu_percentage
1081 out['version'] = self.version
1082 out['status'] = self.status.value if self.status is not None else None
1083 out['status_desc'] = self.status_desc
1084 if self.daemon_type == 'osd':
1085 out['osdspec_affinity'] = self.osdspec_affinity
1086 out['is_active'] = self.is_active
1087 out['ports'] = self.ports
1088 out['ip'] = self.ip
1089
1090 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1091 'last_configured']:
1092 if getattr(self, k):
1093 out[k] = datetime_to_str(getattr(self, k))
1094
1095 if self.events:
1096 out['events'] = [e.to_dict() for e in self.events]
1097
1098 empty = [k for k, v in out.items() if v is None]
1099 for e in empty:
1100 del out[e]
1101 return out
1102
1103 @classmethod
1104 @handle_type_error
1105 def from_json(cls, data: dict) -> 'DaemonDescription':
1106 c = data.copy()
1107 event_strs = c.pop('events', [])
1108 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1109 'last_configured']:
1110 if k in c:
1111 c[k] = str_to_datetime(c[k])
1112 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1113 status_int = c.pop('status', None)
1114 if 'daemon_name' in c:
1115 del c['daemon_name']
1116 if 'service_name' in c and c['service_name'].startswith('osd.'):
1117 # if the service_name is an osd.NNN (numeric osd id) then
1118 # ignore it -- it is not a valid service_name and
1119 # (presumably) came from an older version of cephadm.
1120 try:
1121 int(c['service_name'][4:])
1122 del c['service_name']
1123 except ValueError:
1124 pass
1125 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1126 return cls(events=events, status=status, **c)
1127
1128 def __copy__(self) -> 'DaemonDescription':
1129 # feel free to change this:
1130 return DaemonDescription.from_json(self.to_json())
1131
1132 @staticmethod
1133 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
1134 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1135
1136
1137 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1138
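# Illustrative sketch: how daemon naming relates to service naming for a
# DaemonDescription.  The identifiers below are made up.
def _example_daemon_description_names() -> None:
    dd = DaemonDescription(daemon_type='mds',
                           daemon_id='mycephfs.node1.xyzabc',
                           hostname='node1.example.com')
    assert dd.name() == 'mds.mycephfs.node1.xyzabc'
    assert dd.service_id() == 'mycephfs'
    assert dd.service_name() == 'mds.mycephfs'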
1139
1140 class ServiceDescription(object):
1141 """
1142 For responding to queries about the status of a particular service,
1143 stateful or stateless.
1144
1145 This is not about health or performance monitoring of services: it's
1146 about letting the orchestrator tell Ceph whether and where a
1147 service is scheduled in the cluster. When an orchestrator tells
1148 Ceph "it's running on host123", that's not a promise that the process
1149 is literally up this second, it's a description of where the orchestrator
1150 has decided the service should run.
1151 """
1152
1153 def __init__(self,
1154 spec: ServiceSpec,
1155 container_image_id: Optional[str] = None,
1156 container_image_name: Optional[str] = None,
1157 service_url: Optional[str] = None,
1158 last_refresh: Optional[datetime.datetime] = None,
1159 created: Optional[datetime.datetime] = None,
1160 deleted: Optional[datetime.datetime] = None,
1161 size: int = 0,
1162 running: int = 0,
1163 events: Optional[List['OrchestratorEvent']] = None,
1164 virtual_ip: Optional[str] = None,
1165 ports: List[int] = []) -> None:
1166 # Not everyone runs in containers, but enough people do to
1167 # justify having the container_image_id (image hash) and container_image
1168 # (image name)
1169 self.container_image_id = container_image_id # image hash
1170 self.container_image_name = container_image_name # image friendly name
1171
1172 # If the service exposes REST-like API, this attribute should hold
1173 # the URL.
1174 self.service_url = service_url
1175
1176 # Number of daemons
1177 self.size = size
1178
1179 # Number of daemons up
1180 self.running = running
1181
1182 # datetime when this info was last refreshed
1183 self.last_refresh: Optional[datetime.datetime] = last_refresh
1184 self.created: Optional[datetime.datetime] = created
1185 self.deleted: Optional[datetime.datetime] = deleted
1186
1187 self.spec: ServiceSpec = spec
1188
1189 self.events: List[OrchestratorEvent] = events or []
1190
1191 self.virtual_ip = virtual_ip
1192 self.ports = ports
1193
1194 def service_type(self) -> str:
1195 return self.spec.service_type
1196
1197 def __repr__(self) -> str:
1198 return f"<ServiceDescription of {self.spec.one_line_str()}>"
1199
1200 def get_port_summary(self) -> str:
1201 if not self.ports:
1202 return ''
1203 return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
1204
1205 def to_json(self) -> OrderedDict:
1206 out = self.spec.to_json()
1207 status = {
1208 'container_image_id': self.container_image_id,
1209 'container_image_name': self.container_image_name,
1210 'service_url': self.service_url,
1211 'size': self.size,
1212 'running': self.running,
1213 'last_refresh': self.last_refresh,
1214 'created': self.created,
1215 'virtual_ip': self.virtual_ip,
1216 'ports': self.ports if self.ports else None,
1217 }
1218 for k in ['last_refresh', 'created']:
1219 if getattr(self, k):
1220 status[k] = datetime_to_str(getattr(self, k))
1221 status = {k: v for (k, v) in status.items() if v is not None}
1222 out['status'] = status
1223 if self.events:
1224 out['events'] = [e.to_json() for e in self.events]
1225 return out
1226
1227 def to_dict(self) -> OrderedDict:
1228 out = self.spec.to_json()
1229 status = {
1230 'container_image_id': self.container_image_id,
1231 'container_image_name': self.container_image_name,
1232 'service_url': self.service_url,
1233 'size': self.size,
1234 'running': self.running,
1235 'last_refresh': self.last_refresh,
1236 'created': self.created,
1237 'virtual_ip': self.virtual_ip,
1238 'ports': self.ports if self.ports else None,
1239 }
1240 for k in ['last_refresh', 'created']:
1241 if getattr(self, k):
1242 status[k] = datetime_to_str(getattr(self, k))
1243 status = {k: v for (k, v) in status.items() if v is not None}
1244 out['status'] = status
1245 if self.events:
1246 out['events'] = [e.to_dict() for e in self.events]
1247 return out
1248
1249 @classmethod
1250 @handle_type_error
1251 def from_json(cls, data: dict) -> 'ServiceDescription':
1252 c = data.copy()
1253 status = c.pop('status', {})
1254 event_strs = c.pop('events', [])
1255 spec = ServiceSpec.from_json(c)
1256
1257 c_status = status.copy()
1258 for k in ['last_refresh', 'created']:
1259 if k in c_status:
1260 c_status[k] = str_to_datetime(c_status[k])
1261 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1262 return cls(spec=spec, events=events, **c_status)
1263
1264 @staticmethod
1265 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
1266 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1267
1268
1269 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1270
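# Illustrative sketch: a ServiceDescription pairs a spec with runtime status.
# The counts below are made up.
def _example_service_description() -> None:
    sd = ServiceDescription(spec=ServiceSpec('crash'), size=3, running=3)
    assert sd.service_type() == 'crash'
    assert 'status' in sd.to_json()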
1271
1272 class InventoryFilter(object):
1273 """
1274 When fetching inventory, use this filter to avoid unnecessarily
1275 scanning the whole estate.
1276
1277 Typical use:
1278
1279 filter by host when presenting a UI workflow for configuring
1280 a particular server.
1281 filter by label when not all of the estate is Ceph servers,
1282 and we want to only learn about the Ceph servers.
1283 filter by label when we are particularly interested
1284 in e.g. OSD servers.
1285 """
1286
1287 def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
1288
1289 #: Optional: get info about hosts matching labels
1290 self.labels = labels
1291
1292 #: Optional: get info about certain named hosts only
1293 self.hosts = hosts
1294
1295
1296 class InventoryHost(object):
1297 """
1298 When fetching inventory, all Devices are grouped inside an
1299 InventoryHost.
1300 """
1301
1302 def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
1303 if devices is None:
1304 devices = inventory.Devices([])
1305 if labels is None:
1306 labels = []
1307 assert isinstance(devices, inventory.Devices)
1308
1309 self.name = name # unique within cluster. For example a hostname.
1310 self.addr = addr or name
1311 self.devices = devices
1312 self.labels = labels
1313
1314 def to_json(self) -> dict:
1315 return {
1316 'name': self.name,
1317 'addr': self.addr,
1318 'devices': self.devices.to_json(),
1319 'labels': self.labels,
1320 }
1321
1322 @classmethod
1323 def from_json(cls, data: dict) -> 'InventoryHost':
1324 try:
1325 _data = copy.deepcopy(data)
1326 name = _data.pop('name')
1327 addr = _data.pop('addr', None) or name
1328 devices = inventory.Devices.from_json(_data.pop('devices'))
1329 labels = _data.pop('labels', list())
1330 if _data:
1331 error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
1332 raise OrchestratorValidationError(error_msg)
1333 return cls(name, devices, labels, addr)
1334 except KeyError as e:
1335 error_msg = '{} is required for {}'.format(e, cls.__name__)
1336 raise OrchestratorValidationError(error_msg)
1337 except TypeError as e:
1338 raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
1339
1340 @classmethod
1341 def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
1342 devs = inventory.Devices.from_json
1343 return [cls(item[0], devs(item[1].data)) for item in hosts]
1344
1345 def __repr__(self) -> str:
1346 return "<InventoryHost>({name})".format(name=self.name)
1347
1348 @staticmethod
1349 def get_host_names(hosts: List['InventoryHost']) -> List[str]:
1350 return [host.name for host in hosts]
1351
1352 def __eq__(self, other: Any) -> bool:
1353 return self.name == other.name and self.devices == other.devices
1354
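# Illustrative sketch: building an InventoryHost from JSON.  The host name is
# made up and the device list left empty for brevity.
def _example_inventory_host() -> None:
    h = InventoryHost.from_json({'name': 'node1', 'devices': []})
    assert h.addr == 'node1'  # addr defaults to the host name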
1355
1356 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1357 """
1358 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1359 on devices.
1360
1361 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1362
1363 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1364 See ``ceph osd metadata | jq '.[].device_ids'``
1365 """
1366 __slots__ = ()
1367
1368
1369 class OrchestratorEvent:
1370 """
1371 Similar to K8s Events.
1372
1373 Some form of "important" log message attached to something.
1374 """
1375 INFO = 'INFO'
1376 ERROR = 'ERROR'
1377 regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
1378
1379 def __init__(self, created: Union[str, datetime.datetime], kind: str,
1380 subject: str, level: str, message: str) -> None:
1381 if isinstance(created, str):
1382 created = str_to_datetime(created)
1383 self.created: datetime.datetime = created
1384
1385 assert kind in "service daemon".split()
1386 self.kind: str = kind
1387
1388 # the service name or the daemon name
1389 self.subject: str = subject
1390
1391 # Events are not meant for debugging; debug messages should go to the log.
1392 assert level in "INFO ERROR".split()
1393 self.level = level
1394
1395 self.message: str = message
1396
1397 __slots__ = ('created', 'kind', 'subject', 'level', 'message')
1398
1399 def kind_subject(self) -> str:
1400 return f'{self.kind}:{self.subject}'
1401
1402 def to_json(self) -> str:
1403 # Make a long list of events readable.
1404 created = datetime_to_str(self.created)
1405 return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
1406
1407 def to_dict(self) -> dict:
1408 # Convert events data to dict.
1409 return {
1410 'created': datetime_to_str(self.created),
1411 'subject': self.kind_subject(),
1412 'level': self.level,
1413 'message': self.message
1414 }
1415
1416 @classmethod
1417 @handle_type_error
1418 def from_json(cls, data: str) -> "OrchestratorEvent":
1419 """
1420 >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
1421 '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
1422
1423 :param data:
1424 :return:
1425 """
1426 match = cls.regex_v1.match(data)
1427 if match:
1428 return cls(*match.groups())
1429 raise ValueError(f'Unable to match: "{data}"')
1430
1431 def __eq__(self, other: Any) -> bool:
1432 if not isinstance(other, OrchestratorEvent):
1433 return False
1434
1435 return self.created == other.created and self.kind == other.kind \
1436 and self.subject == other.subject and self.message == other.message
1437
1438 def __repr__(self) -> str:
1439 return f'OrchestratorEvent.from_json({self.to_json()!r})'
1440
1441
1442 def _mk_orch_methods(cls: Any) -> Any:
1443 # `shim` needs to be defined outside of the for loop;
1444 # otherwise the closure would always bind to the last method.
1445 def shim(method_name: str) -> Callable:
1446 def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
1447 completion = self._oremote(method_name, args, kwargs)
1448 return completion
1449 return inner
1450
1451 for name, method in Orchestrator.__dict__.items():
1452 if not name.startswith('_') and name not in ['is_orchestrator_module']:
1453 remote_call = update_wrapper(shim(name), method)
1454 setattr(cls, name, remote_call)
1455 return cls
1456
1457
1458 @_mk_orch_methods
1459 class OrchestratorClientMixin(Orchestrator):
1460 """
1461 A module that inherits from `OrchestratorClientMixin` can directly call
1462 all :class:`Orchestrator` methods without manually calling remote.
1463
1464 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1465 calls :func:`OrchestratorClientMixin._oremote`
1466
1467 >>> class MyModule(OrchestratorClientMixin):
1468 ... def func(self):
1469 ... completion = self.add_host('somehost') # calls `_oremote()`
1470 ... self.log.debug(completion.result)
1471
1472 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1473 The reason is that OrchestratorClientMixin transparently redirects all methods to the
1474 "real" implementation of the orchestrator.
1475
1476
1477 >>> import mgr_module
1478 >>> #doctest: +SKIP
1479 ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
1480 ... def __init__(self, ...):
1481 ... self.orch_client = OrchestratorClientMixin()
1482 ... self.orch_client.set_mgr(self.mgr)
1483 """
1484
1485 def set_mgr(self, mgr: MgrModule) -> None:
1486 """
1487 Usable in the Dashboard, which uses a global ``mgr``
1488 """
1489
1490 self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
1491
1492 def __get_mgr(self) -> Any:
1493 try:
1494 return self.__mgr
1495 except AttributeError:
1496 return self
1497
1498 def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
1499 """
1500 Helper for invoking `remote` on whichever orchestrator is enabled
1501
1502 :raises RuntimeError: If the remote method failed.
1503 :raises OrchestratorError: orchestrator failed to perform
1504 :raises ImportError: no `orchestrator` module or backend not found.
1505 """
1506 mgr = self.__get_mgr()
1507
1508 try:
1509 o = mgr._select_orchestrator()
1510 except AttributeError:
1511 o = mgr.remote('orchestrator', '_select_orchestrator')
1512
1513 if o is None:
1514 raise NoOrchestrator()
1515
1516 mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
1517 try:
1518 return mgr.remote(o, meth, *args, **kwargs)
1519 except Exception as e:
1520 if meth == 'get_feature_set':
1521 raise # self.get_feature_set() calls self._oremote()
1522 f_set = self.get_feature_set()
1523 if meth not in f_set or not f_set[meth]['available']:
1524 raise NotImplementedError(f'{o} does not implement {meth}') from e
1525 raise