]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph quincy 17.2.6
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce, update_wrapper
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast, Mapping
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
42 logger = logging.getLogger(__name__)
43
44 T = TypeVar('T')
45 FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human-readable error message
    :param errno: negative errno-style code handed back to the CLI layer
    :param event_kind_subject: see ``OrchestratorEvent.subject``; identifies
        the (kind, subject) this error relates to, if any
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Idiomatic cooperative call; the original `super(Exception, self)`
        # deliberately skipped Exception in the MRO for no benefit.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
66
class NoOrchestrator(OrchestratorError):
    """Raised when no orchestrator backend is configured."""

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # ENOENT: "not found" semantics for the missing backend.
        super().__init__(msg, errno=-errno.ENOENT)
74
75
class OrchestratorValidationError(OrchestratorError):
    """Raised when an orchestrator does not support a specific feature."""
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Tag any OrchestratorError escaping the block with an event subject,
    then re-raise it unchanged.
    """
    try:
        yield
    except OrchestratorError as err:
        should_tag = overwrite or hasattr(err, 'event_subject')
        if should_tag:
            err.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so expected failures become HandleCommandResult
    errors instead of tracebacks, and attach CLICommand metadata.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Expected failure modes: report cleanly, no traceback.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            return HandleCommandResult(
                -errno.ENOENT,
                stderr='This Orchestrator does not support `{}`'.format(prefix))

    # A lambda yields a distinct function object that still delegates to
    # `wrapper`, so we can attach CLI metadata without mutating `wrapper`.
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
114
115
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            result = f(*args, **kwargs)
        except Exception as e:
            logger.exception(e)
            import os
            if 'UNITTEST' in os.environ:
                # Re-raise so unittest failures keep their real traceback.
                raise
            return OrchResult(None, exception=e)
        return OrchResult(result)

    return cast(Callable[..., OrchResult[T]], wrapper)
134
135
class InnerCliCommandCallable(Protocol):
    # Structural type for _cli_read_command/_cli_write_command:
    # given a command prefix, return a decorator for the handler function.
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
139
140
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Build a CLI-command decorator factory bound to permission ``perm``."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
145
146
# Ready-made decorator factories for read-only ('r') and read-write ('rw')
# orchestrator CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
149
150
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every attribute that handle_exception() decorated
        # (those carry both _prefix and _cli_command).
        dispatch: Dict[str, CLICommand] = {
            v._prefix: v._cli_command
            for v in dct.values()
            if hasattr(v, '_prefix') and hasattr(v, '_cli_command')
        }

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            prefix = cmd['prefix']
            if prefix not in dispatch:
                # Unknown prefix: fall back to the class's own handler.
                return self.handle_command(inbuf, cmd)
            return dispatch[prefix].call(self, cmd, inbuf)

        cls.COMMANDS = [c.dump_cmd() for c in dispatch.values()]
        cls.handle_command = handle_command
175
176
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """Record ``e`` (pickled) or clear any stored exception if ``e`` is None."""
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            # Degrade to a plain Exception, which is always picklable.
            if isinstance(e, Exception):
                fallback = Exception(*e.args)
            else:
                fallback = Exception(str(e))
            self.serialized_exception = pickle.dumps(fallback)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(map(str, self.result))
        return str(self.result)
217
218
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Re-raise the exception stored in ``c``, or return its result.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result
    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # The original exception type can't be reconstructed here
        # (e.g. defined in another sub-interpreter): degrade to Exception.
        raise Exception(c.exception_str)
    raise e
231
232
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark ``f`` so get_feature_set() omits it from the feature report."""
    f._hide_in_features = True  # type: ignore
    return f
