# ceph.git: ceph/src/pybind/mgr/orchestrator/_interface.py
"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""
16 from collections
import namedtuple
, OrderedDict
17 from contextlib
import contextmanager
18 from functools
import wraps
, reduce
20 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
24 from typing
import Protocol
# Protocol was added in Python 3.8
26 class Protocol
: # type: ignore
32 from ceph
.deployment
import inventory
33 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
34 IscsiServiceSpec
, IngressSpec
35 from ceph
.deployment
.drive_group
import DriveGroupSpec
36 from ceph
.deployment
.hostspec
import HostSpec
, SpecValidationError
37 from ceph
.utils
import datetime_to_str
, str_to_datetime
39 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Payload type carried by the generic ``OrchResult`` container.
T = TypeVar('T')
# Bound to "any callable"; used by decorators in this module that hand back
# a callable typed like the one they received.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable description; becomes ``str(exc)``
    :param errno: errno value stored on the exception (defaults to ``-EINVAL``)
    :param event_kind_subject: optional ``(kind, subject)`` pair, stored as
        ``event_subject``. See OrchestratorEvent.subject
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Zero-argument super() walks the MRO from this class.  The previous
        # ``super(Exception, self)`` started *after* ``Exception`` and would
        # skip any cooperative base placed between this class and
        # ``BaseException``.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.

    Carries ``-ENOENT`` as its errno.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Adds no behaviour of its own; it only gives callers a distinct
    exception type to catch.
    """
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags an ``OrchestratorError`` escaping the ``with``
    body with an event subject and then re-raises it.

    :param kind: first element of the ``(kind, subject)`` pair stored in
        ``event_subject``
    :param subject: second element of that pair
    :param overwrite: when true, replace a subject that is already set;
        when false, only fill in a subject that is still unset
    """
    try:
        yield
    except OrchestratorError as e:
        # OrchestratorError always defines ``event_subject`` (possibly None),
        # so a ``hasattr`` test is always true and made ``overwrite`` a no-op.
        # Check for an unset subject instead.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap ``func`` as a CLI command handler that turns expected failures into
    a ``HandleCommandResult`` instead of letting them propagate.

    :param prefix: command prefix; stored on the returned callable as
        ``_prefix`` and passed to ``CLICommand``
    :param perm: permission string passed to ``CLICommand``
    :param func: the command implementation
    :return: a callable carrying ``_prefix`` and ``_cli_command`` attributes
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as expected:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(expected.errno, stderr=str(expected))
        except ImportError as missing:
            return HandleCommandResult(-errno.ENOENT, stderr=str(missing))
        except NotImplementedError:
            unsupported = f'This Orchestrator does not support `{prefix}`'
            return HandleCommandResult(-errno.ENOENT, stderr=unsupported)

    # misuse lambda to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    command = CLICommand(prefix, perm)
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = command  # type: ignore
    command.store_func_metadata(func)
    # the command dispatches to the copy, not to the undecorated function
    command.func = wrapper_copy

    return cast(FuncT, wrapper_copy)
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.

    The wrapped call's return value becomes the result; any ``Exception``
    it raises is captured in the OrchResult instead of propagating.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            value = f(*args, **kwargs)
            return OrchResult(value)
        except Exception as caught:
            return OrchResult(None, exception=caught)

    return cast(Callable[..., OrchResult[T]], wrapper)
class InnerCliCommandCallable(Protocol):
    """
    Typing-only description of what ``_cli_command`` returns: a callable
    that takes a command prefix and yields a decorator.
    """

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a decorator factory bound to the permission string ``perm``.

    The returned callable takes a command prefix and produces a decorator
    that passes the decorated function through ``handle_exception``.
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate
    return inner_cli_command
# Decorator factories bound to the 'r' and 'rw' permission strings.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For every class created with this metaclass, the class body is scanned
    for attributes carrying ``_prefix`` and ``_cli_command``.  Those commands
    are published as ``cls.COMMANDS`` and ``cls.handle_command`` is replaced
    by a dispatcher over them.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # not a decorated command; ignore
                pass

        # Remember the handler that is visible *before* we replace it (own or
        # inherited).  The old code called ``self.handle_command`` for unknown
        # prefixes, which resolved to the dispatcher itself and recursed
        # without end.
        inherited = getattr(cls, 'handle_command', None)

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            command = dispatch.get(cmd['prefix'])
            if command is not None:
                return command.call(self, cmd, inbuf)
            if inherited is None:
                raise NotImplementedError(
                    'no handler for command prefix `{}`'.format(cmd['prefix']))
            return inherited(self, inbuf, cmd)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.

    The exception is kept in pickled form (``serialized_exception``) plus a
    printable summary (``exception_str``); use ``raise_if_exception`` to
    turn it back into a raised exception.
    """

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def set_exception(self, e: Optional[Exception]) -> None:
        """
        Record ``e`` (or clear the stored exception when ``e`` is None).

        If ``e`` cannot be pickled it degenerates to a plain ``Exception``
        built from its args, or from ``str(e)`` as a last resort.
        """
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        # Unpicklable objects do not always raise PicklingError: locally
        # defined classes or members such as locks surface as AttributeError
        # or TypeError.  Letting those escape would make constructing the
        # OrchResult itself fail.
        unpicklable = (pickle.PicklingError, TypeError, AttributeError)
        try:
            self.serialized_exception = pickle.dumps(e)
        except unpicklable:
            logger.error(f"failed to pickle {e}")
            # degenerate to a plain Exception
            if isinstance(e, Exception):
                fallback = Exception(*e.args)
            else:
                fallback = Exception(str(e))
            try:
                self.serialized_exception = pickle.dumps(fallback)
            except unpicklable:
                # even the args are not picklable: keep the message only
                self.serialized_exception = pickle.dumps(Exception(str(e)))

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    Re-raise the exception stored in ``c`` if there is one, otherwise
    return ``c.result``.  When the stored exception cannot be unpickled
    (``KeyError`` / ``AttributeError``), a plain ``Exception`` carrying
    ``c.exception_str`` is raised instead.
    """
    payload = c.serialized_exception
    if payload is not None:
        try:
            restored = pickle.loads(payload)
        except (KeyError, AttributeError):
            raise Exception(c.exception_str)
        else:
            raise restored
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
def _hide_in_features(f: FuncT) -> FuncT:
    """
    Mark ``f`` so that ``Orchestrator.get_feature_set`` leaves it out.

    Sets ``f._hide_in_features = True`` and returns the same object.
    """
    setattr(f, '_hide_in_features', True)
    return f
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads.  For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects.  Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.

    NOTE(review): the signatures below are annotated with ``OrchResult``
    rather than ``Completion``; the wording above may predate them -- confirm.

    Unless stated otherwise, the methods of this base class only raise
    ``NotImplementedError``; backends override what they support.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: always ``True``
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator.  This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive.  It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: a 3-tuple of:
            a boolean representing whether the module is available/usable,
            a string describing any error, and
            a dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        A method counts as available when the attribute found on the
        concrete class differs from the one defined on ``Orchestrator``.
        Names starting with ``_`` and methods marked with
        ``_hide_in_features`` are left out.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def resume(self) -> None:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: specification of the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label

        :param host: hostname
        :param label: label to add
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label

        :param host: hostname
        :param label: label to remove
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target

        :param hostname: hostname
        :param force: NOTE(review): meaning is backend specific -- confirm
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target

        :param hostname: hostname
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator.  For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @_hide_in_features
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Each spec is routed by its ``service_type`` to the matching
        ``apply_*`` method (``add_host`` for ``'host'``).  All specs are
        dispatched first; the results are then unwrapped in order, so the
        first stored exception is raised.

        :param specs: specs to apply
        :param no_overwrite: accepted but not used by this implementation
        :return: one entry per spec, in input order
        :raises KeyError: for a ``service_type`` missing from the table below
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        # Fold step: unwrap both sides (raising any stored exception) and
        # append the new result to the accumulated list.
        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
            l_res = raise_if_exception(l)
            r_res = raise_if_exception(r)
            l_res.append(r_res)
            return OrchResult(l_res)
        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :param names: names of the daemons to remove
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: name of the service to remove
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO

        :param osd_ids: list of OSD IDs
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# A spec accepted by ``Orchestrator.apply``: either a service or a host.
GenericSpec = Union[ServiceSpec, HostSpec]
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Build a spec object from its JSON dict.

    A dict whose ``service_type`` is ``'host'`` is parsed as a ``HostSpec``;
    anything else (including a missing ``service_type``) goes to
    ``ServiceSpec.from_json``.
    """
    spec_cls = HostSpec if spec.get('service_type') == 'host' else ServiceSpec
    return spec_cls.from_json(spec)
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type it belongs to.

    :param dtype: daemon type, e.g. ``'haproxy'``
    :return: service type, e.g. ``'ingress'``
    :raises KeyError: for an unknown daemon type
    """
    # daemon types whose service type carries the same name
    same_name = (
        'mon', 'mgr', 'mds', 'rgw', 'osd',
        'iscsi', 'rbd-mirror', 'cephfs-mirror', 'nfs',
        'grafana', 'alertmanager', 'prometheus', 'node-exporter',
        'crash', 'container', 'cephadm-exporter',
    )
    service_of = {daemon: daemon for daemon in same_name}
    service_of.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return service_of[dtype]
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the daemon types that make it up.

    :param stype: service type, e.g. ``'ingress'``
    :return: a new list of daemon types, e.g. ``['haproxy', 'keepalived']``
    :raises KeyError: for an unknown service type
    """
    # services implemented by a single daemon type of the same name
    single_daemon = (
        'mon', 'mgr', 'mds', 'rgw', 'osd',
        'iscsi', 'rbd-mirror', 'cephfs-mirror', 'nfs',
        'grafana', 'alertmanager', 'prometheus', 'node-exporter',
        'crash', 'container', 'cephadm-exporter',
    )
    daemons_of = {service: [service] for service in single_daemon}
    daemons_of['ingress'] = ['haproxy', 'keepalived']
    return daemons_of[stype]
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
741 def handle_type_error(method
: FuncT
) -> FuncT
:
743 def inner(cls
: Any
, *args
: Any
, **kwargs
: Any
) -> Any
:
745 return method(cls
, *args
, **kwargs
)
746 except TypeError as e
:
747 error_msg
= '{}: {}'.format(cls
.__name
__, e
)
748 raise OrchestratorValidationError(error_msg
)
749 return cast(FuncT
, inner
)
class DaemonDescriptionStatus(enum.IntEnum):
    """Daemon status as an integer: -1 error, 0 stopped, 1 running."""
    error = -1
    stopped = 0
    running = 1
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 ) -> None:
        """
        Every argument is stored on an attribute of the same name, except
        ``service_name``, which is kept privately as ``_service_name``
        (``service_name`` itself is a method), and ``events``, where ``None``
        becomes an empty list.
        """

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id

        # Explicit service name; when set it takes precedence in
        # service_id() and service_name().
        self._service_name: Optional[str] = service_name

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == error.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        self.ports: Optional[List[int]] = ports
        self.ip: Optional[str] = ip

        self.deployed_by = deployed_by

        self.is_active = is_active

    def get_port_summary(self) -> str:
        """
        Return ``"<ip>:<port>,<port>..."`` with ``*`` standing in for a
        missing ip, or an empty string when no ports are set.
        """
        if not self.ports:
            return ''
        return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"

    def name(self) -> str:
        """Return ``"<daemon_type>.<daemon_id>"``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """
        Tell whether this daemon belongs to ``service_name``.

        True when ``"<service type>.<daemon_id>"`` starts with
        ``service_name + '.'``; always False for an empty or ``None``
        ``service_name``.  Requires ``daemon_id`` and ``daemon_type`` to be set.
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None
        if service_name:
            return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
        return False

    def service_id(self) -> str:
        """
        Derive the service id of this daemon.

        Resolution order:

        1. an explicit ``_service_name``: everything after its first ``.``,
           or ``''`` if it has none;
        2. OSDs: ``osdspec_affinity`` unless it is empty or the string
           ``'None'``, in which case ``'unmanaged'``;
        3. service types listed in ``ServiceSpec.REQUIRES_SERVICE_ID``:
           parsed out of ``daemon_id`` relative to the bare hostname
           (see ``_match``);
        4. otherwise ``daemon_id`` itself.

        :raises OrchestratorError: from step 3, when there is no hostname or
            the hostname is embedded in ``daemon_id`` without ``.`` separators
        """
        assert self.daemon_id is not None
        assert self.daemon_type is not None

        if self._service_name:
            if '.' in self._service_name:
                return self._service_name.split('.', 1)[1]
            else:
                return ''

        if self.daemon_type == 'osd':
            if self.osdspec_affinity and self.osdspec_affinity != 'None':
                return self.osdspec_affinity
            return 'unmanaged'

        def _match() -> str:
            assert self.daemon_id is not None
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                # drop the trailing '.' that separated service_id from host
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    # keep the first two dot-separated components
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                # drop the last dot-separated component
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self) -> str:
        """
        Return the explicit ``_service_name`` if set; otherwise the service
        type, suffixed with ``.<service_id>`` for service types listed in
        ``ServiceSpec.REQUIRES_SERVICE_ID``.
        """
        if self._service_name:
            return self._service_name
        assert self.daemon_type is not None
        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
        return daemon_type_to_service(self.daemon_type)

    def __repr__(self) -> str:
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self) -> dict:
        """
        Serialize to an ordered dict.

        Timestamps are written via ``datetime_to_str`` and only when set,
        ``events`` only when non-empty, ``osdspec_affinity`` only for OSDs,
        and every key whose value is ``None`` is removed.  ``deployed_by``
        and the explicit service name are not part of the output.
        """
        out: Dict[str, Any] = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['container_image_digests'] = self.container_image_digests
        out['memory_usage'] = self.memory_usage
        out['memory_request'] = self.memory_request
        out['memory_limit'] = self.memory_limit
        out['version'] = self.version
        out['status'] = self.status.value if self.status is not None else None
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active
        out['ports'] = self.ports
        out['ip'] = self.ip

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        # collect first, delete afterwards: the dict must not change size
        # while it is being iterated
        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'DaemonDescription':
        """
        Inverse of ``to_json``; works on a shallow copy of ``data``.

        Remaining keys are passed to the constructor as keyword arguments,
        so an unknown key surfaces as ``OrchestratorValidationError``
        (through ``handle_type_error``).
        """
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = str_to_datetime(c[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        status_int = c.pop('status', None)
        status = DaemonDescriptionStatus(status_int) if status_int is not None else None
        return cls(events=events, status=status, **c)

    def __copy__(self) -> 'DaemonDescription':
        # feel free to change this:
        # NOTE(review): copying through JSON drops whatever to_json() omits
        # (e.g. deployed_by and the explicit service name) -- confirm intended.
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
        """Represent ``data`` in YAML as the mapping produced by ``to_json``."""
        return dumper.represent_dict(data.to_json().items())


# Register the representer so that yaml can dump DaemonDescription objects.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: 'ServiceSpec',
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 rados_config_location: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        """
        :param spec: the service specification this description belongs to
        :param ports: ports of the service; ``None`` (the default) yields a
            fresh empty list per instance instead of a shared default list
        """
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # A caller-supplied list is kept as-is; only the default is replaced
        # by a new list so instances never share state.
        self.ports = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """Return ``<ip>:<port,port,...>``, or ``''`` when there are no ports.

        The IP is ``virtual_ip`` with anything after a ``/`` removed, or
        ``?`` when no virtual IP is set.
        """
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def to_json(self) -> OrderedDict:
        """Serialize as the spec's JSON plus ``status`` and ``events`` keys.

        ``status`` omits every entry whose value is ``None``; ``events`` is
        only emitted when there is at least one event.
        """
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        # Timestamps are stored as datetimes but serialized as strings.
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    # NOTE(review): the original file appears to have one more decorator line
    # between @classmethod and this def that is not visible in this view --
    # confirm against the full file and restore it if present.
    @classmethod
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """Build a ServiceDescription from the structure ``to_json()`` emits.

        ``status`` and ``events`` are popped from a shallow copy of *data*;
        the remainder is handed to ``ServiceSpec.from_json``.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Represent *data* as a YAML mapping built from its ``to_json()`` items."""
        return dumper.represent_dict(data.to_json().items())
# Register the YAML representer for ServiceDescription objects.
yaml.add_representer(
    ServiceDescription,
    ServiceDescription.yaml_representer,
)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional['inventory.Devices'] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within cluster, for example a hostname
        :param devices: devices of this host; defaults to an empty ``Devices``
        :param labels: labels of this host; defaults to an empty list
        :param addr: address of the host; falls back to *name* when falsy
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Return name, addr, devices (serialized) and labels as a dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """Build an InventoryHost from the structure ``to_json()`` emits.

        :raises OrchestratorValidationError: on a missing required key, on
            unknown extra keys, or when a ``TypeError`` occurs while reading
        """
        try:
            # Work on a deep copy so popping keys never mutates the caller's dict.
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Anything left over after popping the known keys is unexpected.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """Build hosts from items where ``item[0]`` is the name and
        ``item[1].data`` is the JSON form of the devices."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the ``name`` of every host in *hosts*, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        """Two hosts are equal when their name and devices are equal."""
        if not isinstance(other, InventoryHost):
            # Let Python fall back to its default handling instead of raising
            # AttributeError on objects that have no name/devices.
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
    # Keep instances as plain tuples without a per-instance __dict__.
    __slots__ = ()
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    INFO = 'INFO'
    ERROR = 'ERROR'
    # Matches the string form produced by to_json():
    #   <created> <kind>:<subject> [<level>] "<message>"
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        """
        :param created: creation time; a ``str`` is parsed with ``str_to_datetime``
        :param kind: ``service`` or ``daemon``
        :param subject: what the event is attached to
        :param level: ``INFO`` or ``ERROR``
        :param message: the event text
        """
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``<kind>:<subject>``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """Return the one-line string form of this event."""
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    # NOTE(review): the original file appears to have one more decorator line
    # between @classmethod and this def that is not visible in this view --
    # confirm against the full file and restore it if present.
    @classmethod
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: string in the format produced by :meth:`to_json`
        :return: the parsed event
        :raises ValueError: if *data* does not match ``regex_v1``
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other: Any) -> bool:
        """Compare created, kind, subject and message; ``level`` is not compared."""
        if not isinstance(other, OrchestratorEvent):
            return False

        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
def _mk_orch_methods(cls: Any) -> Any:
    """Class decorator: replace public ``Orchestrator`` methods on *cls* with stubs.

    For every name in ``Orchestrator.__dict__`` that does not start with an
    underscore (and is not ``is_orchestrator_module``), *cls* gets a method
    of the same name that forwards to ``self._oremote(name, args, kwargs)``.

    :param cls: the class to patch
    :return: the same class object, patched in place
    """
    # Needs to be defined outside of for.
    # Otherwise meth is always bound to last key
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls
1291 class OrchestratorClientMixin(Orchestrator
):
1293 A module that inherents from `OrchestratorClientMixin` can directly call
1294 all :class:`Orchestrator` methods without manually calling remote.
1296 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1297 calls :func:`OrchestratorClientMixin._oremote`
1299 >>> class MyModule(OrchestratorClientMixin):
1301 ... completion = self.add_host('somehost') # calls `_oremote()`
1302 ... self.log.debug(completion.result)
1304 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1305 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1306 "real" implementation of the orchestrator.
1309 >>> import mgr_module
1311 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1312 ... def __init__(self, ...):
1313 ... self.orch_client = OrchestratorClientMixin()
1314 ... self.orch_client.set_mgr(self.mgr))
def set_mgr(self, mgr: MgrModule) -> None:
    """
    Usable in the Dashboard that uses a global ``mgr``
    """

    # Make sure we're not overwriting any other `mgr` properties
    self.__mgr = mgr
def __get_mgr(self) -> Any:
    """Return the mgr stored by ``set_mgr``, or ``self`` if none was set."""
    try:
        return self.__mgr
    except AttributeError:
        # set_mgr() was never called: this object itself acts as the mgr.
        return self
1330 def _oremote(self
, meth
: Any
, args
: Any
, kwargs
: Any
) -> Any
:
1332 Helper for invoking `remote` on whichever orchestrator is enabled
1334 :raises RuntimeError: If the remote method failed.
1335 :raises OrchestratorError: orchestrator failed to perform
1336 :raises ImportError: no `orchestrator` module or backend not found.
1338 mgr
= self
.__get
_mgr
()
1341 o
= mgr
._select
_orchestrator
()
1342 except AttributeError:
1343 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1346 raise NoOrchestrator()
1348 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1350 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1351 except Exception as e
:
1352 if meth
== 'get_feature_set':
1353 raise # self.get_feature_set() calls self._oremote()
1354 f_set
= self
.get_feature_set()
1355 if meth
not in f_set
or not f_set
[meth
]['available']:
1356 raise NotImplementedError(f
'{o} does not implement {meth}') from e