1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import enum
11 import errno
12 import logging
13 import pickle
14 import re
15
16 from collections import namedtuple, OrderedDict
17 from contextlib import contextmanager
18 from functools import wraps, reduce
19
20 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast
22
23 try:
24 from typing import Protocol # Protocol was added in Python 3.8
25 except ImportError:
26 class Protocol: # type: ignore
27 pass
28
29
30 import yaml
31
32 from ceph.deployment import inventory
33 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
34 IscsiServiceSpec, IngressSpec
35 from ceph.deployment.drive_group import DriveGroupSpec
36 from ceph.deployment.hostspec import HostSpec, SpecValidationError
37 from ceph.utils import datetime_to_str, str_to_datetime
38
39 from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
41
42 logger = logging.getLogger(__name__)
43
44 T = TypeVar('T')
45 FuncT = TypeVar('FuncT', bound=Callable[..., Any])
46
47
48 class OrchestratorError(Exception):
49 """
50 General orchestrator specific error.
51
52 Used for deployment, configuration or user errors.
53
54 It's not intended for programming errors or orchestrator internal errors.
55 """
56
57 def __init__(self,
58 msg: str,
59 errno: int = -errno.EINVAL,
60 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
61 super(Exception, self).__init__(msg)
62 self.errno = errno
63 # See OrchestratorEvent.subject
64 self.event_subject = event_kind_subject
65
66
67 class NoOrchestrator(OrchestratorError):
68 """
69 No orchestrator is configured.
70 """
71
72 def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
73 super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
74
75
76 class OrchestratorValidationError(OrchestratorError):
77 """
78 Raised when an orchestrator doesn't support a specific feature.
79 """
80
81
82 @contextmanager
83 def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
84 try:
85 yield
86 except OrchestratorError as e:
87 if overwrite or hasattr(e, 'event_subject'):
88 e.event_subject = (kind, subject)
89 raise
90
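# Illustrative use of set_exception_subject() (the kind/subject values below
# are made up): any OrchestratorError raised inside the block gets its
# event_subject filled in before it propagates, e.g.
#
#     with set_exception_subject('daemon', 'mds.a'):
#         raise OrchestratorError('deploy failed')   # event_subject -> ('daemon', 'mds.a')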
91
92 def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
93 @wraps(func)
94 def wrapper(*args: Any, **kwargs: Any) -> Any:
95 try:
96 return func(*args, **kwargs)
97 except (OrchestratorError, SpecValidationError) as e:
98 # Do not print Traceback for expected errors.
99 return HandleCommandResult(e.errno, stderr=str(e))
100 except ImportError as e:
101 return HandleCommandResult(-errno.ENOENT, stderr=str(e))
102 except NotImplementedError:
103 msg = 'This Orchestrator does not support `{}`'.format(prefix)
104 return HandleCommandResult(-errno.ENOENT, stderr=msg)
105
106 # misuse lambda to copy `wrapper`
107 wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731
108 wrapper_copy._prefix = prefix # type: ignore
109 wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore
110 wrapper_copy._cli_command.store_func_metadata(func) # type: ignore
111 wrapper_copy._cli_command.func = wrapper_copy # type: ignore
112
113 return cast(FuncT, wrapper_copy)
114
115
116 def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
117 """
118 Decorator to make Orchestrator methods return
119 an OrchResult.
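
A rough sketch of how a backend might use it (``MyOrch`` and the host
name are made up for illustration)::

    >>> #doctest: +SKIP
    ... class MyOrch(Orchestrator):
    ...     @handle_orch_error
    ...     def get_hosts(self):
    ...         return [HostSpec('host1')]   # callers receive OrchResult[List[HostSpec]]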
120 """
121
122 @wraps(f)
123 def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
124 try:
125 return OrchResult(f(*args, **kwargs))
126 except Exception as e:
127 return OrchResult(None, exception=e)
128
129 return cast(Callable[..., OrchResult[T]], wrapper)
130
131
132 class InnerCliCommandCallable(Protocol):
133 def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
134 ...
135
136
137 def _cli_command(perm: str) -> InnerCliCommandCallable:
138 def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
139 return lambda func: handle_exception(prefix, perm, func)
140 return inner_cli_command
141
142
143 _cli_read_command = _cli_command('r')
144 _cli_write_command = _cli_command('rw')
145
146
147 class CLICommandMeta(type):
148 """
149 This is a workaround for the use of a global variable CLICommand.COMMANDS which
150 prevents modules from importing any other module.
151
152 We make use of CLICommand, except for the use of the global variable.
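
A sketch of the intended usage (the command prefix and names are
illustrative only)::

    >>> #doctest: +SKIP
    ... class MyCLI(metaclass=CLICommandMeta):
    ...     @_cli_read_command('orch example ls')
    ...     def _ls(self) -> HandleCommandResult:
    ...         return HandleCommandResult(0, stdout='example output')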
153 """
154 def __init__(cls, name: str, bases: Any, dct: Any) -> None:
155 super(CLICommandMeta, cls).__init__(name, bases, dct)
156 dispatch: Dict[str, CLICommand] = {}
157 for v in dct.values():
158 try:
159 dispatch[v._prefix] = v._cli_command
160 except AttributeError:
161 pass
162
163 def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
164 if cmd['prefix'] not in dispatch:
165 return self.handle_command(inbuf, cmd)
166
167 return dispatch[cmd['prefix']].call(self, cmd, inbuf)
168
169 cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
170 cls.handle_command = handle_command
171
172
173 class OrchResult(Generic[T]):
174 """
175 Stores a result and an exception. Mainly to circumvent the
176 MgrModule.remote() method that hides all exceptions and for
177 handling different sub-interpreters.
178 """
179
180 def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
181 self.result = result
182 self.serialized_exception: Optional[bytes] = None
183 self.exception_str: str = ''
184 self.set_exception(exception)
185
186 __slots__ = 'result', 'serialized_exception', 'exception_str'
187
188 def set_exception(self, e: Optional[Exception]) -> None:
189 if e is None:
190 self.serialized_exception = None
191 self.exception_str = ''
192 return
193
194 self.exception_str = f'{type(e)}: {str(e)}'
195 try:
196 self.serialized_exception = pickle.dumps(e)
197 except pickle.PicklingError:
198 logger.error(f"failed to pickle {e}")
199 if isinstance(e, Exception):
200 e = Exception(*e.args)
201 else:
202 e = Exception(str(e))
203 # degenerate to a plain Exception
204 self.serialized_exception = pickle.dumps(e)
205
206 def result_str(self) -> str:
207 """Force a string."""
208 if self.result is None:
209 return ''
210 if isinstance(self.result, list):
211 return '\n'.join(str(x) for x in self.result)
212 return str(self.result)
213
214
215 def raise_if_exception(c: OrchResult[T]) -> T:
216 """
217 Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
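
Typical call site (sketch; the values are illustrative)::

    >>> #doctest: +SKIP
    ... ok = OrchResult(['host1', 'host2'])
    ... raise_if_exception(ok)        # -> ['host1', 'host2']
    ... bad = OrchResult(None, exception=OrchestratorError('boom'))
    ... raise_if_exception(bad)       # re-raises the stored exception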
218 """
219 if c.serialized_exception is not None:
220 try:
221 e = pickle.loads(c.serialized_exception)
222 except (KeyError, AttributeError):
223 raise Exception(c.exception_str)
224 raise e
225 assert c.result is not None, 'OrchResult should either have an exception or a result'
226 return c.result
227
228
229 def _hide_in_features(f: FuncT) -> FuncT:
230 f._hide_in_features = True # type: ignore
231 return f
232
233
234 class Orchestrator(object):
235 """
236 Calls in this class may do long running remote operations, with time
237 periods ranging from network latencies to package install latencies and large
238 internet downloads. For that reason, all are asynchronous, and return
239 ``Completion`` objects.
240
241 Methods should only return the completion and not directly execute
242 anything, like network calls. Otherwise the purpose of
243 those completions is defeated.
244
245 Implementations are not required to start work on an operation until
246 the caller waits on the relevant Completion objects. Callers making
247 multiple updates should not wait on Completions until they're done
248 sending operations: this enables implementations to batch up a series
249 of updates when wait() is called on a set of Completion objects.
250
251 Implementations are encouraged to keep reasonably fresh caches of
252 the status of the system: it is better to serve a stale-but-recent
253 result read of e.g. device inventory than it is to keep the caller waiting
254 while you scan hosts every time.
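
From a caller's point of view the pattern is roughly (sketch; assumes an
enabled backend)::

    >>> #doctest: +SKIP
    ... completion = OrchestratorClientMixin().get_hosts()
    ... hosts = raise_if_exception(completion)   # unwrap the OrchResult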
255 """
256
257 @_hide_in_features
258 def is_orchestrator_module(self) -> bool:
259 """
260 Enable other modules to interrogate this module to discover
261 whether it's usable as an orchestrator module.
262
263 Subclasses do not need to override this.
264 """
265 return True
266
267 @_hide_in_features
268 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
269 """
270 Report whether we can talk to the orchestrator. This is the
271 place to give the user a meaningful message if the orchestrator
272 isn't running or can't be contacted.
273
274 This method may be called frequently (e.g. every page load
275 to conditionally display a warning banner), so make sure it's
276 not too expensive. It's okay to give a slightly stale status
277 (e.g. based on a periodic background ping of the orchestrator)
278 if that's necessary to make this method fast.
279
280 .. note::
281 `True` doesn't mean that the desired functionality
282 is actually available in the orchestrator. I.e. this
283 won't work as expected::
284
285 >>> #doctest: +SKIP
286 ... if OrchestratorClientMixin().available()[0]: # wrong.
287 ... OrchestratorClientMixin().get_hosts()
288
289 :return: boolean representing whether the module is available/usable
290 :return: string describing any error
291 :return: dict containing any module specific information
292 """
293 raise NotImplementedError()
294
295 @_hide_in_features
296 def get_feature_set(self) -> Dict[str, dict]:
297 """Describes which methods this orchestrator implements
298
299 .. note::
300 `True` doesn't mean that the desired functionality
301 is actually possible in the orchestrator. I.e. this
302 won't work as expected::
303
304 >>> #doctest: +SKIP
305 ... api = OrchestratorClientMixin()
306 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
307 ... api.get_hosts()
308
309 It's better to ask for forgiveness instead::
310
311 >>> #doctest: +SKIP
312 ... try:
313 ... OrchestratorClientMixin().get_hosts()
314 ... except (OrchestratorError, NotImplementedError):
315 ... ...
316
317 :returns: Dict of API method names to ``{'available': True or False}``
318 """
319 module = self.__class__
320 features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
321 for a in Orchestrator.__dict__
322 if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
323 }
324 return features
325
326 def cancel_completions(self) -> None:
327 """
328 Cancels ongoing completions. Unsticks the mgr.
329 """
330 raise NotImplementedError()
331
332 def pause(self) -> None:
333 raise NotImplementedError()
334
335 def resume(self) -> None:
336 raise NotImplementedError()
337
338 def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
339 """
340 Add a host to the orchestrator inventory.
341
342 :param host: hostname
343 """
344 raise NotImplementedError()
345
346 def remove_host(self, host: str) -> OrchResult[str]:
347 """
348 Remove a host from the orchestrator inventory.
349
350 :param host: hostname
351 """
352 raise NotImplementedError()
353
354 def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
355 """
356 Update a host's address
357
358 :param host: hostname
359 :param addr: address (dns name or IP)
360 """
361 raise NotImplementedError()
362
363 def get_hosts(self) -> OrchResult[List[HostSpec]]:
364 """
365 Report the hosts in the cluster.
366
367 :return: list of HostSpec
368 """
369 raise NotImplementedError()
370
371 def add_host_label(self, host: str, label: str) -> OrchResult[str]:
372 """
373 Add a host label
374 """
375 raise NotImplementedError()
376
377 def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
378 """
379 Remove a host label
380 """
381 raise NotImplementedError()
382
383 def host_ok_to_stop(self, hostname: str) -> OrchResult:
384 """
385 Check if the specified host can be safely stopped without reducing availability
386
387 :param host: hostname
388 """
389 raise NotImplementedError()
390
391 def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
392 """
393 Place a host in maintenance, stopping daemons and disabling its systemd target
394 """
395 raise NotImplementedError()
396
397 def exit_host_maintenance(self, hostname: str) -> OrchResult:
398 """
399 Return a host from maintenance, restarting the cluster's systemd target
400 """
401 raise NotImplementedError()
402
403 def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
404 """
405 Returns something that was created by `ceph-volume inventory`.
406
407 :return: list of InventoryHost
408 """
409 raise NotImplementedError()
410
411 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
412 """
413 Describe a service (of any kind) that is already configured in
414 the orchestrator. For example, when viewing an OSD in the dashboard
415 we might like to also display information about the orchestrator's
416 view of the service (like the kubernetes pod ID).
417
418 When viewing a CephFS filesystem in the dashboard, we would use this
419 to display the pods being currently run for MDS daemons.
420
421 :return: list of ServiceDescription objects.
422 """
423 raise NotImplementedError()
424
425 def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
426 """
427 Describe a daemon (of any kind) that is already configured in
428 the orchestrator.
429
430 :return: list of DaemonDescription objects.
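
For example (sketch; ``orch`` stands for an enabled backend)::

    >>> #doctest: +SKIP
    ... daemons = raise_if_exception(orch.list_daemons(daemon_type='mgr'))
    ... [d.name() for d in daemons]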
431 """
432 raise NotImplementedError()
433
434 @handle_orch_error
435 def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
436 """
437 Applies any spec
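
Each spec is dispatched to the matching ``apply_*`` method (or ``add_host``
for host specs) based on its ``service_type``. Rough usage sketch (the specs
and ``orch`` are illustrative)::

    >>> #doctest: +SKIP
    ... specs = [ServiceSpec('mgr'), HostSpec('host1')]
    ... raise_if_exception(orch.apply(specs))   # -> list of result messages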
438 """
439 fns: Dict[str, Callable[..., OrchResult[str]]] = {
440 'alertmanager': self.apply_alertmanager,
441 'crash': self.apply_crash,
442 'grafana': self.apply_grafana,
443 'iscsi': self.apply_iscsi,
444 'mds': self.apply_mds,
445 'mgr': self.apply_mgr,
446 'mon': self.apply_mon,
447 'nfs': self.apply_nfs,
448 'node-exporter': self.apply_node_exporter,
449 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
450 'prometheus': self.apply_prometheus,
451 'rbd-mirror': self.apply_rbd_mirror,
452 'rgw': self.apply_rgw,
453 'ingress': self.apply_ingress,
454 'host': self.add_host,
455 'cephadm-exporter': self.apply_cephadm_exporter,
456 }
457
458 def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
459 l_res = raise_if_exception(l)
460 r_res = raise_if_exception(r)
461 l_res.append(r_res)
462 return OrchResult(l_res)
463 return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
464
465 def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
466 """
467 Plan (Dry-run, Preview) a List of Specs.
468 """
469 raise NotImplementedError()
470
471 def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
472 """
473 Remove specific daemon(s).
474
475 :return: None
476 """
477 raise NotImplementedError()
478
479 def remove_service(self, service_name: str) -> OrchResult[str]:
480 """
481 Remove a service (a collection of daemons).
482
483 :return: None
484 """
485 raise NotImplementedError()
486
487 def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
488 """
489 Perform an action (start/stop/reload) on a service (i.e., all daemons
490 providing the logical service).
491
492 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
493 :param service_name: service_type + '.' + service_id
494 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
495 :rtype: OrchResult
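
For example (sketch; the service name is illustrative)::

    >>> #doctest: +SKIP
    ... raise_if_exception(orch.service_action('restart', 'mds.mycephfs'))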
496 """
497 # assert action in ["start", "stop", "reload", "restart", "redeploy"]
498 raise NotImplementedError()
499
500 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
501 """
502 Perform an action (start/stop/reload) on a daemon.
503
504 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
505 :param daemon_name: name of daemon
506 :param image: Container image when redeploying that daemon
507 :rtype: OrchResult
508 """
509 # assert action in ["start", "stop", "reload", "restart", "redeploy"]
510 raise NotImplementedError()
511
512 def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
513 """
514 Create one or more OSDs within a single Drive Group.
515
516 The principal argument here is the drive_group member
517 of OsdSpec: other fields are advisory/extensible for any
518 finer-grained OSD feature enablement (choice of backing store,
519 compression/encryption, etc).
520 """
521 raise NotImplementedError()
522
523 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
524 """ Update OSD cluster """
525 raise NotImplementedError()
526
527 def set_unmanaged_flag(self,
528 unmanaged_flag: bool,
529 service_type: str = 'osd',
530 service_name: Optional[str] = None
531 ) -> HandleCommandResult:
532 raise NotImplementedError()
533
534 def preview_osdspecs(self,
535 osdspec_name: Optional[str] = 'osd',
536 osdspecs: Optional[List[DriveGroupSpec]] = None
537 ) -> OrchResult[str]:
538 """ Get a preview for OSD deployments """
539 raise NotImplementedError()
540
541 def remove_osds(self, osd_ids: List[str],
542 replace: bool = False,
543 force: bool = False) -> OrchResult[str]:
544 """
545 :param osd_ids: list of OSD IDs
546 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
547 :param force: Forces the OSD removal process without waiting for the data to be drained first.
548 Note that this can only remove OSDs that were successfully
549 created (i.e. got an OSD ID).
550 """
551 raise NotImplementedError()
552
553 def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
554 """
555 TODO
556 """
557 raise NotImplementedError()
558
559 def remove_osds_status(self) -> OrchResult:
560 """
561 Returns a status of the ongoing OSD removal operations.
562 """
563 raise NotImplementedError()
564
565 def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
566 """
567 Instructs the orchestrator to enable or disable either the ident or the fault LED.
568
569 :param ident_fault: either ``"ident"`` or ``"fault"``
570 :param on: ``True`` = on.
571 :param locations: See :class:`orchestrator.DeviceLightLoc`
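
Illustrative call (the host, device id and path are made up)::

    >>> #doctest: +SKIP
    ... locs = [DeviceLightLoc('host1', 'ABC1234DEF567-1R1234_ABC8DE0Q', '/dev/sdb')]
    ... raise_if_exception(orch.blink_device_light('ident', True, locs))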
572 """
573 raise NotImplementedError()
574
575 def zap_device(self, host: str, path: str) -> OrchResult[str]:
576 """Zap/Erase a device (DESTROYS DATA)"""
577 raise NotImplementedError()
578
579 def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
580 """Create daemons daemon(s) for unmanaged services"""
581 raise NotImplementedError()
582
583 def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
584 """Update mon cluster"""
585 raise NotImplementedError()
586
587 def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
588 """Update mgr cluster"""
589 raise NotImplementedError()
590
591 def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
592 """Update MDS cluster"""
593 raise NotImplementedError()
594
595 def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
596 """Update RGW cluster"""
597 raise NotImplementedError()
598
599 def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
600 """Update ingress daemons"""
601 raise NotImplementedError()
602
603 def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
604 """Update rbd-mirror cluster"""
605 raise NotImplementedError()
606
607 def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
608 """Update NFS cluster"""
609 raise NotImplementedError()
610
611 def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
612 """Update iscsi cluster"""
613 raise NotImplementedError()
614
615 def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
616 """Update prometheus cluster"""
617 raise NotImplementedError()
618
619 def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
620 """Update existing a Node-Exporter daemon(s)"""
621 raise NotImplementedError()
622
623 def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
624 """Update existing a crash daemon(s)"""
625 raise NotImplementedError()
626
627 def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
628 """Update existing a grafana service"""
629 raise NotImplementedError()
630
631 def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
632 """Update an existing AlertManager daemon(s)"""
633 raise NotImplementedError()
634
635 def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
636 """Update an existing cephadm exporter daemon"""
637 raise NotImplementedError()
638
639 def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
640 raise NotImplementedError()
641
642 def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
643 raise NotImplementedError()
644
645 def upgrade_pause(self) -> OrchResult[str]:
646 raise NotImplementedError()
647
648 def upgrade_resume(self) -> OrchResult[str]:
649 raise NotImplementedError()
650
651 def upgrade_stop(self) -> OrchResult[str]:
652 raise NotImplementedError()
653
654 def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
655 """
656 If an upgrade is currently underway, report on where
657 we are in the process, or if some error has occurred.
658
659 :return: UpgradeStatusSpec instance
660 """
661 raise NotImplementedError()
662
663 @_hide_in_features
664 def upgrade_available(self) -> OrchResult:
665 """
666 Report on what versions are available to upgrade to
667
668 :return: List of strings
669 """
670 raise NotImplementedError()
671
672
673 GenericSpec = Union[ServiceSpec, HostSpec]
674
675
676 def json_to_generic_spec(spec: dict) -> GenericSpec:
677 if 'service_type' in spec and spec['service_type'] == 'host':
678 return HostSpec.from_json(spec)
679 else:
680 return ServiceSpec.from_json(spec)
681
682
683 def daemon_type_to_service(dtype: str) -> str:
684 mapping = {
685 'mon': 'mon',
686 'mgr': 'mgr',
687 'mds': 'mds',
688 'rgw': 'rgw',
689 'osd': 'osd',
690 'haproxy': 'ingress',
691 'keepalived': 'ingress',
692 'iscsi': 'iscsi',
693 'rbd-mirror': 'rbd-mirror',
694 'cephfs-mirror': 'cephfs-mirror',
695 'nfs': 'nfs',
696 'grafana': 'grafana',
697 'alertmanager': 'alertmanager',
698 'prometheus': 'prometheus',
699 'node-exporter': 'node-exporter',
700 'crash': 'crash',
701 'crashcollector': 'crash', # Specific Rook Daemon
702 'container': 'container',
703 'cephadm-exporter': 'cephadm-exporter',
704 }
705 return mapping[dtype]
706
707
708 def service_to_daemon_types(stype: str) -> List[str]:
709 mapping = {
710 'mon': ['mon'],
711 'mgr': ['mgr'],
712 'mds': ['mds'],
713 'rgw': ['rgw'],
714 'osd': ['osd'],
715 'ingress': ['haproxy', 'keepalived'],
716 'iscsi': ['iscsi'],
717 'rbd-mirror': ['rbd-mirror'],
718 'cephfs-mirror': ['cephfs-mirror'],
719 'nfs': ['nfs'],
720 'grafana': ['grafana'],
721 'alertmanager': ['alertmanager'],
722 'prometheus': ['prometheus'],
723 'node-exporter': ['node-exporter'],
724 'crash': ['crash'],
725 'container': ['container'],
726 'cephadm-exporter': ['cephadm-exporter'],
727 }
728 return mapping[stype]
729
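# Note: daemon_type_to_service() and service_to_daemon_types() are
# complementary views of the same mapping, e.g.
# daemon_type_to_service('haproxy') == 'ingress' while
# service_to_daemon_types('ingress') == ['haproxy', 'keepalived'].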
730
731 class UpgradeStatusSpec(object):
732 # Orchestrator's report on what's going on with any ongoing upgrade
733 def __init__(self) -> None:
734 self.in_progress = False # Is an upgrade underway?
735 self.target_image: Optional[str] = None
736 self.services_complete: List[str] = [] # Which daemon types are fully updated?
737 self.progress: Optional[str] = None # How many of the daemons have we upgraded
738 self.message = "" # Freeform description
739
740
741 def handle_type_error(method: FuncT) -> FuncT:
742 @wraps(method)
743 def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
744 try:
745 return method(cls, *args, **kwargs)
746 except TypeError as e:
747 error_msg = '{}: {}'.format(cls.__name__, e)
748 raise OrchestratorValidationError(error_msg)
749 return cast(FuncT, inner)
750
751
752 class DaemonDescriptionStatus(enum.IntEnum):
753 error = -1
754 stopped = 0
755 running = 1
756
757
758 class DaemonDescription(object):
759 """
760 For responding to queries about the status of a particular daemon,
761 stateful or stateless.
762
763 This is not about health or performance monitoring of daemons: it's
764 about letting the orchestrator tell Ceph whether and where a
765 daemon is scheduled in the cluster. When an orchestrator tells
766 Ceph "it's running on host123", that's not a promise that the process
767 is literally up this second, it's a description of where the orchestrator
768 has decided the daemon should run.
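
Instances round-trip through ``to_json``/``from_json``. A small sketch
(the field values are made up)::

    >>> #doctest: +SKIP
    ... dd = DaemonDescription(daemon_type='mgr', daemon_id='x', hostname='host1')
    ... dd.name()            # 'mgr.x'
    ... dd.service_name()    # 'mgr'
    ... DaemonDescription.from_json(dd.to_json()).name()   # 'mgr.x'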
769 """
770
771 def __init__(self,
772 daemon_type: Optional[str] = None,
773 daemon_id: Optional[str] = None,
774 hostname: Optional[str] = None,
775 container_id: Optional[str] = None,
776 container_image_id: Optional[str] = None,
777 container_image_name: Optional[str] = None,
778 container_image_digests: Optional[List[str]] = None,
779 version: Optional[str] = None,
780 status: Optional[DaemonDescriptionStatus] = None,
781 status_desc: Optional[str] = None,
782 last_refresh: Optional[datetime.datetime] = None,
783 created: Optional[datetime.datetime] = None,
784 started: Optional[datetime.datetime] = None,
785 last_configured: Optional[datetime.datetime] = None,
786 osdspec_affinity: Optional[str] = None,
787 last_deployed: Optional[datetime.datetime] = None,
788 events: Optional[List['OrchestratorEvent']] = None,
789 is_active: bool = False,
790 memory_usage: Optional[int] = None,
791 memory_request: Optional[int] = None,
792 memory_limit: Optional[int] = None,
793 service_name: Optional[str] = None,
794 ports: Optional[List[int]] = None,
795 ip: Optional[str] = None,
796 deployed_by: Optional[List[str]] = None,
797 ) -> None:
798
799 # Host is at the same granularity as InventoryHost
800 self.hostname: Optional[str] = hostname
801
802 # Not everyone runs in containers, but enough people do to
803 # justify having the container_id (runtime id) and container_image
804 # (image name)
805 self.container_id = container_id # runtime id
806 self.container_image_id = container_image_id # image id locally
807 self.container_image_name = container_image_name # image friendly name
808 self.container_image_digests = container_image_digests # reg hashes
809
810 # The type of service (osd, mon, mgr, etc.)
811 self.daemon_type = daemon_type
812
813 # The orchestrator will have picked some names for daemons,
814 # typically either based on hostnames or on pod names.
815 # This is the <foo> in mds.<foo>, the ID that will appear
816 # in the FSMap/ServiceMap.
817 self.daemon_id: Optional[str] = daemon_id
818
819 self._service_name: Optional[str] = service_name
820
821 # Service version that was deployed
822 self.version = version
823
824 # Service status: -1 error, 0 stopped, 1 running
825 self.status = status
826
827 # Service status description when status == error.
828 self.status_desc = status_desc
829
830 # datetime when this info was last refreshed
831 self.last_refresh: Optional[datetime.datetime] = last_refresh
832
833 self.created: Optional[datetime.datetime] = created
834 self.started: Optional[datetime.datetime] = started
835 self.last_configured: Optional[datetime.datetime] = last_configured
836 self.last_deployed: Optional[datetime.datetime] = last_deployed
837
838 # Affinity to a certain OSDSpec
839 self.osdspec_affinity: Optional[str] = osdspec_affinity
840
841 self.events: List[OrchestratorEvent] = events or []
842
843 self.memory_usage: Optional[int] = memory_usage
844 self.memory_request: Optional[int] = memory_request
845 self.memory_limit: Optional[int] = memory_limit
846
847 self.ports: Optional[List[int]] = ports
848 self.ip: Optional[str] = ip
849
850 self.deployed_by = deployed_by
851
852 self.is_active = is_active
853
854 def get_port_summary(self) -> str:
855 if not self.ports:
856 return ''
857 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
858
859 def name(self) -> str:
860 return '%s.%s' % (self.daemon_type, self.daemon_id)
861
862 def matches_service(self, service_name: Optional[str]) -> bool:
863 assert self.daemon_id is not None
864 assert self.daemon_type is not None
865 if service_name:
866 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
867 return False
868
869 def service_id(self) -> str:
870 assert self.daemon_id is not None
871 assert self.daemon_type is not None
872
873 if self._service_name:
874 if '.' in self._service_name:
875 return self._service_name.split('.', 1)[1]
876 else:
877 return ''
878
879 if self.daemon_type == 'osd':
880 if self.osdspec_affinity and self.osdspec_affinity != 'None':
881 return self.osdspec_affinity
882 return 'unmanaged'
883
884 def _match() -> str:
885 assert self.daemon_id is not None
886 err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
887 f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
888
889 if not self.hostname:
890 # TODO: can a DaemonDescription exist without a hostname?
891 raise err
892
893 # use the bare hostname, not the FQDN.
894 host = self.hostname.split('.')[0]
895
896 if host == self.daemon_id:
897 # daemon_id == "host"
898 return self.daemon_id
899
900 elif host in self.daemon_id:
901 # daemon_id == "service_id.host"
902 # daemon_id == "service_id.host.random"
903 pre, post = self.daemon_id.rsplit(host, 1)
904 if not pre.endswith('.'):
905 # '.' sep missing at front of host
906 raise err
907 elif post and not post.startswith('.'):
908 # '.' sep missing at end of host
909 raise err
910 return pre[:-1]
911
912 # daemon_id == "service_id.random"
913 if self.daemon_type == 'rgw':
914 v = self.daemon_id.split('.')
915 if len(v) in [3, 4]:
916 return '.'.join(v[0:2])
917
918 if self.daemon_type == 'iscsi':
919 v = self.daemon_id.split('.')
920 return '.'.join(v[0:-1])
921
922 # daemon_id == "service_id"
923 return self.daemon_id
924
925 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
926 return _match()
927
928 return self.daemon_id
929
930 def service_name(self) -> str:
931 if self._service_name:
932 return self._service_name
933 assert self.daemon_type is not None
934 if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
935 return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
936 return daemon_type_to_service(self.daemon_type)
937
938 def __repr__(self) -> str:
939 return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
940 id=self.daemon_id)
941
942 def to_json(self) -> dict:
943 out: Dict[str, Any] = OrderedDict()
944 out['daemon_type'] = self.daemon_type
945 out['daemon_id'] = self.daemon_id
946 out['hostname'] = self.hostname
947 out['container_id'] = self.container_id
948 out['container_image_id'] = self.container_image_id
949 out['container_image_name'] = self.container_image_name
950 out['container_image_digests'] = self.container_image_digests
951 out['memory_usage'] = self.memory_usage
952 out['memory_request'] = self.memory_request
953 out['memory_limit'] = self.memory_limit
954 out['version'] = self.version
955 out['status'] = self.status.value if self.status is not None else None
956 out['status_desc'] = self.status_desc
957 if self.daemon_type == 'osd':
958 out['osdspec_affinity'] = self.osdspec_affinity
959 out['is_active'] = self.is_active
960 out['ports'] = self.ports
961 out['ip'] = self.ip
962
963 for k in ['last_refresh', 'created', 'started', 'last_deployed',
964 'last_configured']:
965 if getattr(self, k):
966 out[k] = datetime_to_str(getattr(self, k))
967
968 if self.events:
969 out['events'] = [e.to_json() for e in self.events]
970
971 empty = [k for k, v in out.items() if v is None]
972 for e in empty:
973 del out[e]
974 return out
975
976 @classmethod
977 @handle_type_error
978 def from_json(cls, data: dict) -> 'DaemonDescription':
979 c = data.copy()
980 event_strs = c.pop('events', [])
981 for k in ['last_refresh', 'created', 'started', 'last_deployed',
982 'last_configured']:
983 if k in c:
984 c[k] = str_to_datetime(c[k])
985 events = [OrchestratorEvent.from_json(e) for e in event_strs]
986 status_int = c.pop('status', None)
987 status = DaemonDescriptionStatus(status_int) if status_int is not None else None
988 return cls(events=events, status=status, **c)
989
990 def __copy__(self) -> 'DaemonDescription':
991 # feel free to change this:
992 return DaemonDescription.from_json(self.to_json())
993
994 @staticmethod
995 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
996 return dumper.represent_dict(data.to_json().items())
997
998
999 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1000
1001
1002 class ServiceDescription(object):
1003 """
1004 For responding to queries about the status of a particular service,
1005 stateful or stateless.
1006
1007 This is not about health or performance monitoring of services: it's
1008 about letting the orchestrator tell Ceph whether and where a
1009 service is scheduled in the cluster. When an orchestrator tells
1010 Ceph "it's running on host123", that's not a promise that the process
1011 is literally up this second, it's a description of where the orchestrator
1012 has decided the service should run.
1013 """
1014
1015 def __init__(self,
1016 spec: ServiceSpec,
1017 container_image_id: Optional[str] = None,
1018 container_image_name: Optional[str] = None,
1019 rados_config_location: Optional[str] = None,
1020 service_url: Optional[str] = None,
1021 last_refresh: Optional[datetime.datetime] = None,
1022 created: Optional[datetime.datetime] = None,
1023 deleted: Optional[datetime.datetime] = None,
1024 size: int = 0,
1025 running: int = 0,
1026 events: Optional[List['OrchestratorEvent']] = None,
1027 virtual_ip: Optional[str] = None,
1028 ports: List[int] = []) -> None:
1029 # Not everyone runs in containers, but enough people do to
1030 # justify having the container_image_id (image hash) and container_image
1031 # (image name)
1032 self.container_image_id = container_image_id # image hash
1033 self.container_image_name = container_image_name # image friendly name
1034
1035 # Location of the service configuration when stored in rados
1036 # object. Format: "rados://<pool>/[<namespace/>]<object>"
1037 self.rados_config_location = rados_config_location
1038
1039 # If the service exposes REST-like API, this attribute should hold
1040 # the URL.
1041 self.service_url = service_url
1042
1043 # Number of daemons
1044 self.size = size
1045
1046 # Number of daemons up
1047 self.running = running
1048
1049 # datetime when this info was last refreshed
1050 self.last_refresh: Optional[datetime.datetime] = last_refresh
1051 self.created: Optional[datetime.datetime] = created
1052 self.deleted: Optional[datetime.datetime] = deleted
1053
1054 self.spec: ServiceSpec = spec
1055
1056 self.events: List[OrchestratorEvent] = events or []
1057
1058 self.virtual_ip = virtual_ip
1059 self.ports = ports
1060
1061 def service_type(self) -> str:
1062 return self.spec.service_type
1063
1064 def __repr__(self) -> str:
1065 return f"<ServiceDescription of {self.spec.one_line_str()}>"
1066
1067 def get_port_summary(self) -> str:
1068 if not self.ports:
1069 return ''
1070 return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
1071
1072 def to_json(self) -> OrderedDict:
1073 out = self.spec.to_json()
1074 status = {
1075 'container_image_id': self.container_image_id,
1076 'container_image_name': self.container_image_name,
1077 'rados_config_location': self.rados_config_location,
1078 'service_url': self.service_url,
1079 'size': self.size,
1080 'running': self.running,
1081 'last_refresh': self.last_refresh,
1082 'created': self.created,
1083 'virtual_ip': self.virtual_ip,
1084 'ports': self.ports if self.ports else None,
1085 }
1086 for k in ['last_refresh', 'created']:
1087 if getattr(self, k):
1088 status[k] = datetime_to_str(getattr(self, k))
1089 status = {k: v for (k, v) in status.items() if v is not None}
1090 out['status'] = status
1091 if self.events:
1092 out['events'] = [e.to_json() for e in self.events]
1093 return out
1094
1095 @classmethod
1096 @handle_type_error
1097 def from_json(cls, data: dict) -> 'ServiceDescription':
1098 c = data.copy()
1099 status = c.pop('status', {})
1100 event_strs = c.pop('events', [])
1101 spec = ServiceSpec.from_json(c)
1102
1103 c_status = status.copy()
1104 for k in ['last_refresh', 'created']:
1105 if k in c_status:
1106 c_status[k] = str_to_datetime(c_status[k])
1107 events = [OrchestratorEvent.from_json(e) for e in event_strs]
1108 return cls(spec=spec, events=events, **c_status)
1109
1110 @staticmethod
1111 def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
1112 return dumper.represent_dict(data.to_json().items())
1113
1114
1115 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1116
1117
1118 class InventoryFilter(object):
1119 """
1120 When fetching inventory, use this filter to avoid unnecessarily
1121 scanning the whole estate.
1122
1123 Typical use: filter by host when presenting UI workflow for configuring
1124 a particular server.
1125 filter by label when not all of estate is Ceph servers,
1126 and we want to only learn about the Ceph servers.
1127 filter by label when we are interested particularly
1128 in e.g. OSD servers.
1129
1130 """
1131
1132 def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
1133
1134 #: Optional: get info about hosts matching labels
1135 self.labels = labels
1136
1137 #: Optional: get info about certain named hosts only
1138 self.hosts = hosts
1139
1140
1141 class InventoryHost(object):
1142 """
1143 When fetching inventory, all Devices are grouped inside of an
1144 InventoryHost.
1145 """
1146
1147 def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
1148 if devices is None:
1149 devices = inventory.Devices([])
1150 if labels is None:
1151 labels = []
1152 assert isinstance(devices, inventory.Devices)
1153
1154 self.name = name # unique within cluster. For example a hostname.
1155 self.addr = addr or name
1156 self.devices = devices
1157 self.labels = labels
1158
1159 def to_json(self) -> dict:
1160 return {
1161 'name': self.name,
1162 'addr': self.addr,
1163 'devices': self.devices.to_json(),
1164 'labels': self.labels,
1165 }
1166
1167 @classmethod
1168 def from_json(cls, data: dict) -> 'InventoryHost':
1169 try:
1170 _data = copy.deepcopy(data)
1171 name = _data.pop('name')
1172 addr = _data.pop('addr', None) or name
1173 devices = inventory.Devices.from_json(_data.pop('devices'))
1174 labels = _data.pop('labels', list())
1175 if _data:
1176 error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
1177 raise OrchestratorValidationError(error_msg)
1178 return cls(name, devices, labels, addr)
1179 except KeyError as e:
1180 error_msg = '{} is required for {}'.format(e, cls.__name__)
1181 raise OrchestratorValidationError(error_msg)
1182 except TypeError as e:
1183 raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
1184
1185 @classmethod
1186 def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
1187 devs = inventory.Devices.from_json
1188 return [cls(item[0], devs(item[1].data)) for item in hosts]
1189
1190 def __repr__(self) -> str:
1191 return "<InventoryHost>({name})".format(name=self.name)
1192
1193 @staticmethod
1194 def get_host_names(hosts: List['InventoryHost']) -> List[str]:
1195 return [host.name for host in hosts]
1196
1197 def __eq__(self, other: Any) -> bool:
1198 return self.name == other.name and self.devices == other.devices
1199
1200
1201 class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1202 """
1203 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1204 on devices.
1205
1206 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1207
1208 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1209 See ``ceph osd metadata | jq '.[].device_ids'``
1210 """
1211 __slots__ = ()
1212
1213
1214 class OrchestratorEvent:
1215 """
1216 Similar to K8s Events.
1217
1218 Some form of "important" log message attached to something.
1219 """
1220 INFO = 'INFO'
1221 ERROR = 'ERROR'
1222 regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
1223
1224 def __init__(self, created: Union[str, datetime.datetime], kind: str,
1225 subject: str, level: str, message: str) -> None:
1226 if isinstance(created, str):
1227 created = str_to_datetime(created)
1228 self.created: datetime.datetime = created
1229
1230 assert kind in "service daemon".split()
1231 self.kind: str = kind
1232
1233 # service name, or daemon name, or something
1234 self.subject: str = subject
1235
1236 # Events are not meant for debugging. Debug messages belong in the log.
1237 assert level in "INFO ERROR".split()
1238 self.level = level
1239
1240 self.message: str = message
1241
1242 __slots__ = ('created', 'kind', 'subject', 'level', 'message')
1243
1244 def kind_subject(self) -> str:
1245 return f'{self.kind}:{self.subject}'
1246
1247 def to_json(self) -> str:
1248 # Make a long list of events readable.
1249 created = datetime_to_str(self.created)
1250 return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
1251
1252 @classmethod
1253 @handle_type_error
1254 def from_json(cls, data: str) -> "OrchestratorEvent":
1255 """
1256 >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
1257 '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
1258
1259 :param data:
1260 :return:
1261 """
1262 match = cls.regex_v1.match(data)
1263 if match:
1264 return cls(*match.groups())
1265 raise ValueError(f'Unable to match: "{data}"')
1266
1267 def __eq__(self, other: Any) -> bool:
1268 if not isinstance(other, OrchestratorEvent):
1269 return False
1270
1271 return self.created == other.created and self.kind == other.kind \
1272 and self.subject == other.subject and self.message == other.message
1273
1274
1275 def _mk_orch_methods(cls: Any) -> Any:
1276 # Needs to be defined outside of for.
1277 # Otherwise meth is always bound to last key
1278 def shim(method_name: str) -> Callable:
1279 def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
1280 completion = self._oremote(method_name, args, kwargs)
1281 return completion
1282 return inner
1283
1284 for meth in Orchestrator.__dict__:
1285 if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
1286 setattr(cls, meth, shim(meth))
1287 return cls
1288
1289
1290 @_mk_orch_methods
1291 class OrchestratorClientMixin(Orchestrator):
1292 """
1293 A module that inherits from `OrchestratorClientMixin` can directly call
1294 all :class:`Orchestrator` methods without manually calling remote.
1295
1296 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1297 calls :func:`OrchestratorClientMixin._oremote`
1298
1299 >>> class MyModule(OrchestratorClientMixin):
1300 ... def func(self):
1301 ... completion = self.add_host('somehost') # calls `_oremote()`
1302 ... self.log.debug(completion.result)
1303
1304 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1305 The reason is that OrchestratorClientMixin magically redirects all methods to the
1306 "real" implementation of the orchestrator.
1307
1308
1309 >>> import mgr_module
1310 >>> #doctest: +SKIP
1311 ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
1312 ... def __init__(self, ...):
1313 ... self.orch_client = OrchestratorClientMixin()
1314 ... self.orch_client.set_mgr(self.mgr)
1315 """
1316
1317 def set_mgr(self, mgr: MgrModule) -> None:
1318 """
1319 Usable in the Dashboard, which uses a global ``mgr``
1320 """
1321
1322 self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
1323
1324 def __get_mgr(self) -> Any:
1325 try:
1326 return self.__mgr
1327 except AttributeError:
1328 return self
1329
1330 def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
1331 """
1332 Helper for invoking `remote` on whichever orchestrator is enabled
1333
1334 :raises RuntimeError: If the remote method failed.
1335 :raises OrchestratorError: orchestrator failed to perform
1336 :raises ImportError: no `orchestrator` module or backend not found.
1337 """
1338 mgr = self.__get_mgr()
1339
1340 try:
1341 o = mgr._select_orchestrator()
1342 except AttributeError:
1343 o = mgr.remote('orchestrator', '_select_orchestrator')
1344
1345 if o is None:
1346 raise NoOrchestrator()
1347
1348 mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
1349 try:
1350 return mgr.remote(o, meth, *args, **kwargs)
1351 except Exception as e:
1352 if meth == 'get_feature_set':
1353 raise # self.get_feature_set() calls self._oremote()
1354 f_set = self.get_feature_set()
1355 if meth not in f_set or not f_set[meth]['available']:
1356 raise NotImplementedError(f'{o} does not implement {meth}') from e
1357 raise