3 ceph-mgr orchestrator interface
5 Please see the ceph-mgr module developer's guide for more information.
17 from collections
import namedtuple
18 from functools
import wraps
20 from ceph
.deployment
import inventory
21 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
22 ServiceSpecValidationError
, IscsiServiceSpec
23 from ceph
.deployment
.drive_group
import DriveGroupSpec
25 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
28 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
29 Type
, Sequence
, Dict
, cast
33 logger
= logging
.getLogger(__name__
)
35 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%f'
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        # Default message points the user at the command that fixes the problem.
        super(NoOrchestrator, self).__init__(msg)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap an orchestrator CLI handler so that expected errors are turned into
    a ``HandleCommandResult`` instead of a traceback, and attach the
    ``CLICommand`` metadata that ``CLICommandMeta`` later collects.

    :param prefix: command prefix, e.g. ``"orch host add"``
    :param cmd_args: command argument description string
    :param desc: human readable command description
    :param perm: permission string, ``'r'`` or ``'rw'``
    :param func: the actual handler to wrap
    :return: a copy of the wrapped handler carrying ``_prefix`` and
             ``_cli_command`` attributes
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse partial to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    # Without this return the decorator factory yields None and every
    # decorated command handler would be destroyed.
    return wrapper_copy
83 def _cli_command(perm
):
84 def inner_cli_command(prefix
, cmd_args
="", desc
=""):
85 return lambda func
: handle_exception(prefix
, cmd_args
, desc
, perm
, func
)
86 return inner_cli_command
# Decorator factories for orchestrator CLI commands: read-only ('r') and
# read-write ('rw') permission variants of ``_cli_command``.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch = {}  # type: Dict[str, CLICommand]
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Not a CLI-command-decorated attribute (plain methods,
                # dunders, class attributes); skip it.
                pass

        def handle_command(self, inbuf, cmd):
            if cmd['prefix'] not in dispatch:
                # NOTE(review): falls back to the instance's handle_command;
                # presumably a base class provides a different implementation
                # via the MRO — verify, as written this re-dispatches.
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Expose the collected command descriptors and the dispatcher on the
        # class itself, mimicking what CLICommand.COMMANDS would provide.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    NO_RESULT = _no_result()  # type: None
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise=None,  # type: Optional["_Promise"]
                 value=NO_RESULT,  # type: Optional[Any]
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise = None  # type: Optional[_Promise]

        self._state = self.INITIALIZED
        self._exception = None  # type: Optional[Exception]

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise = _first_promise or self  # type: '_Promise'

    @property
    def _exception(self):
        # type: () -> Optional[Exception]
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        try:
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self):
        # type: () -> Optional[bytes]
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self):
        # type: () -> Optional[Callable]
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val):
        # type: (Optional[Callable]) -> None
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # Fixed: previously this read `_progress_reference` off the *builtin*
        # `next`, which always yielded 'NA'; inspect the next promise instead.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name,
            getattr(self._next_promise, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self, on_complete):
        # type: (Any, Callable) -> Any
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # If a successor already exists, chain onto the end of it.
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback; create a new promise for the
            # new callback and link it in.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise
        else:
            # This step is free: take the callback here and append an empty
            # tail promise so callers can keep chaining.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next):
        # type: (_Promise) -> None
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # Every promise downstream shares the same head of the chain.
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: the orchestrator will finalize later.
            pass

    def propagate_to_next(self):
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e):
        # type: (Exception) -> None
        """
        Sets the whole completion to be faild with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self):
        # type: () -> _Promise
        return list(iter(self))[-1]
class ProgressReference(object):
    def __init__(self,
                 message,  # type: str
                 mgr,
                 completion=None  # type: Optional[Callable[[], Completion]]
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion = completion  # type: Optional[Callable[[], Completion]]

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Used as an `on_complete` callback: record that the completion
        # produced a result and mark the progress as done, passing the
        # value through unchanged.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references
                                                    if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        return self.progress == 1 and self._completion_has_result

    def update(self):
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        self._completion_has_result = True
        self.progress = 1
class Completion(_Promise):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> def bar(self) -> Completion:
        ...     return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result = "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*x | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """
    def __init__(self,
                 _first_promise=None,  # type: Optional["Completion"]
                 value=_Promise.NO_RESULT,  # type: Any
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self):
        # type: () -> Optional[ProgressReference]
        # A ProgressReference used as `on_complete` is recognized by its
        # `progress_id` attribute.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self):
        # type: () -> Optional[ProgressReference]
        """
        ProgressReference. Marks this completion
        as a write completeion.
        """

        references = [c._progress_reference for c in iter(self)
                      if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls,  # type: Any
                      message,  # type: str
                      mgr,
                      _first_promise=None,  # type: Optional["Completion"]
                      value=_Promise.NO_RESULT,  # type: Any
                      on_complete=None,  # type: Optional[Callable]
                      calc_percent=None  # type: Optional[Callable[[], Any]]
                      ):
        # type: (...) -> Any
        """Create a Completion with an attached ProgressReference."""
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], Any]]
                     ):
        """Chain a ProgressReference onto this completion."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e):
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result=_Promise.NO_RESULT):
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self):
        """
        The result of the operation that we were waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return last._value

    def result_str(self):
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        return self._last_promise()._exception

    @property
    def serialized_exception(self):
        # type: () -> Optional[bytes]
        return self._last_promise()._serialized_exception

    @property
    def has_result(self):
        # type: () -> bool
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
def pretty_print(completions):
    # type: (Sequence[Completion]) -> str
    """Render each completion and join the results with commas."""
    rendered = [c.pretty_print() for c in completions]
    return ', '.join(rendered)
def raise_if_exception(c):
    # type: (Completion) -> None
    """
    Re-raise the exception stored in a failed Completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # The pickled exception references a type that cannot be
            # reconstructed here; degrade to a generic Exception.
            raise Exception('{}: {}'.format(type(c.exception), c.exception))
        raise e
class TrivialReadCompletion(Completion):
    """
    This is the trivial completion simply wrapping a result.
    """
    def __init__(self, result):
        super(TrivialReadCompletion, self).__init__()
        # Only finalize when there is an actual (truthy) result;
        # NOTE(review): falsy-but-valid results (0, [], '') are left
        # unfinalized — confirm this matches callers' expectations.
        if result:
            self.finalize(result)
646 def _hide_in_features(f
):
647 f
._hide
_in
_features
= True
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        raise NotImplementedError()

    @_hide_in_features
    def process(self, completions):
        # type: (List[Completion]) -> None
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self):
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> api = OrchestratorClientMixin()
                >>> if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method is "available" when the subclass overrides the stub
        # defined on this base class.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self):
        # type: () -> None
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self):
        # type: () -> None
        raise NotImplementedError()

    def resume(self):
        # type: () -> None
        raise NotImplementedError()

    def add_host(self, host_spec):
        # type: (HostSpec) -> Completion
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> Completion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host, addr):
        # type: (str, str) -> Completion
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self):
        # type: () -> Completion
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Remove a host label
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter=None, refresh=False):
        # type: (Optional[InventoryFilter], bool) -> Completion
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_name=None, refresh=False):
        # type: (Optional[str], Optional[str], bool) -> Completion
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    def apply(self, specs: List["GenericSpec"]) -> Completion:
        """
        Applies any spec
        """
        fns: Dict[str, function] = {  # noqa: F821
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'host': self.add_host,
        }

        def merge(ls, r):
            # Accumulate individual results into one flat list.
            if isinstance(ls, list):
                return ls + [r]
            return [ls, r]

        spec, *specs = specs

        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
        completion = fn(spec)
        for s in specs:
            # Bind `s` through a default argument: the callback runs later,
            # when the previous completion finalizes, and a plain closure
            # would otherwise see only the *last* value of the loop variable.
            def next_spec(ls, s=s):
                fn = cast(Callable[["GenericSpec"], Completion], fns[s.service_type])
                return fn(s).then(lambda r: merge(ls, r))
            completion = completion.then(next_spec)
        return completion

    def remove_daemons(self, names):
        # type: (List[str]) -> Completion
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name):
        # type: (str) -> Completion
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action, service_name):
        # type: (str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :rtype: Completion
        """
        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action, daemon_type, daemon_id):
        # type: (str, str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param name: name of daemon
        :rtype: Completion
        """
        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> Completion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> Completion:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
            Note that this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def remove_osds_status(self):
        # type: () -> Completion
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault, on, locations):
        # type: (str, bool, List[DeviceLightLoc]) -> Completion
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host, path):
        # type: (str, str) -> Completion
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mon daemon(s)"""
        raise NotImplementedError()

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mon cluster"""
        raise NotImplementedError()

    def add_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mgr daemon(s)"""
        raise NotImplementedError()

    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mgr cluster"""
        raise NotImplementedError()

    def add_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create MDS daemon(s)"""
        raise NotImplementedError()

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update MDS cluster"""
        raise NotImplementedError()

    def add_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Create RGW daemon(s)"""
        raise NotImplementedError()

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Update RGW cluster"""
        raise NotImplementedError()

    def add_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create rbd-mirror daemon(s)"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Create NFS daemon(s)"""
        raise NotImplementedError()

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Update NFS cluster"""
        raise NotImplementedError()

    def add_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> Completion
        """Create iscsi daemon(s)"""
        raise NotImplementedError()

    def apply_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> Completion
        """Update iscsi cluster"""
        raise NotImplementedError()

    def add_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create new prometheus daemon"""
        raise NotImplementedError()

    def apply_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update prometheus cluster"""
        raise NotImplementedError()

    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Node-Exporter service"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def add_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new crash service"""
        raise NotImplementedError()

    def apply_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def add_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Grafana service"""
        raise NotImplementedError()

    def apply_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a Grafana daemon(s)"""
        raise NotImplementedError()

    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new AlertManager service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def upgrade_check(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_start(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_pause(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_resume(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_stop(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_status(self):
        # type: () -> Completion
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self):
        # type: () -> Completion
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
class HostSpec(object):
    """
    Information about hosts. Like e.g. ``kubectl get nodes``
    """
    def __init__(self,
                 hostname,  # type: str
                 addr=None,  # type: Optional[str]
                 labels=None,  # type: Optional[List[str]]
                 status=None,  # type: Optional[str]
                 ):
        self.service_type = 'host'

        #: the bare hostname on the host. Not the FQDN.
        self.hostname = hostname  # type: str

        #: DNS name or IP address to reach it
        self.addr = addr or hostname  # type: str

        #: label(s), if any
        self.labels = labels or []  # type: List[str]

        #: human readable status
        self.status = status or ''  # type: str

    def to_json(self):
        """Serialize to a plain dict (inverse of :meth:`from_json`)."""
        return {
            'hostname': self.hostname,
            'addr': self.addr,
            'labels': self.labels,
            'status': self.status,
        }

    @classmethod
    def from_json(cls, host_spec):
        """Build a HostSpec from a dict; missing keys fall back to defaults."""
        _cls = cls(host_spec['hostname'],
                   host_spec['addr'] if 'addr' in host_spec else None,
                   host_spec['labels'] if 'labels' in host_spec else None)
        return _cls

    def __repr__(self):
        args = [self.hostname]  # type: List[Any]
        if self.addr is not None:
            args.append(self.addr)
        if self.labels:
            args.append(self.labels)
        if self.status:
            args.append(self.status)

        return "<HostSpec>({})".format(', '.join(map(repr, args)))

    def __eq__(self, other):
        # Let's omit `status` for the moment, as it is still the very same host.
        return self.hostname == other.hostname and \
               self.addr == other.addr and \
               self.labels == other.labels
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec):
    # type: (dict) -> GenericSpec
    """Deserialize ``spec`` into either a HostSpec or a ServiceSpec.

    A dict whose ``service_type`` equals ``'host'`` denotes a host;
    everything else is delegated to ``ServiceSpec.from_json``.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self):
        #: Is an upgrade underway?
        self.in_progress = False
        #: target container image; not set while idle
        self.target_image = None
        #: Which daemon types are fully updated?
        self.services_complete = []
        #: Freeform description
        self.message = ""
def handle_type_error(method):
    """Decorator translating ``TypeError`` raised by the wrapped
    classmethod-style callable into ``OrchestratorValidationError``,
    prefixed with the class name.
    """
    @wraps(method)
    def wrapper(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return wrapper
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None):
        # Host is at the same granularity as InventoryHost
        self.hostname = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]

        self.created = created  # type: Optional[datetime.datetime]
        self.started = started  # type: Optional[datetime.datetime]
        self.last_configured = last_configured  # type: Optional[datetime.datetime]
        self.last_deployed = last_deployed  # type: Optional[datetime.datetime]

        # Affinity to a certain OSDSpec
        self.osdspec_affinity = osdspec_affinity  # type: Optional[str]

    def name(self):
        """Return "<daemon_type>.<daemon_id>"."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name):
        # type: (Optional[str]) -> bool
        """True iff this daemon's name() starts with "<service_name>"."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """Derive the service id from ``daemon_id`` and ``hostname``.

        Only mds/nfs/iscsi/rgw daemon ids embed a service id; every other
        daemon type simply returns ``daemon_id`` unchanged.

        :raises OrchestratorError: when the separator structure of
            ``daemon_id`` is inconsistent with the hostname.
        """
        def _match():
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']:
            return _match()

        return self.daemon_id

    def service_name(self):
        """"<daemon_type>.<service_id>" for id-scoped types, else daemon_type."""
        if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """Serialize to a dict: datetimes become DATEFMT strings, None values dropped."""
        out = {
            'hostname': self.hostname,
            'container_id': self.container_id,
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'daemon_id': self.daemon_id,
            'daemon_type': self.daemon_type,
            'version': self.version,
            'status': self.status,
            'status_desc': self.status_desc,
        }
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
        return {k: v for (k, v) in out.items() if v is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of to_json(): parses DATEFMT strings back into datetimes."""
        c = data.copy()
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        return cls(**c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self, spec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
        self.created = created  # type: Optional[datetime.datetime]

        # the ServiceSpec that this status describes
        self.spec: ServiceSpec = spec

    def service_type(self):
        # Convenience accessor for the underlying spec's service type.
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self):
        """Serialize as the spec's JSON plus a nested 'status' dict."""
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created
        }
        for k in ['last_refresh', 'created']:
            # render datetimes with the module-wide DATEFMT
            if getattr(self, k):
                status[k] = getattr(self, k).strftime(DATEFMT)
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """Inverse of to_json(): split off 'status', rest is the ServiceSpec."""
        c = data.copy()
        status = c.pop('status', {})
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        return cls(spec=spec, **c_status)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:
      * filter by host when presenting UI workflow for configuring
        a particular server.
      * filter by label when not all of estate is Ceph servers,
        and we want to only learn about the Ceph servers.
      * filter by label when we are interested particularly
        in e.g. OSD servers.
    """

    def __init__(self, labels=None, hosts=None):
        # type: (Optional[List[str]], Optional[List[str]]) -> None

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """
    def __init__(self, name, devices=None, labels=None, addr=None):
        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name  # address to reach it; defaults to the name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a plain dict (devices via Devices.to_json)."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Deserialize a to_json()-shaped dict.

        :raises OrchestratorValidationError: on unknown/missing keys or
            otherwise malformed input (KeyError/TypeError are translated).
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # anything not consumed above is an unknown key
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        # Build InventoryHosts from (name, obj-with-.data) pairs.
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts):
        # type: (List[InventoryHost]) -> List[str]
        """Return just the names of the given inventory hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): addr and labels are ignored for equality.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """A single device on a single host, addressed for enabling or
    disabling its LEDs.

    * ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`
    * ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``
      (see ``ceph osd metadata | jq '.[].device_ids'``)
    * ``path``: device path on the host
    """
def _mk_orch_methods(cls):
    """Class decorator: attach a remote-forwarding shim for every public
    ``Orchestrator`` method to ``cls``.

    Each shim forwards the call to ``self._oremote``.
    """
    def shim(method_name):
        # ``method_name`` must be bound here, in its own scope; a closure
        # created directly inside the for-loop below would always see the
        # last key.
        def inner(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return inner

    for name in Orchestrator.__dict__:
        if not name.startswith('_') and name not in ['is_orchestrator_module']:
            setattr(cls, name, shim(name))
    return cls
1612 class OrchestratorClientMixin(Orchestrator
):
1614 A module that inherits from `OrchestratorClientMixin` can directly call
1615 all :class:`Orchestrator` methods without manually calling remote.
1617 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1618 calls :func:`OrchestratorClientMixin._oremote`
1620 >>> class MyModule(OrchestratorClientMixin):
1622 ... completion = self.add_host('somehost') # calls `_oremote()`
1623 ... self._orchestrator_wait([completion])
1624 ... self.log.debug(completion.result)
1626 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1627 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1628 "real" implementation of the orchestrator.
1631 >>> import mgr_module
1633 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1634 ... def __init__(self, ...):
1635 ... self.orch_client = OrchestratorClientMixin()
... self.orch_client.set_mgr(self.mgr)
1639 def set_mgr(self
, mgr
):
1640 # type: (MgrModule) -> None
1642 Useable in the Dashbord that uses a global ``mgr``
1645 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1647 def __get_mgr(self
):
1650 except AttributeError:
    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()
        try:
            # Fast path: the mgr itself can select the orchestrator.
            o = mgr._select_orchestrator()
        except AttributeError:
            # Otherwise ask the orchestrator module for the active backend.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            # Distinguish "backend doesn't implement this method" from a
            # genuine failure of an implemented method.
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise
1682 def _orchestrator_wait(self
, completions
):
1683 # type: (List[Completion]) -> None
1685 Wait for completions to complete (reads) or
1686 become persistent (writes).
1688 Waits for writes to be *persistent* but not *effective*.
1690 :param completions: List of Completions
1691 :raises NoOrchestrator:
1692 :raises RuntimeError: something went wrong while calling the process method.
1693 :raises ImportError: no `orchestrator` module or backend not found.
1695 while any(not c
.has_result
for c
in completions
):
1696 self
.process(completions
)
1697 self
.__get
_mgr
().log
.info("Operations pending: %s",
1698 sum(1 for c
in completions
if not c
.has_result
))
1699 if any(c
.needs_result
for c
in completions
):