"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
16 from collections
import namedtuple
, OrderedDict
17 from contextlib
import contextmanager
18 from functools
import wraps
, reduce
20 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
24 from typing
import Protocol
# Protocol was added in Python 3.8
26 class Protocol
: # type: ignore
32 from ceph
.deployment
import inventory
33 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
34 IscsiServiceSpec
, IngressSpec
35 from ceph
.deployment
.drive_group
import DriveGroupSpec
36 from ceph
.deployment
.hostspec
import HostSpec
, SpecValidationError
37 from ceph
.utils
import datetime_to_str
, str_to_datetime
39 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable used by the decorator helpers below: any callable.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        super(Exception, self).__init__(msg)
        # Negative errno handed back to the CLI layer (see handle_exception).
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Tag any OrchestratorError escaping the managed body with an event
    subject ``(kind, subject)``, then re-raise it unchanged.

    :param overwrite: also replace a subject that is already set.
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so that expected failures are turned into command
    results instead of tracebacks, and attach CLI dispatch metadata.

    :param prefix: CLI command prefix, used for dispatch and error text
    :param perm: permission string for the CLICommand ('r' or 'rw')
    :param func: the handler being wrapped
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse lambda to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult, capturing any raised exception instead of
    letting it propagate across the remote() boundary.
    """

    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as e:
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    # Structural type for the inner decorator factory returned by _cli_command.
    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """Build a CLI-command decorator factory for the given permission."""
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        return lambda func: handle_exception(prefix, perm, func)

    return inner_cli_command
# Decorator factories for read-only and read-write orchestrator CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every attribute that was decorated by handle_exception
        # (those carry _prefix / _cli_command metadata).
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            # Unknown prefix: defer to the class's own handler.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        # Clearing: no exception stored.
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self.serialized_exception = pickle.dumps(e)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # Unpicklable in this interpreter: fall back to the stored text.
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    """Mark a method so get_feature_set() skips it."""
    f._hide_in_features = True  # type: ignore
    return f
234 class Orchestrator(object):
236 Calls in this class may do long running remote operations, with time
237 periods ranging from network latencies to package install latencies and large
238 internet downloads. For that reason, all are asynchronous, and return
239 ``Completion`` objects.
241 Methods should only return the completion and not directly execute
242 anything, like network calls. Otherwise the purpose of
243 those completions is defeated.
245 Implementations are not required to start work on an operation until
246 the caller waits on the relevant Completion objects. Callers making
247 multiple updates should not wait on Completions until they're done
248 sending operations: this enables implementations to batch up a series
249 of updates when wait() is called on a set of Completion objects.
251 Implementations are encouraged to keep reasonably fresh caches of
252 the status of the system: it is better to serve a stale-but-recent
253 result read of e.g. device inventory than it is to keep the caller waiting
254 while you scan hosts every time.
258 def is_orchestrator_module(self
) -> bool:
260 Enable other modules to interrogate this module to discover
261 whether it's usable as an orchestrator module.
263 Subclasses do not need to override this.
268 def available(self
) -> Tuple
[bool, str, Dict
[str, Any
]]:
270 Report whether we can talk to the orchestrator. This is the
271 place to give the user a meaningful message if the orchestrator
272 isn't running or can't be contacted.
274 This method may be called frequently (e.g. every page load
275 to conditionally display a warning banner), so make sure it's
276 not too expensive. It's okay to give a slightly stale status
277 (e.g. based on a periodic background ping of the orchestrator)
278 if that's necessary to make this method fast.
281 `True` doesn't mean that the desired functionality
282 is actually available in the orchestrator. I.e. this
283 won't work as expected::
286 ... if OrchestratorClientMixin().available()[0]: # wrong.
287 ... OrchestratorClientMixin().get_hosts()
289 :return: boolean representing whether the module is available/usable
290 :return: string describing any error
291 :return: dict containing any module specific information
293 raise NotImplementedError()
296 def get_feature_set(self
) -> Dict
[str, dict]:
297 """Describes which methods this orchestrator implements
300 `True` doesn't mean that the desired functionality
301 is actually possible in the orchestrator. I.e. this
302 won't work as expected::
305 ... api = OrchestratorClientMixin()
306 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
309 It's better to ask for forgiveness instead::
313 ... OrchestratorClientMixin().get_hosts()
314 ... except (OrchestratorError, NotImplementedError):
317 :returns: Dict of API method names to ``{'available': True or False}``
319 module
= self
.__class
__
320 features
= {a
: {'available': getattr(Orchestrator
, a
, None) != getattr(module
, a
)}
321 for a
in Orchestrator
.__dict
__
322 if not a
.startswith('_') and not getattr(getattr(Orchestrator
, a
), '_hide_in_features', False)
326 def cancel_completions(self
) -> None:
328 Cancels ongoing completions. Unstuck the mgr.
330 raise NotImplementedError()
332 def pause(self
) -> None:
333 raise NotImplementedError()
335 def resume(self
) -> None:
336 raise NotImplementedError()
338 def add_host(self
, host_spec
: HostSpec
) -> OrchResult
[str]:
340 Add a host to the orchestrator inventory.
342 :param host: hostname
344 raise NotImplementedError()
346 def remove_host(self
, host
: str) -> OrchResult
[str]:
348 Remove a host from the orchestrator inventory.
350 :param host: hostname
352 raise NotImplementedError()
354 def update_host_addr(self
, host
: str, addr
: str) -> OrchResult
[str]:
356 Update a host's address
358 :param host: hostname
359 :param addr: address (dns name or IP)
361 raise NotImplementedError()
363 def get_hosts(self
) -> OrchResult
[List
[HostSpec
]]:
365 Report the hosts in the cluster.
367 :return: list of HostSpec
369 raise NotImplementedError()
371 def add_host_label(self
, host
: str, label
: str) -> OrchResult
[str]:
375 raise NotImplementedError()
377 def remove_host_label(self
, host
: str, label
: str) -> OrchResult
[str]:
381 raise NotImplementedError()
383 def host_ok_to_stop(self
, hostname
: str) -> OrchResult
:
385 Check if the specified host can be safely stopped without reducing availability
387 :param host: hostname
389 raise NotImplementedError()
391 def enter_host_maintenance(self
, hostname
: str, force
: bool = False) -> OrchResult
:
393 Place a host in maintenance, stopping daemons and disabling it's systemd target
395 raise NotImplementedError()
397 def exit_host_maintenance(self
, hostname
: str) -> OrchResult
:
399 Return a host from maintenance, restarting the clusters systemd target
401 raise NotImplementedError()
403 def get_inventory(self
, host_filter
: Optional
['InventoryFilter'] = None, refresh
: bool = False) -> OrchResult
[List
['InventoryHost']]:
405 Returns something that was created by `ceph-volume inventory`.
407 :return: list of InventoryHost
409 raise NotImplementedError()
411 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None, refresh
: bool = False) -> OrchResult
[List
['ServiceDescription']]:
413 Describe a service (of any kind) that is already configured in
414 the orchestrator. For example, when viewing an OSD in the dashboard
415 we might like to also display information about the orchestrator's
416 view of the service (like the kubernetes pod ID).
418 When viewing a CephFS filesystem in the dashboard, we would use this
419 to display the pods being currently run for MDS daemons.
421 :return: list of ServiceDescription objects.
423 raise NotImplementedError()
425 def list_daemons(self
, service_name
: Optional
[str] = None, daemon_type
: Optional
[str] = None, daemon_id
: Optional
[str] = None, host
: Optional
[str] = None, refresh
: bool = False) -> OrchResult
[List
['DaemonDescription']]:
427 Describe a daemon (of any kind) that is already configured in
430 :return: list of DaemonDescription objects.
432 raise NotImplementedError()
435 def apply(self
, specs
: Sequence
["GenericSpec"], no_overwrite
: bool = False) -> List
[str]:
439 fns
: Dict
[str, Callable
[..., OrchResult
[str]]] = {
440 'alertmanager': self
.apply_alertmanager
,
441 'crash': self
.apply_crash
,
442 'grafana': self
.apply_grafana
,
443 'iscsi': self
.apply_iscsi
,
444 'mds': self
.apply_mds
,
445 'mgr': self
.apply_mgr
,
446 'mon': self
.apply_mon
,
447 'nfs': self
.apply_nfs
,
448 'node-exporter': self
.apply_node_exporter
,
449 'osd': lambda dg
: self
.apply_drivegroups([dg
]), # type: ignore
450 'prometheus': self
.apply_prometheus
,
451 'rbd-mirror': self
.apply_rbd_mirror
,
452 'rgw': self
.apply_rgw
,
453 'ingress': self
.apply_ingress
,
454 'host': self
.add_host
,
455 'cephadm-exporter': self
.apply_cephadm_exporter
,
458 def merge(l
: OrchResult
[List
[str]], r
: OrchResult
[str]) -> OrchResult
[List
[str]]: # noqa: E741
459 l_res
= raise_if_exception(l
)
460 r_res
= raise_if_exception(r
)
462 return OrchResult(l_res
)
463 return raise_if_exception(reduce(merge
, [fns
[spec
.service_type
](spec
) for spec
in specs
], OrchResult([])))
465 def plan(self
, spec
: Sequence
["GenericSpec"]) -> OrchResult
[List
]:
467 Plan (Dry-run, Preview) a List of Specs.
469 raise NotImplementedError()
471 def remove_daemons(self
, names
: List
[str]) -> OrchResult
[List
[str]]:
473 Remove specific daemon(s).
477 raise NotImplementedError()
479 def remove_service(self
, service_name
: str) -> OrchResult
[str]:
481 Remove a service (a collection of daemons).
485 raise NotImplementedError()
487 def service_action(self
, action
: str, service_name
: str) -> OrchResult
[List
[str]]:
489 Perform an action (start/stop/reload) on a service (i.e., all daemons
490 providing the logical service).
492 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
493 :param service_name: service_type + '.' + service_id
494 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
497 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
498 raise NotImplementedError()
500 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str] = None) -> OrchResult
[str]:
502 Perform an action (start/stop/reload) on a daemon.
504 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
505 :param daemon_name: name of daemon
506 :param image: Container image when redeploying that daemon
509 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
510 raise NotImplementedError()
512 def create_osds(self
, drive_group
: DriveGroupSpec
) -> OrchResult
[str]:
514 Create one or more OSDs within a single Drive Group.
516 The principal argument here is the drive_group member
517 of OsdSpec: other fields are advisory/extensible for any
518 finer-grained OSD feature enablement (choice of backing store,
519 compression/encryption, etc).
521 raise NotImplementedError()
523 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]) -> OrchResult
[List
[str]]:
524 """ Update OSD cluster """
525 raise NotImplementedError()
527 def set_unmanaged_flag(self
,
528 unmanaged_flag
: bool,
529 service_type
: str = 'osd',
530 service_name
: Optional
[str] = None
531 ) -> HandleCommandResult
:
532 raise NotImplementedError()
534 def preview_osdspecs(self
,
535 osdspec_name
: Optional
[str] = 'osd',
536 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
537 ) -> OrchResult
[str]:
538 """ Get a preview for OSD deployments """
539 raise NotImplementedError()
541 def remove_osds(self
, osd_ids
: List
[str],
542 replace
: bool = False,
543 force
: bool = False) -> OrchResult
[str]:
545 :param osd_ids: list of OSD IDs
546 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
547 :param force: Forces the OSD removal process without waiting for the data to be drained first.
548 Note that this can only remove OSDs that were successfully
549 created (i.e. got an OSD ID).
551 raise NotImplementedError()
553 def stop_remove_osds(self
, osd_ids
: List
[str]) -> OrchResult
:
557 raise NotImplementedError()
559 def remove_osds_status(self
) -> OrchResult
:
561 Returns a status of the ongoing OSD removal operations.
563 raise NotImplementedError()
565 def blink_device_light(self
, ident_fault
: str, on
: bool, locations
: List
['DeviceLightLoc']) -> OrchResult
[List
[str]]:
567 Instructs the orchestrator to enable or disable either the ident or the fault LED.
569 :param ident_fault: either ``"ident"`` or ``"fault"``
570 :param on: ``True`` = on.
571 :param locations: See :class:`orchestrator.DeviceLightLoc`
573 raise NotImplementedError()
575 def zap_device(self
, host
: str, path
: str) -> OrchResult
[str]:
576 """Zap/Erase a device (DESTROYS DATA)"""
577 raise NotImplementedError()
579 def add_daemon(self
, spec
: ServiceSpec
) -> OrchResult
[List
[str]]:
580 """Create daemons daemon(s) for unmanaged services"""
581 raise NotImplementedError()
583 def apply_mon(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
584 """Update mon cluster"""
585 raise NotImplementedError()
587 def apply_mgr(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
588 """Update mgr cluster"""
589 raise NotImplementedError()
591 def apply_mds(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
592 """Update MDS cluster"""
593 raise NotImplementedError()
595 def apply_rgw(self
, spec
: RGWSpec
) -> OrchResult
[str]:
596 """Update RGW cluster"""
597 raise NotImplementedError()
599 def apply_ingress(self
, spec
: IngressSpec
) -> OrchResult
[str]:
600 """Update ingress daemons"""
601 raise NotImplementedError()
603 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
604 """Update rbd-mirror cluster"""
605 raise NotImplementedError()
607 def apply_nfs(self
, spec
: NFSServiceSpec
) -> OrchResult
[str]:
608 """Update NFS cluster"""
609 raise NotImplementedError()
611 def apply_iscsi(self
, spec
: IscsiServiceSpec
) -> OrchResult
[str]:
612 """Update iscsi cluster"""
613 raise NotImplementedError()
615 def apply_prometheus(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
616 """Update prometheus cluster"""
617 raise NotImplementedError()
619 def apply_node_exporter(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
620 """Update existing a Node-Exporter daemon(s)"""
621 raise NotImplementedError()
623 def apply_crash(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
624 """Update existing a crash daemon(s)"""
625 raise NotImplementedError()
627 def apply_grafana(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
628 """Update existing a grafana service"""
629 raise NotImplementedError()
631 def apply_alertmanager(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
632 """Update an existing AlertManager daemon(s)"""
633 raise NotImplementedError()
635 def apply_cephadm_exporter(self
, spec
: ServiceSpec
) -> OrchResult
[str]:
636 """Update an existing cephadm exporter daemon"""
637 raise NotImplementedError()
639 def upgrade_check(self
, image
: Optional
[str], version
: Optional
[str]) -> OrchResult
[str]:
640 raise NotImplementedError()
642 def upgrade_start(self
, image
: Optional
[str], version
: Optional
[str]) -> OrchResult
[str]:
643 raise NotImplementedError()
645 def upgrade_pause(self
) -> OrchResult
[str]:
646 raise NotImplementedError()
648 def upgrade_resume(self
) -> OrchResult
[str]:
649 raise NotImplementedError()
651 def upgrade_stop(self
) -> OrchResult
[str]:
652 raise NotImplementedError()
654 def upgrade_status(self
) -> OrchResult
['UpgradeStatusSpec']:
656 If an upgrade is currently underway, report on where
657 we are in the process, or if some error has occurred.
659 :return: UpgradeStatusSpec instance
661 raise NotImplementedError()
664 def upgrade_available(self
) -> OrchResult
:
666 Report on what versions are available to upgrade to
668 :return: List of strings
670 raise NotImplementedError()
# A spec that `apply` / `json_to_generic_spec` accept: a service or a host.
GenericSpec = Union[ServiceSpec, HostSpec]
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a dict into a HostSpec ('host') or a ServiceSpec (anything else)."""
    if 'service_type' in spec and spec['service_type'] == 'host':
        return HostSpec.from_json(spec)
    else:
        return ServiceSpec.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """Map a daemon type to the service type that owns it (KeyError if unknown)."""
    mapping = {
        'mon': 'mon',
        'mgr': 'mgr',
        'mds': 'mds',
        'rgw': 'rgw',
        'osd': 'osd',
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'iscsi': 'iscsi',
        'rbd-mirror': 'rbd-mirror',
        'cephfs-mirror': 'cephfs-mirror',
        'nfs': 'nfs',
        'grafana': 'grafana',
        'alertmanager': 'alertmanager',
        'prometheus': 'prometheus',
        'node-exporter': 'node-exporter',
        'crash': 'crash',
        'crashcollector': 'crash',  # Specific Rook Daemon
        'container': 'container',
        'cephadm-exporter': 'cephadm-exporter',
    }
    return mapping[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """Map a service type to the daemon types it deploys (KeyError if unknown)."""
    mapping = {
        'mon': ['mon'],
        'mgr': ['mgr'],
        'mds': ['mds'],
        'rgw': ['rgw'],
        'osd': ['osd'],
        'ingress': ['haproxy', 'keepalived'],
        'iscsi': ['iscsi'],
        'rbd-mirror': ['rbd-mirror'],
        'cephfs-mirror': ['cephfs-mirror'],
        'nfs': ['nfs'],
        'grafana': ['grafana'],
        'alertmanager': ['alertmanager'],
        'prometheus': ['prometheus'],
        'node-exporter': ['node-exporter'],
        'crash': ['crash'],
        'container': ['container'],
        'cephadm-exporter': ['cephadm-exporter'],
    }
    return mapping[stype]
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self) -> None:
        self.in_progress = False  # Is an upgrade underway?
        self.target_image: Optional[str] = None
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
        self.message = ""  # Freeform description
def handle_type_error(method: FuncT) -> FuncT:
    """
    Convert a TypeError raised by the wrapped classmethod-style callable
    into an OrchestratorValidationError naming the class.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return cast(FuncT, inner)
class DaemonDescriptionStatus(enum.IntEnum):
    # Service status: -1 error, 0 stopped, 1 running
    error = -1
    stopped = 0
    running = 1
758 class DaemonDescription(object):
760 For responding to queries about the status of a particular daemon,
761 stateful or stateless.
763 This is not about health or performance monitoring of daemons: it's
764 about letting the orchestrator tell Ceph whether and where a
765 daemon is scheduled in the cluster. When an orchestrator tells
766 Ceph "it's running on host123", that's not a promise that the process
767 is literally up this second, it's a description of where the orchestrator
768 has decided the daemon should run.
772 daemon_type
: Optional
[str] = None,
773 daemon_id
: Optional
[str] = None,
774 hostname
: Optional
[str] = None,
775 container_id
: Optional
[str] = None,
776 container_image_id
: Optional
[str] = None,
777 container_image_name
: Optional
[str] = None,
778 container_image_digests
: Optional
[List
[str]] = None,
779 version
: Optional
[str] = None,
780 status
: Optional
[DaemonDescriptionStatus
] = None,
781 status_desc
: Optional
[str] = None,
782 last_refresh
: Optional
[datetime
.datetime
] = None,
783 created
: Optional
[datetime
.datetime
] = None,
784 started
: Optional
[datetime
.datetime
] = None,
785 last_configured
: Optional
[datetime
.datetime
] = None,
786 osdspec_affinity
: Optional
[str] = None,
787 last_deployed
: Optional
[datetime
.datetime
] = None,
788 events
: Optional
[List
['OrchestratorEvent']] = None,
789 is_active
: bool = False,
790 memory_usage
: Optional
[int] = None,
791 memory_request
: Optional
[int] = None,
792 memory_limit
: Optional
[int] = None,
793 service_name
: Optional
[str] = None,
794 ports
: Optional
[List
[int]] = None,
795 ip
: Optional
[str] = None,
796 deployed_by
: Optional
[List
[str]] = None,
797 rank
: Optional
[int] = None,
798 rank_generation
: Optional
[int] = None,
801 # Host is at the same granularity as InventoryHost
802 self
.hostname
: Optional
[str] = hostname
804 # Not everyone runs in containers, but enough people do to
805 # justify having the container_id (runtime id) and container_image
807 self
.container_id
= container_id
# runtime id
808 self
.container_image_id
= container_image_id
# image id locally
809 self
.container_image_name
= container_image_name
# image friendly name
810 self
.container_image_digests
= container_image_digests
# reg hashes
812 # The type of service (osd, mon, mgr, etc.)
813 self
.daemon_type
= daemon_type
815 # The orchestrator will have picked some names for daemons,
816 # typically either based on hostnames or on pod names.
817 # This is the <foo> in mds.<foo>, the ID that will appear
818 # in the FSMap/ServiceMap.
819 self
.daemon_id
: Optional
[str] = daemon_id
821 # Some daemon types have a numeric rank assigned
822 self
.rank
: Optional
[int] = rank
823 self
.rank_generation
: Optional
[int] = rank_generation
825 self
._service
_name
: Optional
[str] = service_name
827 # Service version that was deployed
828 self
.version
= version
830 # Service status: -1 error, 0 stopped, 1 running
833 # Service status description when status == error.
834 self
.status_desc
= status_desc
836 # datetime when this info was last refreshed
837 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
839 self
.created
: Optional
[datetime
.datetime
] = created
840 self
.started
: Optional
[datetime
.datetime
] = started
841 self
.last_configured
: Optional
[datetime
.datetime
] = last_configured
842 self
.last_deployed
: Optional
[datetime
.datetime
] = last_deployed
844 # Affinity to a certain OSDSpec
845 self
.osdspec_affinity
: Optional
[str] = osdspec_affinity
847 self
.events
: List
[OrchestratorEvent
] = events
or []
849 self
.memory_usage
: Optional
[int] = memory_usage
850 self
.memory_request
: Optional
[int] = memory_request
851 self
.memory_limit
: Optional
[int] = memory_limit
853 self
.ports
: Optional
[List
[int]] = ports
854 self
.ip
: Optional
[str] = ip
856 self
.deployed_by
= deployed_by
858 self
.is_active
= is_active
860 def get_port_summary(self
) -> str:
863 return f
"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
865 def name(self
) -> str:
866 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
868 def matches_service(self
, service_name
: Optional
[str]) -> bool:
869 assert self
.daemon_id
is not None
870 assert self
.daemon_type
is not None
872 return (daemon_type_to_service(self
.daemon_type
) + '.' + self
.daemon_id
).startswith(service_name
+ '.')
875 def service_id(self
) -> str:
876 assert self
.daemon_id
is not None
877 assert self
.daemon_type
is not None
879 if self
._service
_name
:
880 if '.' in self
._service
_name
:
881 return self
._service
_name
.split('.', 1)[1]
885 if self
.daemon_type
== 'osd':
886 if self
.osdspec_affinity
and self
.osdspec_affinity
!= 'None':
887 return self
.osdspec_affinity
891 assert self
.daemon_id
is not None
892 err
= OrchestratorError("DaemonDescription: Cannot calculate service_id: "
893 f
"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
895 if not self
.hostname
:
896 # TODO: can a DaemonDescription exist without a hostname?
899 # use the bare hostname, not the FQDN.
900 host
= self
.hostname
.split('.')[0]
902 if host
== self
.daemon_id
:
903 # daemon_id == "host"
904 return self
.daemon_id
906 elif host
in self
.daemon_id
:
907 # daemon_id == "service_id.host"
908 # daemon_id == "service_id.host.random"
909 pre
, post
= self
.daemon_id
.rsplit(host
, 1)
910 if not pre
.endswith('.'):
911 # '.' sep missing at front of host
913 elif post
and not post
.startswith('.'):
914 # '.' sep missing at end of host
918 # daemon_id == "service_id.random"
919 if self
.daemon_type
== 'rgw':
920 v
= self
.daemon_id
.split('.')
922 return '.'.join(v
[0:2])
924 if self
.daemon_type
== 'iscsi':
925 v
= self
.daemon_id
.split('.')
926 return '.'.join(v
[0:-1])
928 # daemon_id == "service_id"
929 return self
.daemon_id
931 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
934 return self
.daemon_id
936 def service_name(self
) -> str:
937 if self
._service
_name
:
938 return self
._service
_name
939 assert self
.daemon_type
is not None
940 if daemon_type_to_service(self
.daemon_type
) in ServiceSpec
.REQUIRES_SERVICE_ID
:
941 return f
'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
942 return daemon_type_to_service(self
.daemon_type
)
944 def __repr__(self
) -> str:
945 return "<DaemonDescription>({type}.{id})".format(type=self
.daemon_type
,
948 def to_json(self
) -> dict:
949 out
: Dict
[str, Any
] = OrderedDict()
950 out
['daemon_type'] = self
.daemon_type
951 out
['daemon_id'] = self
.daemon_id
952 out
['service_name'] = self
._service
_name
953 out
['hostname'] = self
.hostname
954 out
['container_id'] = self
.container_id
955 out
['container_image_id'] = self
.container_image_id
956 out
['container_image_name'] = self
.container_image_name
957 out
['container_image_digests'] = self
.container_image_digests
958 out
['memory_usage'] = self
.memory_usage
959 out
['memory_request'] = self
.memory_request
960 out
['memory_limit'] = self
.memory_limit
961 out
['version'] = self
.version
962 out
['status'] = self
.status
.value
if self
.status
is not None else None
963 out
['status_desc'] = self
.status_desc
964 if self
.daemon_type
== 'osd':
965 out
['osdspec_affinity'] = self
.osdspec_affinity
966 out
['is_active'] = self
.is_active
967 out
['ports'] = self
.ports
969 out
['rank'] = self
.rank
970 out
['rank_generation'] = self
.rank_generation
972 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
975 out
[k
] = datetime_to_str(getattr(self
, k
))
978 out
['events'] = [e
.to_json() for e
in self
.events
]
980 empty
= [k
for k
, v
in out
.items() if v
is None]
985 def to_dict(self
) -> dict:
986 out
: Dict
[str, Any
] = OrderedDict()
987 out
['daemon_type'] = self
.daemon_type
988 out
['daemon_id'] = self
.daemon_id
989 out
['hostname'] = self
.hostname
990 out
['container_id'] = self
.container_id
991 out
['container_image_id'] = self
.container_image_id
992 out
['container_image_name'] = self
.container_image_name
993 out
['container_image_digests'] = self
.container_image_digests
994 out
['memory_usage'] = self
.memory_usage
995 out
['memory_request'] = self
.memory_request
996 out
['memory_limit'] = self
.memory_limit
997 out
['version'] = self
.version
998 out
['status'] = self
.status
.value
if self
.status
is not None else None
999 out
['status_desc'] = self
.status_desc
1000 if self
.daemon_type
== 'osd':
1001 out
['osdspec_affinity'] = self
.osdspec_affinity
1002 out
['is_active'] = self
.is_active
1003 out
['ports'] = self
.ports
1006 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1008 if getattr(self
, k
):
1009 out
[k
] = datetime_to_str(getattr(self
, k
))
1012 out
['events'] = [e
.to_dict() for e
in self
.events
]
1014 empty
= [k
for k
, v
in out
.items() if v
is None]
1021 def from_json(cls
, data
: dict) -> 'DaemonDescription':
1023 event_strs
= c
.pop('events', [])
1024 for k
in ['last_refresh', 'created', 'started', 'last_deployed',
1027 c
[k
] = str_to_datetime(c
[k
])
1028 events
= [OrchestratorEvent
.from_json(e
) for e
in event_strs
]
1029 status_int
= c
.pop('status', None)
1030 status
= DaemonDescriptionStatus(status_int
) if status_int
is not None else None
1031 return cls(events
=events
, status
=status
, **c
)
1033 def __copy__(self
) -> 'DaemonDescription':
1034 # feel free to change this:
1035 return DaemonDescription
.from_json(self
.to_json())
# NOTE(review): presumably a @staticmethod on DaemonDescription -- the
# decorator line is outside the recovered text.
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
    """Teach PyYAML how to dump a DaemonDescription: emit its JSON dict."""
    return dumper.represent_dict(data.to_json().items())
# Register the representer so yaml.dump() can serialize DaemonDescription
# objects directly.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1045 class ServiceDescription(object):
1047 For responding to queries about the status of a particular service,
1048 stateful or stateless.
1050 This is not about health or performance monitoring of services: it's
1051 about letting the orchestrator tell Ceph whether and where a
1052 service is scheduled in the cluster. When an orchestrator tells
1053 Ceph "it's running on host123", that's not a promise that the process
1054 is literally up this second, it's a description of where the orchestrator
1055 has decided the service should run.
1060 container_image_id
: Optional
[str] = None,
1061 container_image_name
: Optional
[str] = None,
1062 rados_config_location
: Optional
[str] = None,
1063 service_url
: Optional
[str] = None,
1064 last_refresh
: Optional
[datetime
.datetime
] = None,
1065 created
: Optional
[datetime
.datetime
] = None,
1066 deleted
: Optional
[datetime
.datetime
] = None,
1069 events
: Optional
[List
['OrchestratorEvent']] = None,
1070 virtual_ip
: Optional
[str] = None,
1071 ports
: List
[int] = []) -> None:
1072 # Not everyone runs in containers, but enough people do to
1073 # justify having the container_image_id (image hash) and container_image
1075 self
.container_image_id
= container_image_id
# image hash
1076 self
.container_image_name
= container_image_name
# image friendly name
1078 # Location of the service configuration when stored in rados
1079 # object. Format: "rados://<pool>/[<namespace/>]<object>"
1080 self
.rados_config_location
= rados_config_location
1082 # If the service exposes REST-like API, this attribute should hold
1084 self
.service_url
= service_url
1089 # Number of daemons up
1090 self
.running
= running
1092 # datetime when this info was last refreshed
1093 self
.last_refresh
: Optional
[datetime
.datetime
] = last_refresh
1094 self
.created
: Optional
[datetime
.datetime
] = created
1095 self
.deleted
: Optional
[datetime
.datetime
] = deleted
1097 self
.spec
: ServiceSpec
= spec
1099 self
.events
: List
[OrchestratorEvent
] = events
or []
1101 self
.virtual_ip
= virtual_ip
# NOTE(review): method of ServiceDescription (class header earlier in the file).
def service_type(self) -> str:
    """Return the service type of the underlying spec (e.g. 'nfs', 'rgw')."""
    return self.spec.service_type
# NOTE(review): method of ServiceDescription.
def __repr__(self) -> str:
    """Short debug representation based on the spec's one-line summary."""
    return f"<ServiceDescription of {self.spec.one_line_str()}>"
# NOTE(review): method of ServiceDescription.  The two lines between the
# signature and the final return were lost in extraction; the empty-ports
# early return below is a reconstruction -- TODO confirm against upstream.
def get_port_summary(self) -> str:
    """Return ``<ip>:<p1>,<p2>,...`` for the service's virtual IP and ports.

    The virtual IP is shown without its prefix length; '?' stands in when
    no virtual IP is known.
    """
    if not self.ports:
        return ''
    return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
# NOTE(review): method of ServiceDescription.  The dict literal's opening and
# closing braces and the 'size' entry were lost in extraction and are
# reconstructed here -- TODO confirm against upstream.
def to_json(self) -> OrderedDict:
    """Serialize to the on-wire form: the spec's dict plus a 'status' sub-dict.

    Datetime fields are rendered with ``datetime_to_str``; status entries
    whose value is ``None`` are dropped; events serialize via
    ``OrchestratorEvent.to_json()``.
    """
    out = self.spec.to_json()
    status = {
        'container_image_id': self.container_image_id,
        'container_image_name': self.container_image_name,
        'rados_config_location': self.rados_config_location,
        'service_url': self.service_url,
        'size': self.size,
        'running': self.running,
        'last_refresh': self.last_refresh,
        'created': self.created,
        'virtual_ip': self.virtual_ip,
        'ports': self.ports if self.ports else None,
    }
    for k in ['last_refresh', 'created']:
        if getattr(self, k):
            status[k] = datetime_to_str(getattr(self, k))
    status = {k: v for (k, v) in status.items() if v is not None}
    out['status'] = status
    # Guard reconstructed (line lost in extraction) -- TODO confirm.
    if self.events:
        out['events'] = [e.to_json() for e in self.events]
    return out
# NOTE(review): method of ServiceDescription.  Mirrors to_json() but events
# are serialized via to_dict().  Braces and the 'size' entry were lost in
# extraction and are reconstructed -- TODO confirm against upstream.
def to_dict(self) -> OrderedDict:
    """Serialize like ``to_json()`` but with events as dicts, not strings."""
    out = self.spec.to_json()
    status = {
        'container_image_id': self.container_image_id,
        'container_image_name': self.container_image_name,
        'rados_config_location': self.rados_config_location,
        'service_url': self.service_url,
        'size': self.size,
        'running': self.running,
        'last_refresh': self.last_refresh,
        'created': self.created,
        'virtual_ip': self.virtual_ip,
        'ports': self.ports if self.ports else None,
    }
    for k in ['last_refresh', 'created']:
        if getattr(self, k):
            status[k] = datetime_to_str(getattr(self, k))
    status = {k: v for (k, v) in status.items() if v is not None}
    out['status'] = status
    # Guard reconstructed (line lost in extraction) -- TODO confirm.
    if self.events:
        out['events'] = [e.to_dict() for e in self.events]
    return out
# NOTE(review): alternate constructor of ServiceDescription; presumably a
# @classmethod (takes ``cls``) -- the decorator line was lost in extraction.
def from_json(cls, data: dict) -> 'ServiceDescription':
    """Inverse of ``to_json()``: rebuild the spec, events and status fields."""
    # Copy so the caller's dict is not mutated; line reconstructed -- TODO confirm.
    c = data.copy()
    status = c.pop('status', {})
    event_strs = c.pop('events', [])
    spec = ServiceSpec.from_json(c)

    c_status = status.copy()
    for k in ['last_refresh', 'created']:
        # Membership guard reconstructed (line lost) -- TODO confirm.
        if k in c_status:
            c_status[k] = str_to_datetime(c_status[k])
    events = [OrchestratorEvent.from_json(e) for e in event_strs]
    return cls(spec=spec, events=events, **c_status)
# NOTE(review): presumably a @staticmethod on ServiceDescription -- the
# decorator line is outside the recovered text.
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
    """Teach PyYAML how to dump a ServiceDescription: emit its JSON dict.

    Fixed: the ``data`` parameter was annotated 'DaemonDescription' -- a
    copy-paste from the sibling class; this representer is registered for
    ServiceDescription.
    """
    return dumper.represent_dict(data.to_json().items())
# Register the representer so yaml.dump() can serialize ServiceDescription
# objects directly.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.

      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.

      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels
        #: Optional: get info about certain named hosts only
        # (assignment reconstructed from the comment above; the line itself
        # was lost in extraction -- TODO confirm)
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    # NOTE(review): the ``inventory.*`` annotations are quoted because this
    # chunk was recovered from a corrupted extraction; semantics unchanged.
    def __init__(self, name: str, devices: Optional['inventory.Devices'] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        # Both default guards reconstructed (lines lost in extraction) --
        # TODO confirm against upstream.
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        # The 'name'/'addr' entries and the braces were lost in extraction
        # and are reconstructed -- TODO confirm.
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod  # presumably a classmethod (takes ``cls``); decorator line lost
    def from_json(cls, data: dict) -> 'InventoryHost':
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            # Guard reconstructed: any leftover keys are unknown -- TODO confirm.
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod  # presumably a classmethod; decorator line lost
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod  # presumably a staticmethod (no self/cls); decorator line lost
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the names of the given inventory hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # NOTE(review): equality intentionally compares only name and
        # devices -- matches the recovered text.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    # Level constants presumably defined here (the lines were lost in
    # extraction; the __init__ assert below uses these values) -- TODO confirm.
    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon danem or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        # Assignment reconstructed (line lost in extraction) -- TODO confirm.
        self.level: str = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return the '<kind>:<subject>' identifier for this event."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod  # presumably a classmethod; decorator line lost in extraction
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
        """
        match = cls.regex_v1.match(data)
        # Guard reconstructed (line lost in extraction) -- TODO confirm.
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): ``level`` is not part of equality -- matches the
        # recovered text.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: attach a remote-forwarding stub for every public
    ``Orchestrator`` interface method to ``cls``.

    Each stub simply forwards name, args and kwargs to ``self._oremote``.
    """
    # Needs to be defined outside of for.
    # Otherwise meth is always bound to last key
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            completion = self._oremote(method_name, args, kwargs)
            # Return reconstructed (line lost in extraction) -- TODO confirm.
            return completion
        # Return reconstructed -- TODO confirm.
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    # A class decorator must hand the class back; reconstructed -- TODO confirm.
    return cls
1366 class OrchestratorClientMixin(Orchestrator
):
1368 A module that inherents from `OrchestratorClientMixin` can directly call
1369 all :class:`Orchestrator` methods without manually calling remote.
1371 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1372 calls :func:`OrchestratorClientMixin._oremote`
1374 >>> class MyModule(OrchestratorClientMixin):
1376 ... completion = self.add_host('somehost') # calls `_oremote()`
1377 ... self.log.debug(completion.result)
1379 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1380 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1381 "real" implementation of the orchestrator.
1384 >>> import mgr_module
1386 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1387 ... def __init__(self, ...):
1388 ... self.orch_client = OrchestratorClientMixin()
1389 ... self.orch_client.set_mgr(self.mgr))
# NOTE(review): method of OrchestratorClientMixin (the class runs past this
# chunk).  The MgrModule annotation is quoted because this text was recovered
# from a corrupted extraction; semantics unchanged.
def set_mgr(self, mgr: 'MgrModule') -> None:
    """
    Useable in the Dashbord that uses a global ``mgr``
    """

    self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
# NOTE(review): private method of OrchestratorClientMixin.  The try/return
# lines were lost in extraction and are reconstructed -- TODO confirm: raise
# NoOrchestrator when set_mgr() was never called.
def __get_mgr(self) -> Any:
    try:
        return self.__mgr
    except AttributeError:
        raise NoOrchestrator()
1405 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1407 Helper for invoking `remote` on whichever orchestrator is enabled
1409 :raises RuntimeError: If the remote method failed.
1410 :raises OrchestratorError: orchestrator failed to perform
1411 :raises ImportError: no `orchestrator` module or backend not found.
1413 mgr
= self
.__get
_mgr
()
1416 o
= mgr
._select
_orchestrator
()
1417 except AttributeError:
1418 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1421 raise NoOrchestrator()
1423 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1425 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1426 except Exception as e
:
1427 if meth
== 'get_feature_set':
1428 raise # self.get_feature_set() calls self._oremote()
1429 f_set
= self
.get_feature_set()
1430 if meth
not in f_set
or not f_set
[meth
]['available']:
1431 raise NotImplementedError(f
'{o} does not implement {meth}') from e