"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
import datetime
import enum
import errno
import logging
import pickle

from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from functools import wraps, reduce

from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
    Sequence, Dict, cast, Mapping

try:
    from typing import Protocol  # Protocol was added in Python 3.8
except ImportError:
    class Protocol:  # type: ignore
        pass


import yaml

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    IscsiServiceSpec, IngressSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime

from mgr_module import MgrModule, CLICommand, HandleCommandResult
42 logger
= logging
.getLogger(__name__
)
# Type variables used throughout this module:
#   T     -- generic payload type for OrchResult[T] / handle_orch_error.
#   FuncT -- any callable; used by the decorator helpers below.
# NOTE: ``T`` is referenced by handle_orch_error() and OrchResult but was
# missing here; restore its definition.
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        super(Exception, self).__init__(msg)
        # Keep the errno: CLI handlers (see handle_exception) return it
        # through HandleCommandResult(e.errno, ...).
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator in configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        # Surface ENOENT so the CLI reports "not found" semantics.
        super().__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags any OrchestratorError escaping its body with
    an event subject ``(kind, subject)``, then re-raises it.

    :param overwrite: when False, only tag errors that already expose an
        ``event_subject`` attribute.
    """
    try:
        yield
    except OrchestratorError as exc:
        if overwrite or hasattr(exc, 'event_subject'):
            exc.event_subject = (kind, subject)
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so expected orchestrator failures become clean
    HandleCommandResult errors instead of tracebacks, and attach the
    CLICommand metadata used by CLICommandMeta for dispatch.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an ``OrchResult``: the return value is wrapped, and any exception is
    captured instead of propagating across the remote() boundary.
    """
    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            logger.exception(e)
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    # Structural type for the closure returned by _cli_command(): takes a
    # command prefix and yields a decorator for the handler function.
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a two-stage decorator factory: ``_cli_command(perm)(prefix)``
    returns a decorator that wires a handler through handle_exception().
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        return lambda func: handle_exception(prefix, perm, func)
    return inner_cli_command
# Convenience decorators for read-only ('r') and read-write ('rw') commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)

        # Collect every attribute that was decorated by handle_exception():
        # those carry a ``_prefix`` and a ``_cli_command`` marker.
        by_prefix: Dict[str, CLICommand] = {}
        for attr_value in dct.values():
            try:
                by_prefix[attr_value._prefix] = attr_value._cli_command
            except AttributeError:
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefixes fall through to the class's own handler.
            if cmd['prefix'] not in by_prefix:
                return self.handle_command(inbuf, cmd)

            return by_prefix[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in by_prefix.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        # Exceptions are carried pickled so they can cross
        # sub-interpreter / remote() boundaries intact.
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Unwrap an OrchResult: re-raise its stored exception, else return its result.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # Unpicklable in this interpreter; fall back to the stringified form.
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    # Mark *f* so Orchestrator.get_feature_set() omits it from the report.
    f._hide_in_features = True  # type: ignore
    return f
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the concrete subclass actually
        # overrides the stub defined on this base class.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    @_hide_in_features
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param host: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling it's systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the clusters systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Apply a list of generic specs by dispatching each one to the
        matching per-service-type apply method, failing fast on any error.
        """
        # Dispatch table from service_type to the concrete apply method.
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            # Unwrap both sides so the first failure raises immediately.
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        Stop an ongoing removal for the given OSD ids.
        (NOTE(review): original docstring not visible here -- confirm wording.)
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemons daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing a grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# Either a service deployment spec or a bare host declaration; both are
# accepted by Orchestrator.apply() / json_to_generic_spec().
GenericSpec = Union[ServiceSpec, HostSpec]
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into a HostSpec ('host') or a ServiceSpec."""
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type that owns it.

    :raises KeyError: for an unknown daemon type.
    """
    mapping = {
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'crash': 'crash',
        'crashcollector': 'crash',  # Specific Rook Daemon
        'container': 'container',
        'cephadm-exporter': 'cephadm-exporter',
    }
    return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Inverse of ``daemon_type_to_service``: list the daemon types a service
    type deploys ('ingress' fans out to haproxy + keepalived).

    :raises KeyError: for an unknown service type.
    """
    mapping = {
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        'ingress': ['haproxy', 'keepalived'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'cephadm-exporter': ['cephadm-exporter'],
    }
    return mapping[stype]
# Flattened list of every daemon type that any known service type deploys.
KNOWN_DAEMON_TYPES: List[str] = list(
    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator converting a ``TypeError`` raised by *method* into an
    ``OrchestratorValidationError`` that names the class it came from.
    """
    @wraps(method)
    def checked(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            detail = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(detail)
    return cast(FuncT, checked)
class DaemonDescriptionStatus(enum.IntEnum):
    """Liveness state carried in ``DaemonDescription.status``."""
    # Values are the wire format used by to_json()/from_json():
    # -1 error, 0 stopped, 1 running.
    error = -1
    stopped = 0
    running = 1
782 class DaemonDescription(object):
784 For responding to queries about the status of a particular daemon,
785 stateful or stateless.
787 This is not about health or performance monitoring of daemons: it's
788 about letting the orchestrator tell Ceph whether and where a
789 daemon is scheduled in the cluster. When an orchestrator tells
790 Ceph "it's running on host123", that's not a promise that the process
791 is literally up this second, it's a description of where the orchestrator
792 has decided the daemon should run.
796 daemon_type
: Optional
[str] = None,
797 daemon_id
: Optional
[str] = None,
798 hostname
: Optional
[str] = None,
799 container_id
: Optional
[str] = None,
800 container_image_id
: Optional
[str] = None,
801 container_image_name
: Optional
[str] = None,
802 container_image_digests
: Optional
[List
[str]] = None,
803 version
: Optional
[str] = None,
804 status
: Optional
[DaemonDescriptionStatus
] = None,
805 status_desc
: Optional
[str] = None,
806 last_refresh
: Optional
[datetime
.datetime
] = None,
807 created
: Optional
[datetime
.datetime
] = None,
808 started
: Optional
[datetime
.datetime
] = None,
809 last_configured
: Optional
[datetime
.datetime
] = None,
810 osdspec_affinity
: Optional
[str] = None,
811 last_deployed
: Optional
[datetime
.datetime
] = None,
812 events
: Optional
[List
['OrchestratorEvent']] = None,
813 is_active
: bool = False,
814 memory_usage
: Optional
[int] = None,
815 memory_request
: Optional
[int] = None,
816 memory_limit
: Optional
[int] = None,
817 service_name
: Optional
[str] = None,
818 ports
: Optional
[List
[int]] = None,
819 ip
: Optional
[str] = None,
820 deployed_by
: Optional
[List
[str]] = None,
821 rank
: Optional
[int] = None,
822 rank_generation
: Optional
[int] = None,
825 # Host is at the same granularity as InventoryHost
826 self
.hostname
: Optional
[str] = hostname
828 # Not everyone runs in containers, but enough people do to
829 # justify having the container_id (runtime id) and container_image
831 self
.container_id
= container_id
# runtime id
832 self
.container_image_id
= container_image_id
# image id locally
833 self
.container_image_name
= container_image_name
# image friendly name
834 self
.container_image_digests
= container_image_digests
# reg hashes
836 # The type of service (osd, mon, mgr, etc.)
837 self
.daemon_type
= daemon_type
839 # The orchestrator will have picked some names for daemons,
840 # typically either based on hostnames or on pod names.
841 # This is the <foo> in mds.<foo>, the ID that will appear
842 # in the FSMap/ServiceMap.
843 self
.daemon_id
: Optional
[str] = daemon_id
844 self
.daemon_name
= self
.name()
846 # Some daemon types have a numeric rank assigned
847 self
.rank
: Optional
[int] = rank
848 self
.rank_generation
: Optional
[int] = rank_generation
850 self
._service
_name
: Optional
[str] = service_name
852 # Service version that was deployed
853 self
.version
= version
855 # Service status: -1 error, 0 stopped, 1 running
858 # Service status description when status == error.
859 self
.status_desc
= status_desc
861 # datetime when this info was last refreshed
862 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
864 self
.created
: Optional
[datetime
.datetime
] = created
865 self
.started
: Optional
[datetime
.datetime
] = started
866 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
867 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
869 # Affinity to a certain OSDSpec
870 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
872 self
.events
: List
[OrchestratorEvent
] = events
or []
874 self
.memory_usage
: Optional
[int] = memory_usage
875 self
.memory_request
: Optional
[int] = memory_request
876 self
.memory_limit
: Optional
[int] = memory_limit
878 self
.ports
: Optional
[List
[int]] = ports
879 self
.ip
: Optional
[str] = ip
881 self
.deployed_by
= deployed_by
883 self
.is_active
= is_active
885 def get_port_summary(self
) -> str:
888 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
890 def name(self
) -> str:
891 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
893 def matches_service(self
, service_name
: Optional
[str]) -> bool:
894 assert self
.daemon_id
is not None
895 assert self
.daemon_type
is not None
897 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
900 def service_id(self
) -> str:
901 assert self
.daemon_id
is not None
902 assert self
.daemon_type
is not None
904 if self
._service
_name
:
905 if '.' in self
._service
_name
:
906 return self
._service
_name
.split('.', 1)[1]
910 if self
.daemon_type
== 'osd':
911 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
912 return self
.osdspec_affinity
916 assert self
.daemon_id
is not None
917 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
918 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
920 if not self
.hostname
:
921 # TODO: can a DaemonDescription exist without a hostname?
924 # use the bare hostname, not the FQDN.
925 host
= self
.hostname
.split('.')[0]
927 if host
== self
.daemon_id
:
928 # daemon_id == "host"
929 return self
.daemon_id
931 elif host
in self
.daemon_id
:
932 # daemon_id == "service_id.host"
933 # daemon_id == "service_id.host.random"
934 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
935 if not pre
.endswith('.'):
936 # '.' sep missing at front of host
938 elif post
and not post
.startswith('.'):
939 # '.' sep missing at end of host
943 # daemon_id == "service_id.random"
944 if self
.daemon_type
== 'rgw':
945 v
= self
.daemon_id
.split('.')
947 return '.'.join(v
[0:2])
949 if self
.daemon_type
== 'iscsi':
950 v
= self
.daemon_id
.split('.')
951 return '.'.join(v
[0:-1])
953 # daemon_id == "service_id"
954 return self
.daemon_id
956 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
959 return self
.daemon_id
961 def service_name(self
) -> str:
962 if self
._service
_name
:
963 return self
._service
_name
964 assert self
.daemon_type
is not None
965 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
966 return f
'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
967 return daemon_type_to_service(self
.daemon_type
)
969 def __repr__(self
) -> str:
970 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
973 def to_json(self
) -> dict:
974 out
: Dict
[str, Any
] = OrderedDict()
975 out
['daemon_type'] = self
.daemon_type
976 out
['daemon_id'] = self
.daemon_id
977 out
['service_name'] = self
._service
_name
978 out
['daemon_name'] = self
.name()
979 out
['hostname'] = self
.hostname
980 out
['container_id'] = self
.container_id
981 out
['container_image_id'] = self
.container_image_id
982 out
['container_image_name'] = self
.container_image_name
983 out
['container_image_digests'] = self
.container_image_digests
984 out
['memory_usage'] = self
.memory_usage
985 out
['memory_request'] = self
.memory_request
986 out
['memory_limit'] = self
.memory_limit
987 out
['version'] = self
.version
988 out
['status'] = self
.status
.value
if self
.status
is not None else None
989 out
['status_desc'] = self
.status_desc
990 if self
.daemon_type
== 'osd':
991 out
['osdspec_affinity'] = self
.osdspec_affinity
992 out
['is_active'] = self
.is_active
993 out
['ports'] = self
.ports
995 out
['rank'] = self
.rank
996 out
['rank_generation'] = self
.rank_generation
998 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1000 if getattr(self
, k
):
1001 out
[k
] = datetime_to_str(getattr(self
, k
))
1004 out
['events'] = [e
.to_json() for e
in self
.events
]
1006 empty
= [k
for k
, v
in out
.items() if v
is None]
def to_dict(self) -> dict:
    """
    Serialize this daemon description to a plain dict.

    Mirrors ``to_json`` but emits events via ``OrchestratorEvent.to_dict``;
    keys whose value is ``None`` are stripped from the result.
    """
    out: Dict[str, Any] = OrderedDict()
    out['daemon_type'] = self.daemon_type
    out['daemon_id'] = self.daemon_id
    out['daemon_name'] = self.name()
    out['hostname'] = self.hostname
    out['container_id'] = self.container_id
    out['container_image_id'] = self.container_image_id
    out['container_image_name'] = self.container_image_name
    out['container_image_digests'] = self.container_image_digests
    out['memory_usage'] = self.memory_usage
    out['memory_request'] = self.memory_request
    out['memory_limit'] = self.memory_limit
    out['version'] = self.version
    # status is an enum; export its numeric value (None stays None)
    out['status'] = self.status.value if self.status is not None else None
    out['status_desc'] = self.status_desc
    if self.daemon_type == 'osd':
        out['osdspec_affinity'] = self.osdspec_affinity
    out['is_active'] = self.is_active
    out['ports'] = self.ports
    out['ip'] = self.ip  # NOTE(review): reconstructed line — confirm against upstream

    # NOTE(review): the tail of this key list was not visible in the
    # mangled source; 'last_configured' reconstructed from the upstream file.
    for k in ['last_refresh', 'created', 'started', 'last_deployed',
              'last_configured']:
        if getattr(self, k):
            out[k] = datetime_to_str(getattr(self, k))

    if self.events:
        out['events'] = [e.to_dict() for e in self.events]

    # Strip None-valued keys to keep the output compact.
    empty = [k for k, v in out.items() if v is None]
    for e in empty:
        del out[e]
    return out
@classmethod
def from_json(cls, data: dict) -> 'DaemonDescription':
    """
    Inverse of ``to_json``: rebuild a DaemonDescription from a dict.

    Timestamp strings are parsed back to ``datetime``; the serialized
    ``status`` integer is converted back to its enum; the derived
    ``daemon_name`` key is dropped before forwarding to the constructor.
    """
    c = data.copy()
    event_strs = c.pop('events', [])
    # NOTE(review): tail of this key list reconstructed — not visible here.
    for k in ['last_refresh', 'created', 'started', 'last_deployed',
              'last_configured']:
        if k in c:
            c[k] = str_to_datetime(c[k])
    events = [OrchestratorEvent.from_json(e) for e in event_strs]
    status_int = c.pop('status', None)
    # 'daemon_name' is computed by name(); it must not reach __init__
    # as an unexpected keyword argument.
    if 'daemon_name' in c:
        del c['daemon_name']
    status = DaemonDescriptionStatus(status_int) if status_int is not None else None
    return cls(events=events, status=status, **c)
def __copy__(self) -> 'DaemonDescription':
    # Copy via a JSON round-trip: serialize, then rebuild a fresh instance.
    # feel free to change this:
    return DaemonDescription.from_json(self.to_json())
@staticmethod  # NOTE(review): decorator reconstructed — not visible in this view
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
    """Representer for yaml.dump: emit the JSON-shaped dict of *data*."""
    return dumper.represent_dict(cast(Mapping, data.to_json().items()))
# Register the representer so yaml.dump(DaemonDescription) emits its dict form.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # name here.
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # its endpoint URL.
        self.service_url = service_url

        # NOTE(review): `size` reconstructed from the serialized 'size' key
        # below — the parameter line was not visible in this view.
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # BUGFIX: the parameter previously defaulted to a shared mutable
        # list (`ports: List[int] = []`); use a None sentinel so every
        # instance gets its own list. Backward-compatible for all callers.
        self.ports: List[int] = ports if ports is not None else []

    @property
    def service_type(self) -> str:
        # Delegates to the spec, which owns the service identity.
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Return 'ip:port1,port2' for display, '?' when no virtual IP is set."""
        if not self.ports:
            # NOTE(review): this guard was not visible in the mangled
            # source — reconstructed from upstream; confirm.
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def to_json(self) -> OrderedDict:
        """Serialize spec plus a 'status' sub-dict; events as strings."""
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        # Strip None values to keep the payload small.
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """Like ``to_json`` but events are emitted as dicts."""
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    # NOTE(review): upstream may also apply @handle_type_error here — the
    # decorator lines were not visible in this view; confirm.
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Inverse of ``to_json``: rebuild spec, status fields and events."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Representer for yaml.dump: emit the JSON-shaped dict of *data*."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
# Register the representer so yaml.dump(ServiceDescription) emits its dict form.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name  # fall back to the name when no address given
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict; devices serialized recursively."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Build an InventoryHost from a dict.

        :raises OrchestratorValidationError: on missing/unknown keys or
            a payload of the wrong type.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Reject unknown keys so typos don't pass silently.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # BUGFIX: previously accessed other.name unconditionally, raising
        # AttributeError when compared against a non-InventoryHost object.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """

    # v1 wire format: `<created> <kind>:<subject> [<level>] "<message>"`
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name, or something similar
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable: one line per event.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    # NOTE(review): upstream may also apply @handle_type_error here — the
    # decorator lines were not visible in this view; confirm.
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        Parse the one-line v1 string format back into an event.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False

        # `level` is deliberately not part of the visible comparison below.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
def _mk_orch_methods(cls: Any) -> Any:
    """
    Class decorator: for each public ``Orchestrator`` interface method,
    attach a stub to *cls* that forwards the call through ``_oremote``.
    """
    # Needs to be defined outside of the for loop below.
    # Otherwise meth would always be bound to the last key.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls
1388 class OrchestratorClientMixin(Orchestrator
):
1390 A module that inherents from `OrchestratorClientMixin` can directly call
1391 all :class:`Orchestrator` methods without manually calling remote.
1393 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1394 calls :func:`OrchestratorClientMixin._oremote`
1396 >>> class MyModule(OrchestratorClientMixin):
1398 ... completion = self.add_host('somehost') # calls `_oremote()`
1399 ... self.log.debug(completion.result)
1401 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1402 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1403 "real" implementation of the orchestrator.
1406 >>> import mgr_module
1408 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1409 ... def __init__(self, ...):
1410 ... self.orch_client = OrchestratorClientMixin()
1411 ... self.orch_client.set_mgr(self.mgr))
def set_mgr(self, mgr: MgrModule) -> None:
    """
    Useable in the Dashboard that uses a global ``mgr``
    """
    self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
def __get_mgr(self) -> Any:
    # Return the MgrModule previously stored via set_mgr().
    try:
        return self.__mgr
    except AttributeError:
        # NOTE(review): message reconstructed — the original text of this
        # raise was not visible in this view; confirm against upstream.
        raise RuntimeError('Call set_mgr() first.')
1427 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1429 Helper for invoking `remote` on whichever orchestrator is enabled
1431 :raises RuntimeError: If the remote method failed.
1432 :raises OrchestratorError: orchestrator failed to perform
1433 :raises ImportError: no `orchestrator` module or backend not found.
1435 mgr
= self
.__get
_mgr
()
1438 o
= mgr
._select
_orchestrator
()
1439 except AttributeError:
1440 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1443 raise NoOrchestrator()
1445 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1447 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1448 except Exception as e
:
1449 if meth
== 'get_feature_set':
1450 raise # self.get_feature_set() calls self._oremote()
1451 f_set
= self
.get_feature_set()
1452 if meth
not in f_set
or not f_set
[meth
]['available']:
1453 raise NotImplementedError(f
'{o} does not implement {meth}') from e