236
237
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method is "available" when the subclass overrides the base
        # attribute (i.e. the two attribute lookups differ).
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host specification to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata (gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def rescan_host(self, hostname: str) -> OrchResult:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan

        :param hostname: (str) host name
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        :param specs: sequence of service/host specs to apply
        :param no_overwrite: NOTE(review): accepted but unused in this base
            implementation; presumably honored by the per-spec apply_*
            overrides in subclasses — confirm against callers.
        """
        # Dispatch table: spec.service_type -> the matching apply_* method.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'ceph-exporter': self.apply_ceph_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'loki': self.apply_loki,
            'promtail': self.apply_promtail,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        # Fold the per-spec OrchResults into one list; the first stored
        # exception is re-raised by raise_if_exception.
        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: list of status strings, one per removed daemon
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: status string
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)


        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_ceph_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing ceph exporter daemon(s)"""
        raise NotImplementedError()

    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Loki daemon(s)"""
        raise NotImplementedError()

    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Promtail daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]:
        """Add or update an existing tuned profile"""
        raise NotImplementedError()

    def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
        """Remove a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
        """See current tuned profiles"""
        raise NotImplementedError()

    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
        """Change/Add a specific setting for a tuned profile"""
        raise NotImplementedError()

    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
        """Remove a specific setting for a tuned profile"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
743
744
745 GenericSpec = Union[ServiceSpec, HostSpec]
746
747
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into a HostSpec or ServiceSpec."""
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
753
754
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that deploys it.

    Raises KeyError for unknown daemon types.
    """
    # Most daemon types map to a service of the same name; only a few differ.
    identity = [
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'ceph-exporter', 'loki', 'promtail', 'crash',
        'container', 'agent', 'snmp-gateway',
    ]
    mapping = {d: d for d in identity}
    mapping.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # specific Rook daemon
    })
    return mapping[dtype]
782
783
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the list of daemon types it deploys.

    Raises KeyError for unknown service types.
    """
    if stype == 'ingress':
        # The only service composed of more than one daemon type.
        return ['haproxy', 'keepalived']
    single_daemon_services = {
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'loki', 'promtail', 'node-exporter', 'ceph-exporter', 'crash',
        'container', 'agent', 'snmp-gateway',
    }
    if stype in single_daemon_services:
        return [stype]
    raise KeyError(stype)
809
810
# Flat list of every daemon type deployed by a known service type
# (built by expanding each service via service_to_daemon_types).
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
813
814
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # For when the user specified daemon types, services or hosts
        self.which: str = '<unknown>'
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
        # Is the upgrade paused?
        self.is_paused: bool = False
825
826
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator translating a TypeError raised by ``method`` into
    an OrchestratorValidationError (so bad arguments surface as user errors).
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return cast(FuncT, inner)
836
837
class DaemonDescriptionStatus(enum.IntEnum):
    """Lifecycle state of a single daemon as reported by the orchestrator."""
    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """Human-readable name for ``status``; ``None`` maps to 'unknown'."""
        key = DaemonDescriptionStatus.unknown if status is None else status
        names = {
            DaemonDescriptionStatus.unknown: 'unknown',
            DaemonDescriptionStatus.error: 'error',
            DaemonDescriptionStatus.stopped: 'stopped',
            DaemonDescriptionStatus.running: 'running',
            DaemonDescriptionStatus.starting: 'starting',
        }
        return names.get(key, '<unknown>')
856
857
858 class DaemonDescription(object):
859 """
860 For responding to queries about the status of a particular daemon,
861 stateful or stateless.
862
863 This is not about health or performance monitoring of daemons: it's
864 about letting the orchestrator tell Ceph whether and where a
865 daemon is scheduled in the cluster. When an orchestrator tells
866 Ceph "it's running on host123", that's not a promise that the process
867 is literally up this second, it's a description of where the orchestrator
868 has decided the daemon should run.
869 """
870
871 def __init__(self,
872 daemon_type: Optional[str] = None,
873 daemon_id: Optional[str] = None,
874 hostname: Optional[str] = None,
875 container_id: Optional[str] = None,
876 container_image_id: Optional[str] = None,
877 container_image_name: Optional[str] = None,
878 container_image_digests: Optional[List[str]] = None,
879 version: Optional[str] = None,
880 status: Optional[DaemonDescriptionStatus] = None,
881 status_desc: Optional[str] = None,
882 last_refresh: Optional[datetime.datetime] = None,
883 created: Optional[datetime.datetime] = None,
884 started: Optional[datetime.datetime] = None,
885 last_configured: Optional[datetime.datetime] = None,
886 osdspec_affinity: Optional[str] = None,
887 last_deployed: Optional[datetime.datetime] = None,
888 events: Optional[List['OrchestratorEvent']] = None,
889 is_active: bool = False,
890 memory_usage: Optional[int] = None,
891 memory_request: Optional[int] = None,
892 memory_limit: Optional[int] = None,
893 cpu_percentage: Optional[str] = None,
894 service_name: Optional[str] = None,
895 ports: Optional[List[int]] = None,
896 ip: Optional[str] = None,
897 deployed_by: Optional[List[str]] = None,
898 rank: Optional[int] = None,
899 rank_generation: Optional[int] = None,
900 extra_container_args: Optional[List[str]] = None,
901 extra_entrypoint_args: Optional[List[str]] = None,
902 ) -> None:
903
904 #: Host is at the same granularity as InventoryHost
905 self.hostname: Optional[str] = hostname
906
907 # Not everyone runs in containers, but enough people do to
908 # justify having the container_id (runtime id) and container_image
909 # (image name)
910 self.container_id = container_id # runtime id
911 self.container_image_id = container_image_id # image id locally
912 self.container_image_name = container_image_name # image friendly name
913 self.container_image_digests = container_image_digests # reg hashes
914
915 #: The type of service (osd, mon, mgr, etc.)
916 self.daemon_type = daemon_type
917
918 #: The orchestrator will have picked some names for daemons,
919 #: typically either based on hostnames or on pod names.
920 #: This is the <foo> in mds.<foo>, the ID that will appear
921 #: in the FSMap/ServiceMap.
922 self.daemon_id: Optional[str] = daemon_id
923 self.daemon_name = self.name()
924
925 #: Some daemon types have a numeric rank assigned
926 self.rank: Optional[int] = rank
927 self.rank_generation: Optional[int] = rank_generation
928
929 self._service_name: Optional[str] = service_name
930
931 #: Service version that was deployed
932 self.version = version
933
934 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
935 self._status = status
936
937 #: Service status description when status == error.
938 self.status_desc = status_desc
939
940 #: datetime when this info was last refreshed
941 self.last_refresh: Optional[datetime.datetime] = last_refresh
942
943 self.created: Optional[datetime.datetime] = created
944 self.started: Optional[datetime.datetime] = started
945 self.last_configured: Optional[datetime.datetime] = last_configured
946 self.last_deployed: Optional[datetime.datetime] = last_deployed
947
948 #: Affinity to a certain OSDSpec
949 self.osdspec_affinity: Optional[str] = osdspec_affinity
950
951 self.events: List[OrchestratorEvent] = events or []
952
953 self.memory_usage: Optional[int] = memory_usage
954 self.memory_request: Optional[int] = memory_request
955 self.memory_limit: Optional[int] = memory_limit
956
957 self.cpu_percentage: Optional[str] = cpu_percentage
958
959 self.ports: Optional[List[int]] = ports
960 self.ip: Optional[str] = ip
961
962 self.deployed_by = deployed_by
963
964 self.is_active = is_active
965
966 self.extra_container_args = extra_container_args
967 self.extra_entrypoint_args = extra_entrypoint_args
968
    @property
    def status(self) -> Optional[DaemonDescriptionStatus]:
        # Current daemon status; None when the status has not been learned yet.
        return self._status

    @status.setter
    def status(self, new: DaemonDescriptionStatus) -> None:
        # Keep the human-readable description in sync with the enum value.
        self._status = new
        self.status_desc = DaemonDescriptionStatus.to_str(new)
977
978 def get_port_summary(self) -> str:
979 if not self.ports:
980 return ''
981 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
982
983 def name(self) -> str:
984 return '%s.%s' % (self.daemon_type, self.daemon_id)
985
986 def matches_service(self, service_name: Optional[str]) -> bool:
987 assert self.daemon_id is not None
988 assert self.daemon_type is not None
989 if service_name:
990 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
991 return False
992
    def service_id(self) -> str:
        """Return the service id this daemon belongs to ('' when the service
        has no id, e.g. unmanaged OSDs).

        Resolution order: an explicitly recorded service name wins; OSDs use
        their OSDSpec affinity; otherwise the id is derived from daemon_id by
        stripping the hostname (see ``_match`` below).
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            # "<type>.<service_id>" -> service_id; a bare "<type>" has no id.
            if '.' in self._service_name:
                return self._service_name.split('.', 1)[1]
            else:
                return ''

        if self.daemon_type == 'osd':
            # 'None' (string) can appear for OSDs not created via a spec.
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return ''

        def _match() -> str:
            # Heuristically recover the service_id from daemon_id for daemon
            # types whose services require an id.  Raises OrchestratorError
            # when daemon_id does not follow any known naming scheme.
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                # rgw daemon_ids look like "realm.zone.host.random" (3 or 4
                # dotted parts); the service_id is the first two parts.
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                # iscsi service_ids may themselves contain dots; only the
                # last component is the random suffix.
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id
1053
1054 def service_name(self) -> str:
1055 if self._service_name:
1056 return self._service_name
1057 assert self.daemon_type is not None
1058 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
1059 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
1060 return daemon_type_to_service(self.daemon_type)
1061
1062 def __repr__(self) -> str:
1063 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
1064 id=self.daemon_id)
1065
1066 def __str__(self) -> str:
1067 return f"{self.name()} in status {self.status_desc} on {self.hostname}"
1068
1069 def to_json(self) -> dict:
1070 out: Dict[str, Any] = OrderedDict()
1071 out['daemon_type'] = self.daemon_type
1072 out['daemon_id'] = self.daemon_id
1073 out['service_name'] = self._service_name
1074 out['daemon_name'] = self.name()
1075 out['hostname'] = self.hostname
1076 out['container_id'] = self.container_id
1077 out['container_image_id'] = self.container_image_id
1078 out['container_image_name'] = self.container_image_name
1079 out['container_image_digests'] = self.container_image_digests
1080 out['memory_usage'] = self.memory_usage
1081 out['memory_request'] = self.memory_request
1082 out['memory_limit'] = self.memory_limit
1083 out['cpu_percentage'] = self.cpu_percentage
1084 out['version'] = self.version
1085 out['status'] = self.status.value if self.status is not None else None
1086 out['status_desc'] = self.status_desc
1087 if self.daemon_type == 'osd':
1088 out['osdspec_affinity'] = self.osdspec_affinity
1089 out['is_active'] = self.is_active
1090 out['ports'] = self.ports
1091 out['ip'] = self.ip
1092 out['rank'] = self.rank
1093 out['rank_generation'] = self.rank_generation
1094
1095 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1096 'last_configured']:
1097 if getattr(self, k):
1098 out[k] = datetime_to_str(getattr(self, k))
1099
1100 if self.events:
1101 out['events'] = [e.to_json() for e in self.events]
1102
1103 empty = [k for k, v in out.items() if v is None]
1104 for e in empty:
1105 del out[e]
1106 return out
1107
1108 def to_dict(self) -> dict:
1109 out: Dict[str, Any] = OrderedDict()
1110 out['daemon_type'] = self.daemon_type
1111 out['daemon_id'] = self.daemon_id
1112 out['daemon_name'] = self.name()
1113 out['hostname'] = self.hostname
1114 out['container_id'] = self.container_id
1115 out['container_image_id'] = self.container_image_id
1116 out['container_image_name'] = self.container_image_name
1117 out['container_image_digests'] = self.container_image_digests
1118 out['memory_usage'] = self.memory_usage
1119 out['memory_request'] = self.memory_request
1120 out['memory_limit'] = self.memory_limit
1121 out['cpu_percentage'] = self.cpu_percentage
1122 out['version'] = self.version
1123 out['status'] = self.status.value if self.status is not None else None
1124 out['status_desc'] = self.status_desc
1125 if self.daemon_type == 'osd':
1126 out['osdspec_affinity'] = self.osdspec_affinity
1127 out['is_active'] = self.is_active
1128 out['ports'] = self.ports
1129 out['ip'] = self.ip
1130
1131 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1132 'last_configured']:
1133 if getattr(self, k):
1134 out[k] = datetime_to_str(getattr(self, k))
1135
1136 if self.events:
1137 out['events'] = [e.to_dict() for e in self.events]
1138
1139 empty = [k for k, v in out.items() if v is None]
1140 for e in empty:
1141 del out[e]
1142 return out
1143
1144 @classmethod
1145 @handle_type_error
1146 def from_json(cls, data: dict) -> 'DaemonDescription':
1147 c = data.copy()
1148 event_strs = c.pop('events', [])
1149 for k in ['last_refresh', 'created', 'started', 'last_deployed',
1150 'last_configured']:
1151 if k in c:
1152 c[k] = str_to_datetime(c[k])
1153 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1154 status_int = c.pop('status', None)
1155 if 'daemon_name' in c:
1156 del c['daemon_name']
1157 if 'service_name' in c and c['service_name'].startswith('osd.'):
1158 # if the service_name is a osd.NNN (numeric osd id) then
1159 # ignore it -- it is not a valid service_name and
1160 # (presumably) came from an older version of cephadm.
1161 try:
1162 int(c['service_name'][4:])
1163 del c['service_name']
1164 except ValueError:
1165 pass
1166 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
1167 return cls(events=events, status=status, **c)
1168
    def __copy__(self) -> 'DaemonDescription':
        # Copy by round-tripping through JSON: cheap, and it keeps the copy
        # logic in one place (the (de)serializers).  Feel free to change this.
        return DaemonDescription.from_json(self.to_json())
1172
1173 @staticmethod
1174 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
1175 return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1176
1177
# Make `yaml.dump` render DaemonDescription objects as plain mappings.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1179
1180
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # BUGFIX: `ports` previously used a mutable default argument (`= []`),
        # so every instance created without ports shared one list; allocate a
        # fresh list per instance instead.
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type (e.g. 'mds', 'rgw') from the spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Return ``ip:port1,port2`` for display, or '' when no ports are known.

        The virtual IP may carry a /prefix; only the address part is shown.
        """
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_json(self) -> Dict[str, Any]:
        # Serialize the runtime-status portion shared by to_json()/to_dict(),
        # dropping unset (None) values.  NOTE: `deleted` is (historically) not
        # serialized.
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        # Timestamps are serialized as strings.
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """Serialize spec + status; events rendered with their to_json()."""
        out = self.spec.to_json()
        out['status'] = self._status_json()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Like to_json(), but events are rendered with their to_dict()."""
        out = self.spec.to_json()
        out['status'] = self._status_json()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Inverse of to_json(): rebuild a ServiceDescription from a dict."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        # Timestamps were serialized as strings; convert them back.
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Render a ServiceDescription as a plain mapping in YAML dumps."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
1308
1309
# Make `yaml.dump` render ServiceDescription objects as plain mappings.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1311
1312
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting a UI workflow for configuring
      a particular server.
      filter by label when not all of the estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels
        #: Optional: get info about certain named hosts only
        self.hosts = hosts
1335
1336
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict (inverse of from_json())."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Deserialize from a dict.

        :raises OrchestratorValidationError: on missing, unknown or
            wrongly-typed keys.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', [])
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """Build hosts from (name, devices-holder) pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # BUGFIX: comparing against a non-InventoryHost used to raise
        # AttributeError; defer to the other operand instead.
        if not isinstance(other, InventoryHost):
            return NotImplemented  # type: ignore
        # NOTE(review): addr and labels are excluded from equality — preserved
        # from the original implementation; confirm this is intentional.
        return self.name == other.name and self.devices == other.devices
1395
1396
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
    See ``ceph osd metadata | jq '.[].device_ids'``
    """
    # No extra instance attributes: keep instances __dict__-less like a
    # plain namedtuple.
    __slots__ = ()
1408
1409
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        # Accept ISO timestamp strings for convenience (e.g. from from_json()).
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # Service name or daemon name this event is attached to.
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return '<kind>:<subject>' (see OrchestratorError.event_subject)."""
        return self.kind + ':' + self.subject

    def to_json(self) -> str:
        """Serialize to the single-line string format parsed by from_json()."""
        # Make a long list of events readable.
        return f'{datetime_to_str(self.created)} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """Convert this event's data to a dict."""
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: one event in the single-line format written by to_json()
        :return: the parsed event
        """
        parsed = cls.regex_v1.match(data)
        if parsed is None:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False
        # NOTE: `level` is (historically) not part of equality.
        return (self.created == other.created and self.kind == other.kind
                and self.subject == other.subject and self.message == other.message)

    def __repr__(self) -> str:
        return 'OrchestratorEvent.from_json({!r})'.format(self.to_json())
1481
1482
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: attach a remote-forwarding stub for every public
    Orchestrator interface method to `cls`."""

    # `shim` must be a separate function rather than a closure created in the
    # loop body; otherwise `method_name` would be late-bound to the last key.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return inner

    for name, method in Orchestrator.__dict__.items():
        if name.startswith('_') or name == 'is_orchestrator_module':
            continue
        # Preserve the interface method's metadata (name, docstring, ...).
        setattr(cls, name, update_wrapper(shim(name), method))
    return cls
1497
1498
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """
1525
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard, which uses a global ``mgr``.
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
1532
    def __get_mgr(self) -> Any:
        # Prefer the mgr injected via set_mgr(); fall back to `self`, which
        # works when the class using this mixin is itself an MgrModule.
        try:
            return self.__mgr
        except AttributeError:
            return self
1538
    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            # Direct call when `mgr` is the orchestrator module itself ...
            o = mgr._select_orchestrator()
        except AttributeError:
            # ... otherwise ask the orchestrator module which backend is active.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # If the backend simply doesn't support this method, surface a
            # NotImplementedError instead of the raw remote failure.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise