]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
6f0374ffede94dc1c0571981ee5af61ac10986aa
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce, update_wrapper
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast, Mapping
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
# Module-level logger shared by all helpers in this file.
logger = logging.getLogger(__name__)

# T: generic payload type carried by OrchResult; FuncT: any callable, used to
# type decorators so they preserve the wrapped function's signature.
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        """
        :param msg: human-readable error message
        :param errno: negative errno code reported back to the CLI
            (defaults to ``-EINVAL``)
        :param event_kind_subject: optional ``(kind, subject)`` pair;
            see OrchestratorEvent.subject
        """
        # Idiomatic zero-arg super() follows the MRO; the previous
        # `super(Exception, self).__init__(msg)` skipped Exception and called
        # BaseException directly, which is equivalent here but fragile.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
66
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # Report ENOENT: the "orchestrator" entity simply does not exist.
        super().__init__(msg, errno=-errno.ENOENT)
74
75
class OrchestratorValidationError(OrchestratorError):
    """Raised when an orchestrator doesn't support a specific feature."""
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any OrchestratorError escaping the block with an
    event subject, then re-raises it unchanged.

    :param kind: event subject kind (see OrchestratorEvent.subject)
    :param subject: event subject identifier
    :param overwrite: intended to control whether an existing subject is
        replaced; see NOTE below.
    """
    try:
        yield
    except OrchestratorError as e:
        # NOTE(review): OrchestratorError.__init__ always assigns
        # `event_subject`, so `hasattr(e, 'event_subject')` is always True and
        # the subject is overwritten regardless of `overwrite` — confirm intent.
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so that expected errors become clean CLI results
    instead of tracebacks, and attach CLICommand metadata for CLICommandMeta.

    :param prefix: CLI command prefix (e.g. ``orch host add``)
    :param perm: permission string handed to CLICommand (``'r'`` or ``'rw'``)
    :param func: the handler to wrap
    :return: a callable carrying ``_prefix`` and ``_cli_command`` attributes,
        later collected by CLICommandMeta
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`
    # (presumably so the CLI attributes below land on a distinct object rather
    # than on `wrapper` itself — TODO confirm why a plain copy is needed here)
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
114
115
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.

    On success the return value of ``f`` is wrapped in an OrchResult; any
    exception is logged and wrapped in an OrchResult instead of propagating
    (except under unit tests, where it is re-raised).
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            import os
            if 'UNITTEST' in os.environ:
                raise  # This makes debugging of Tracebacks from unittests a bit easier
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
134
135
class InnerCliCommandCallable(Protocol):
    """Structural type of the per-permission factory returned by _cli_command."""

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
139
140
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Build a decorator factory that binds `perm` into handle_exception()."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
145
146
# Ready-made CLI decorators: read-only ('r') and read-write ('rw') commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
149
150
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every attribute that handle_exception() tagged with a CLI
        # prefix; anything else raises AttributeError and is skipped.
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the class's own handle_command.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Publish the collected commands on the class, mirroring what the
        # global CLICommand.COMMANDS registry would normally provide.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
175
176
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record (and pickle) *e*; ``None`` clears any stored exception."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            # degenerate to a plain Exception that is guaranteed to pickle
            fallback = Exception(*e.args) if isinstance(e, Exception) else Exception(str(e))
            self.serialized_exception = pickle.dumps(fallback)

    def result_str(self) -> str:
        """Force a string."""
        res = self.result
        if res is None:
            return ''
        if isinstance(res, list):
            return '\n'.join(str(item) for item in res)
        return str(res)
217
218
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result
    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # The original exception type can't be reconstructed here; fall back
        # to a plain Exception carrying the recorded description.
        raise Exception(c.exception_str)
    raise e
231
232
def _hide_in_features(f: FuncT) -> FuncT:
    """Decorator: flag *f* so Orchestrator.get_feature_set() omits it."""
    setattr(f, '_hide_in_features', True)
    return f
236
237
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    # NOTE: nearly every method below is a stub that raises NotImplementedError;
    # concrete backends (e.g. cephadm, rook) override the ones they support, and
    # get_feature_set() reports exactly which ones were overridden.

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

        It's better to ask for forgiveness instead::

            >>> #doctest: +SKIP
            ... try:
            ...     OrchestratorClientMixin().get_hosts()
            ... except (OrchestratorError, NotImplementedError):
            ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method is "available" iff the subclass overrides the base stub.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        # Backend-defined: suspend background orchestration activity
        # (counterpart of resume()).
        raise NotImplementedError()

    def resume(self) -> None:
        # Backend-defined: resume activity previously suspended by pause().
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param host: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling it's systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def rescan_host(self, hostname: str) -> OrchResult:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan

        :param hostname: (str) host name
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Dispatches each spec to the matching ``apply_*`` method (or
        ``add_host`` for host specs) by its ``service_type`` and collects
        the per-spec result strings. An unknown service_type raises KeyError.
        """
        # Dispatch table: service_type -> the method that applies that spec.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'loki': self.apply_loki,
            'promtail': self.apply_promtail,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        # Fold per-spec OrchResults left-to-right into one list, re-raising
        # the first stored exception encountered along the way.
        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        # Backend-defined: toggle the 'unmanaged' flag for a service.
        # NOTE(review): presumably defaults to all 'osd' specs when no
        # service_name is given — confirm against backend implementations.
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)


        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemons daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Loki daemon(s)"""
        raise NotImplementedError()

    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Promtail daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]:
        """Add or update an existing tuned profile"""
        raise NotImplementedError()

    def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
        """Remove a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
        """See current tuned profiles"""
        raise NotImplementedError()

    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
        """Change/Add a specific setting for a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
        """Remove a specific setting for a tuned profile"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        # Backend-defined: report what an upgrade to `image`/`version` would do.
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
        # Backend-defined: list available upgrade targets for `image`.
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
        # Backend-defined: begin an upgrade, optionally restricted to the given
        # daemon types / hosts / services, `limit` daemons at a time.
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        # Backend-defined: pause an in-progress upgrade.
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        # Backend-defined: resume a paused upgrade.
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        # Backend-defined: abort an in-progress upgrade.
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
738
739
740 GenericSpec = Union[ServiceSpec, HostSpec]
741
742
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Parse a JSON-style dict into a HostSpec or a ServiceSpec.

    A dict whose 'service_type' is 'host' becomes a HostSpec; anything
    else is handed to ServiceSpec.from_json().
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
748
749
def daemon_type_to_service(dtype: str) -> str:
    """Return the service type that owns daemons of type *dtype*.

    Most daemon types map to a service of the same name; the exceptions are
    haproxy/keepalived (ingress) and crashcollector (crash).

    :raises KeyError: if *dtype* is not a known daemon type.
    """
    owning_service = {
        # daemons whose service type matches the daemon type
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'loki': 'loki',
        'promtail': 'promtail',
        'crash': 'crash',
        'container': 'container',
        'agent': 'agent',
        'snmp-gateway': 'snmp-gateway',
        # daemons owned by a differently-named service
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    }
    return owning_service[dtype]
776
777
def service_to_daemon_types(stype: str) -> List[str]:
    """Return the daemon types deployed for service type *stype*.

    Inverse of daemon_type_to_service(): every service maps to exactly the
    daemon types it deploys; only 'ingress' deploys more than one.

    :raises KeyError: if *stype* is not a known service type.
    """
    daemon_types = {
        'ingress': ['haproxy', 'keepalived'],  # the only multi-daemon service
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'loki': ['loki'],
        'promtail': ['promtail'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'agent': ['agent'],
        'snmp-gateway': ['snmp-gateway'],
    }
    return daemon_types[stype]
802
803
# Flat list of every daemon type deployed by any known service type.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
806
807
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # for if user specified daemon types, services or hosts
        self.which: str = '<unknown>'
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
        # Is the upgrade paused?
        self.is_paused: bool = False
818
819
def handle_type_error(method: FuncT) -> FuncT:
    """Decorator converting a TypeError raised by *method* into an
    OrchestratorValidationError prefixed with the class name."""
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return cast(FuncT, inner)
829
830
class DaemonDescriptionStatus(enum.IntEnum):
    """Status of a single daemon as reported by the orchestrator."""
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """Human-readable name for *status*; ``None`` is treated as unknown."""
        if status is None:
            status = DaemonDescriptionStatus.unknown
        labels = {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }
        return labels.get(status, '<unknown>')
849
850
851 class DaemonDescription(object):
852 """
853 For responding to queries about the status of a particular daemon,
854 stateful or stateless.
855
856 This is not about health or performance monitoring of daemons: it's
857 about letting the orchestrator tell Ceph whether and where a
858 daemon is scheduled in the cluster. When an orchestrator tells
859 Ceph "it's running on host123", that's not a promise that the process
860 is literally up this second, it's a description of where the orchestrator
861 has decided the daemon should run.
862 """
863
864 def __init__(self,
865 daemon_type: Optional[str] = None,
866 daemon_id: Optional[str] = None,
867 hostname: Optional[str] = None,
868 container_id: Optional[str] = None,
869 container_image_id: Optional[str] = None,
870 container_image_name: Optional[str] = None,
871 container_image_digests: Optional[List[str]] = None,
872 version: Optional[str] = None,
873 status: Optional[DaemonDescriptionStatus] = None,
874 status_desc: Optional[str] = None,
875 last_refresh: Optional[datetime.datetime] = None,
876 created: Optional[datetime.datetime] = None,
877 started: Optional[datetime.datetime] = None,
878 last_configured: Optional[datetime.datetime] = None,
879 osdspec_affinity: Optional[str] = None,
880 last_deployed: Optional[datetime.datetime] = None,
881 events: Optional[List['OrchestratorEvent']] = None,
882 is_active: bool = False,
883 memory_usage: Optional[int] = None,
884 memory_request: Optional[int] = None,
885 memory_limit: Optional[int] = None,
886 cpu_percentage: Optional[str] = None,
887 service_name: Optional[str] = None,
888 ports: Optional[List[int]] = None,
889 ip: Optional[str] = None,
890 deployed_by: Optional[List[str]] = None,
891 rank: Optional[int] = None,
892 rank_generation: Optional[int] = None,
893 extra_container_args: Optional[List[str]] = None,
894 ) -> None:
895
896 #: Host is at the same granularity as InventoryHost
897 self.hostname: Optional[str] = hostname
898
899 # Not everyone runs in containers, but enough people do to
900 # justify having the container_id (runtime id) and container_image
901 # (image name)
902 self.container_id = container_id # runtime id
903 self.container_image_id = container_image_id # image id locally
904 self.container_image_name = container_image_name # image friendly name
905 self.container_image_digests = container_image_digests # reg hashes
906
907 #: The type of service (osd, mon, mgr, etc.)
908 self.daemon_type = daemon_type
909
910 #: The orchestrator will have picked some names for daemons,
911 #: typically either based on hostnames or on pod names.
912 #: This is the <foo> in mds.<foo>, the ID that will appear
913 #: in the FSMap/ServiceMap.
914 self.daemon_id: Optional[str] = daemon_id
915 self.daemon_name = self.name()
916
917 #: Some daemon types have a numeric rank assigned
918 self.rank: Optional[int] = rank
919 self.rank_generation: Optional[int] = rank_generation
920
921 self._service_name: Optional[str] = service_name
922
923 #: Service version that was deployed
924 self.version = version
925
926 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
927 self._status = status
928
929 #: Service status description when status == error.
930 self.status_desc = status_desc
931
932 #: datetime when this info was last refreshed
933 self.last_refresh: Optional[datetime.datetime] = last_refresh
934
935 self.created: Optional[datetime.datetime] = created
936 self.started: Optional[datetime.datetime] = started
937 self.last_configured: Optional[datetime.datetime] = last_configured
938 self.last_deployed: Optional[datetime.datetime] = last_deployed
939
940 #: Affinity to a certain OSDSpec
941 self.osdspec_affinity: Optional[str] = osdspec_affinity
942
943 self.events: List[OrchestratorEvent] = events or []
944
945 self.memory_usage: Optional[int] = memory_usage
946 self.memory_request: Optional[int] = memory_request
947 self.memory_limit: Optional[int] = memory_limit
948
949 self.cpu_percentage: Optional[str] = cpu_percentage
950
951 self.ports: Optional[List[int]] = ports
952 self.ip: Optional[str] = ip
953
954 self.deployed_by = deployed_by
955
956 self.is_active = is_active
957
958 self.extra_container_args = extra_container_args
959
    @property
    def status(self) -> Optional[DaemonDescriptionStatus]:
        """Daemon status (-2 unknown, -1 error, 0 stopped, 1 running, 2 starting).

        Use the setter to change it so ``status_desc`` stays in sync.
        """
        return self._status
963
964 @status.setter
965 def status(self, new: DaemonDescriptionStatus) -> None:
966 self._status = new
967 self.status_desc = DaemonDescriptionStatus.to_str(new)
968
969 def get_port_summary(self) -> str:
970 if not self.ports:
971 return ''
972 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
973
974 def name(self) -> str:
975 return '%s.%s' % (self.daemon_type, self.daemon_id)
976
977 def matches_service(self, service_name: Optional[str]) -> bool:
978 assert self.daemon_id is not None
979 assert self.daemon_type is not None
980 if service_name:
981 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
982 return False
983
    def service_id(self) -> str:
        """Return the service id (the ``<foo>`` in ``<service_type>.<foo>``) this
        daemon belongs to.

        Prefers the recorded service name; for OSDs uses the OSDSpec affinity;
        otherwise heuristically derives it from ``daemon_id`` and ``hostname``.
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        # Explicitly recorded service name: the id is everything after the
        # first '.' (or '' if there is no '.', i.e. no service id at all).
        if self._service_name:
            if '.' in self._service_name:
                return self._service_name.split('.', 1)[1]
            else:
                return ''

        # OSDs: the service id is the OSDSpec the OSD was created from;
        # unmanaged/legacy OSDs have none.  'None' (the string) guards
        # against a stringified None from older serializations.
        if self.daemon_type == 'osd':
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return ''

        def _match() -> str:
            # Strip the host part out of daemon_id to recover the service id.
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            # rgw ids look like "realm.zone(.host(.random))"; keep the first
            # two components as the service id.
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            # iscsi: drop the trailing random component.
            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        # Only service types that require a service id need the heuristic.
        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id
1044
1045 def service_name(self) -> str:
1046 if self._service_name:
1047 return self._service_name
1048 assert self.daemon_type is not None
1049 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
1050 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1051 return daemon_type_to_service(self.daemon_type)
1052
1053 def __repr__(self) -> str:
1054 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
1055 id=self.daemon_id)
1056
1057 def __str__(self) -> str:
1058 return f"{self.name()} in status {self.status_desc} on {self.hostname}"
1059
1060 def to_json(self) -> dict:
1061 out: Dict[str, Any] = OrderedDict()
1062 out['daemon_type'] = self.daemon_type
1063 out['daemon_id'] = self.daemon_id
1064 out['service_name'] = self._service_name
1065 out['daemon_name'] = self.name()
1066 out['hostname'] = self.hostname
1067 out['container_id'] = self.container_id
1068 out['container_image_id'] = self.container_image_id
1069 out['container_image_name'] = self.container_image_name
1070 out['container_image_digests'] = self.container_image_digests
1071 out['memory_usage'] = self.memory_usage
1072 out['memory_request'] = self.memory_request
1073 out['memory_limit'] = self.memory_limit
1074 out['cpu_percentage'] = self.cpu_percentage
1075 out['version'] = self.version
1076 out['status'] = self.status.value if self.status is not None else None
1077 out['status_desc'] = self.status_desc
1078 if self.daemon_type == 'osd':
1079 out['osdspec_affinity'] = self.osdspec_affinity
1080 out['is_active'] = self.is_active
1081 out['ports'] = self.ports
1082 out['ip'] = self.ip
1083 out['rank'] = self.rank
1084 out['rank_generation'] = self.rank_generation
1085
1086 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1087 'last_configured']:
1088 if getattr(self, k):
1089 out[k] = datetime_to_str(getattr(self, k))
1090
1091 if self.events:
1092 out['events'] = [e.to_json() for e in self.events]
1093
1094 empty = [k for k, v in out.items() if v is None]
1095 for e in empty:
1096 del out[e]
1097 return out
1098
1099 def to_dict(self) -> dict:
1100 out: Dict[str, Any] = OrderedDict()
1101 out['daemon_type'] = self.daemon_type
1102 out['daemon_id'] = self.daemon_id
1103 out['daemon_name'] = self.name()
1104 out['hostname'] = self.hostname
1105 out['container_id'] = self.container_id
1106 out['container_image_id'] = self.container_image_id
1107 out['container_image_name'] = self.container_image_name
1108 out['container_image_digests'] = self.container_image_digests
1109 out['memory_usage'] = self.memory_usage
1110 out['memory_request'] = self.memory_request
1111 out['memory_limit'] = self.memory_limit
1112 out['cpu_percentage'] = self.cpu_percentage
1113 out['version'] = self.version
1114 out['status'] = self.status.value if self.status is not None else None
1115 out['status_desc'] = self.status_desc
1116 if self.daemon_type == 'osd':
1117 out['osdspec_affinity'] = self.osdspec_affinity
1118 out['is_active'] = self.is_active
1119 out['ports'] = self.ports
1120 out['ip'] = self.ip
1121
1122 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1123 'last_configured']:
1124 if getattr(self, k):
1125 out[k] = datetime_to_str(getattr(self, k))
1126
1127 if self.events:
1128 out['events'] = [e.to_dict() for e in self.events]
1129
1130 empty = [k for k, v in out.items() if v is None]
1131 for e in empty:
1132 del out[e]
1133 return out
1134
1135 @classmethod
1136 @handle_type_error
1137 def from_json(cls, data: dict) -> 'DaemonDescription':
1138 c = data.copy()
1139 event_strs = c.pop('events', [])
1140 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1141 'last_configured']:
1142 if k in c:
1143 c[k] = str_to_datetime(c[k])
1144 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1145 status_int = c.pop('status', None)
1146 if 'daemon_name' in c:
1147 del c['daemon_name']
1148 if 'service_name' in c and c['service_name'].startswith('osd.'):
1149 # if the service_name is a osd.NNN (numeric osd id) then
1150 # ignore it -- it is not a valid service_name and
1151 # (presumably) came from an older version of cephadm.
1152 try:
1153 int(c['service_name'][4:])
1154 del c['service_name']
1155 except ValueError:
1156 pass
1157 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1158 return cls(events=events, status=status, **c)
1159
1160 def __copy__(self) -> 'DaemonDescription':
1161 # feel free to change this:
1162 return DaemonDescription.from_json(self.to_json())
1163
    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
        # Represent a DaemonDescription as a YAML mapping built from its
        # to_json() key/value pairs.  represent_dict() iterates the pairs;
        # the cast only satisfies the type checker.
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1167
1168
# Register the representer so yaml.dump() renders DaemonDescription
# instances as plain mappings.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1170
1171
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # `ports` previously defaulted to a mutable default argument ([]),
        # which is shared between all calls; default to None and allocate a
        # fresh list per instance instead.
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Shortcut for ``self.spec.service_type``."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Return ``<ip>:<port,port,...>`` for display, or '' when no ports
        are known.  The virtual IP is shown without its CIDR suffix; '?' is
        used when it is unknown."""
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_to_dict(self) -> Dict[str, Any]:
        """Build the 'status' sub-dict shared by :meth:`to_json` and
        :meth:`to_dict` (previously duplicated in both).

        Timestamps are rendered as strings and None-valued entries dropped;
        insertion order matches the historical output so serialized/YAML
        output is unchanged.
        """
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """Serialize spec + status; events serialized via their ``to_json``."""
        out = self.spec.to_json()
        out['status'] = self._status_to_dict()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Like :meth:`to_json`, but events are rendered as dicts."""
        out = self.spec.to_json()
        out['status'] = self._status_to_dict()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Inverse of :meth:`to_json`."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        # Timestamps come back as strings; convert to datetime.
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        # Represent as a YAML mapping built from the to_json() key/value pairs.
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1299
1300
# Register the representer so yaml.dump() renders ServiceDescription
# instances as plain mappings.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1302
1303
class InventoryFilter(object):
    """
    Restricts an inventory query so we avoid unnecessarily scanning
    the whole estate.

    Typical use:

    filter by host when presenting a UI workflow for configuring
      a particular server.
    filter by label when not all of the estate is Ceph servers,
      and we want to only learn about the Ceph servers.
    filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        # Optional: restrict to hosts carrying any of these labels.
        self.labels = labels
        # Optional: restrict to these named hosts only.
        self.hosts = hosts
1326
1327
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict (inverse of :meth:`from_json`)."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Build an InventoryHost from a dict, rejecting unknown keys.

        :raises OrchestratorValidationError: on missing/unknown keys or
            malformed device data.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            # Chain the original exception for easier debugging.
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        # NOTE(review): each item is assumed to be a (hostname, obj) pair
        # where obj.data holds JSON device data -- confirm against callers.
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # Previously this raised AttributeError when compared against a
        # non-InventoryHost; return NotImplemented so Python falls back to
        # the other operand's __eq__ / identity comparison instead.
        if not isinstance(other, InventoryHost):
            return NotImplemented  # type: ignore[return-value]
        # NOTE: labels and addr are deliberately(?) excluded from equality.
        return self.name == other.name and self.devices == other.devices
1386
1387
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
    See ``ceph osd metadata | jq '.[].device_ids'``

    ``path``: presumably the device path on the host (e.g. ``/dev/sda``) --
    not established by this file; confirm against callers.
    """
    # Empty __slots__: keep namedtuple instances free of a per-instance dict.
    __slots__ = ()
1399
1400
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.

    Serialized to a single line by :meth:`to_json` and parsed back by
    :meth:`from_json` using ``regex_v1``.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # Matches the line format produced by to_json():
    #   <created> <kind>:<subject> [<level>] "<message>"
    # The message group accepts newlines (re.MULTILINE + (?:.|\n)*).
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        # Accept the serialized string form as well as a datetime.
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # Name of the service or daemon this event is attached to.
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return the ``<kind>:<subject>`` identifier for this event."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data:
        :return:
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): 'level' is not part of equality -- confirm this is
        # intentional before relying on it.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message

    def __repr__(self) -> str:
        return f'OrchestratorEvent.from_json({self.to_json()!r})'
1472
1473
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: attach a remote-forwarding stub for every public
    ``Orchestrator`` interface method to *cls*.
    """
    # A factory function is required here: a closure capturing the loop
    # variable directly would leave every stub bound to the last name.
    def make_stub(method_name: str) -> Callable:
        def stub(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return stub

    for attr, func in Orchestrator.__dict__.items():
        if attr.startswith('_') or attr in ['is_orchestrator_module']:
            continue
        # update_wrapper copies name/doc/signature metadata onto the stub.
        setattr(cls, attr, update_wrapper(make_stub(attr), func))
    return cls
1488
1489
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard, which uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self) -> Any:
        # Return the MgrModule set via set_mgr(), falling back to `self`
        # (assumes the mixin user can itself act as the mgr -- see the
        # class docstring).
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # The orchestrator module itself has _select_orchestrator();
        # other modules must ask it via remote().
        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # If the backend simply doesn't implement this method, surface
            # that as NotImplementedError instead of the raw remote failure.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise