3 ceph-mgr orchestrator interface
5 Please see the ceph-mgr module developer's guide for more information.
10 from collections
import namedtuple
11 from functools
import wraps
17 from ceph
.deployment
import inventory
18 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
19 ServiceSpecValidationError
20 from ceph
.deployment
.drive_group
import DriveGroupSpec
22 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
25 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
26 Type
, Sequence
, Dict
, cast
# Module-level logger for this orchestrator interface module.
logger = logging.getLogger(__name__)

# Timestamp format (ISO-8601-like, microsecond precision) used by this module
# when (de)serializing datetimes.  NOTE(review): consumers are not visible in
# this chunk — presumably used for last_refresh/last_* fields; confirm.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super(NoOrchestrator, self).__init__(msg)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap a CLI handler ``func`` so that expected failures become
    ``HandleCommandResult`` errors (no traceback), and attach the
    ``CLICommand`` metadata (``_prefix``/``_cli_command``) that
    ``CLICommandMeta`` uses for dispatch.

    :param prefix: CLI command prefix, e.g. ``"orch host ls"``
    :param cmd_args: argument description string for the command
    :param desc: human readable description of the command
    :param perm: permission string ('r' or 'rw', see ``_cli_command``)
    :param func: the actual handler
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse partial to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
def _cli_command(perm):
    """
    Build a decorator factory for orchestrator CLI commands with the given
    permission string ('r' or 'rw'); delegates to ``handle_exception``.
    """
    def inner_cli_command(prefix, cmd_args="", desc=""):
        def decorate(func):
            return handle_exception(prefix, cmd_args, desc, perm, func)
        return decorate
    return inner_cli_command
# Decorator factories for read-only ('r') and read-write ('rw') orchestrator
# CLI commands; both delegate to handle_exception() via _cli_command().
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch = {}  # type: Dict[str, CLICommand]
        for attr in dct.values():
            # Only handlers wrapped by handle_exception() carry these markers;
            # everything else in the class dict is skipped.
            try:
                dispatch[attr._prefix] = attr._cli_command
            except AttributeError:
                pass

        def handle_command(self, inbuf, cmd):
            # Unknown prefixes are forwarded to the instance's own handler.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)
            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [command.dump_cmd() for command in dispatch.values()]
        cls.handle_command = handle_command
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    # State machine: INITIALIZED -> RUNNING -> FINISHED (see _finalize/fail).
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Unique sentinel meaning "no value yet" (compared with `is`); produced by
    # the module-level _no_result() helper.
    NO_RESULT = _no_result()  # type: None
    # Sentinel returned by an on_complete callback to signal "result will
    # arrive asynchronously" — see _finalize().
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise=None,  # type: Optional["_Promise"]
                 value=NO_RESULT,  # type: Optional[Any]
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise = None  # type: Optional[_Promise]

        self._state = self.INITIALIZED
        self._exception = None  # type: Optional[Exception]

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise = _first_promise or self  # type: '_Promise'

    @property
    def _exception(self):
        # type: () -> Optional[Exception]
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        # Keep both the live exception and a pickled copy (so it can cross
        # process boundaries); degrade to a plain Exception if not picklable.
        self._exception_ = e
        try:
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self):
        # type: () -> Optional[bytes]
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self):
        # type: () -> Optional[Callable]
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val):
        # type: (Optional[Callable]) -> None
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # NOTE(review): `next` below is the builtin, so `pr=` is always 'NA';
        # looks like it was meant to be self._next_promise — confirm upstream.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name,
            getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        # One-line human readable rendering of this step; used by
        # Completion.pretty_print().
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self, on_complete):
        # type: (Any, Callable) -> Any
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # Always append at the end of the chain.
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: chain a new promise for it.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise
        else:
            # This step is still a no-op: attach the callback here and add an
            # empty tail promise.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next):
        # type: (_Promise) -> None
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # Every promise downstream shares the same head of the chain.
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: the orchestrator will finalize it later.
            pass

    def propagate_to_next(self):
        # Mark this step finished and kick off evaluation of the next one.
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e):
        # type: (Exception) -> None
        """
        Sets the whole completion to be faild with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        # Propagate the failure down the whole chain.
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        # Identity-based membership over the whole chain.
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # Iterate from this promise to the end of the chain.
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self):
        # type: () -> _Promise
        return list(iter(self))[-1]
class ProgressReference(object):
    def __init__(self,
                 message,  # type: str
                 mgr,
                 completion=None  # type: Optional[Callable[[], Completion]]
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion = completion  # type: Optional[Callable[[], Completion]]

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Used as a Completion's on_complete: marks the write as done and
        # passes the value through unchanged.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                # Fully done: complete the progress event and deregister.
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        # True once both the progress is at 100% and the wrapped completion
        # has produced its result.
        return self.progress == 1 and self._completion_has_result

    def update(self):
        # Re-evaluate the (optional) calc-percent completion and feed its
        # result into `self.progress`.
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        self._completion_has_result = True
        self.progress = 1
class Completion(_Promise):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result = "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*x | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+
    """
    def __init__(self,
                 _first_promise=None,  # type: Optional["Completion"]
                 value=_Promise.NO_RESULT,  # type: Any
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self):
        # type: () -> Optional[ProgressReference]
        # A ProgressReference is recognized by its `progress_id` attribute.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self):
        # type: () -> Optional[ProgressReference]
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls,  # type: Any
                      message,  # type: str
                      mgr,
                      _first_promise=None,  # type: Optional["Completion"]
                      value=_Promise.NO_RESULT,  # type: Any
                      on_complete=None,  # type: Optional[Callable]
                      calc_percent=None  # type: Optional[Callable[[], Any]]
                      ):
        # type: (...) -> Any
        """Create a completion with an attached ProgressReference."""
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], Any]]
                     ):
        """Chain a ProgressReference step onto this completion."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            ))

    def fail(self, e):
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result=_Promise.NO_RESULT):
        # Start evaluation from the head of the chain, but only once.
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self):
        """
        The result of the operation that we were waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return last._value

    def result_str(self):
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        return self._last_promise()._exception

    @property
    def serialized_exception(self):
        # type: () -> Optional[bytes]
        return self._last_promise()._serialized_exception

    @property
    def has_result(self):
        # type: () -> bool
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
def pretty_print(completions):
    # type: (Sequence[Completion]) -> str
    """Render several completions as one comma-separated string."""
    rendered = (completion.pretty_print() for completion in completions)
    return ', '.join(rendered)
def raise_if_exception(c):
    # type: (Completion) -> None
    """
    Re-raise the exception carried by a completion, if there is one.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    serialized = c.serialized_exception
    if serialized is None:
        return
    try:
        exc = pickle.loads(serialized)
    except (KeyError, AttributeError):
        # The exception type could not be reconstructed in this process;
        # fall back to a generic Exception carrying its description.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise exc
class TrivialReadCompletion(Completion):
    """
    This is the trivial completion simply wrapping a result.
    """
    def __init__(self, result):
        super(TrivialReadCompletion, self).__init__()
        if result:
            self.finalize(result)
641 def _hide_in_features(f
):
642 f
._hide
_in
_features
= True
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        raise NotImplementedError()

    @_hide_in_features
    def process(self, completions):
        # type: (List[Completion]) -> None
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self):
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" iff the subclass overrides it.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    @_hide_in_features
    def cancel_completions(self):
        # type: () -> None
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self):
        # type: () -> None
        raise NotImplementedError()

    def resume(self):
        # type: () -> None
        raise NotImplementedError()

    def add_host(self, host_spec):
        # type: (HostSpec) -> Completion
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> Completion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host, addr):
        # type: (str, str) -> Completion
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self):
        # type: () -> Completion
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Remove a host label
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter=None, refresh=False):
        # type: (Optional[InventoryFilter], bool) -> Completion
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_name=None, refresh=False):
        # type: (Optional[str], Optional[str], bool) -> Completion
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, daemon_type=None, daemon_id=None, host=None, refresh=False):
        # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    def apply(self, specs: List[ServiceSpec]) -> Completion:
        """
        Applies any spec
        """
        # Dispatch table: service_type -> apply_* method.
        fns: Dict[str, Callable[[ServiceSpec], Completion]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': cast(Callable[[ServiceSpec], Completion], self.apply_nfs),
            'node-exporter': self.apply_node_exporter,
            'osd': cast(Callable[[ServiceSpec], Completion], lambda dg: self.apply_drivegroups([dg])),
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': cast(Callable[[ServiceSpec], Completion], self.apply_rgw),
        }

        def merge(ls, r):
            # Accumulate results into a flat list.
            if isinstance(ls, list):
                return ls + [r]
            return [ls, r]

        spec, *specs = specs

        # Chain the remaining specs onto the first one's completion.
        completion = fns[spec.service_type](spec)
        for s in specs:
            def next(ls):
                return fns[s.service_type](s).then(lambda r: merge(ls, r))

            completion = completion.then(next)
        return completion

    def remove_daemons(self, names):
        # type: (List[str]) -> Completion
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def list_specs(self, service_name=None):
        # type: (Optional[str]) -> Completion
        """
        Lists saved service specs
        """
        raise NotImplementedError()

    def remove_service(self, service_name):
        # type: (str) -> Completion
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action, service_name):
        # type: (str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_type: e.g. "mds", "rgw", ...
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :rtype: Completion
        """
        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action, daemon_type, daemon_id):
        # type: (str, str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param name: name of daemon
        :rtype: Completion
        """
        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> Completion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
        """ Update OSD cluster """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def remove_osds_status(self):
        # type: () -> Completion
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault, on, locations):
        # type: (str, bool, List[DeviceLightLoc]) -> Completion
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host, path):
        # type: (str, str) -> Completion
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mon daemon(s)"""
        raise NotImplementedError()

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mon cluster"""
        raise NotImplementedError()

    def add_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mgr daemon(s)"""
        raise NotImplementedError()

    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mgr cluster"""
        raise NotImplementedError()

    def add_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create MDS daemon(s)"""
        raise NotImplementedError()

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update MDS cluster"""
        raise NotImplementedError()

    def add_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create rbd-mirror daemon(s)"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Create NFS daemon(s)"""
        raise NotImplementedError()

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Update NFS cluster"""
        raise NotImplementedError()

    def add_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Create RGW daemon(s)"""
        raise NotImplementedError()

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Update RGW cluster"""
        raise NotImplementedError()

    def add_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create new prometheus daemon"""
        raise NotImplementedError()

    def apply_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update prometheus cluster"""
        raise NotImplementedError()

    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Node-Exporter service"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def add_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new crash service"""
        raise NotImplementedError()

    def apply_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a crash daemon(s)"""
        raise NotImplementedError()

    def add_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Node-Exporter service"""
        raise NotImplementedError()

    def apply_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing a Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new AlertManager service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update an existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def upgrade_check(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_start(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_pause(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_resume(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_stop(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_status(self):
        # type: () -> Completion
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self):
        # type: () -> Completion
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
class HostSpec(object):
    """
    Information about hosts. Like e.g. ``kubectl get nodes``
    """
    def __init__(self,
                 hostname,  # type: str
                 addr=None,  # type: Optional[str]
                 labels=None,  # type: Optional[List[str]]
                 status=None,  # type: Optional[str]
                 ):
        #: the bare hostname on the host. Not the FQDN.
        self.hostname = hostname  # type: str

        #: DNS name or IP address to reach it
        self.addr = addr or hostname  # type: str

        #: label(s), if any
        self.labels = labels or []  # type: List[str]

        #: human readable status
        self.status = status or ''  # type: str

    def to_json(self):
        """Plain-dict representation, suitable for JSON serialization."""
        return {
            'hostname': self.hostname,
            'addr': self.addr,
            'labels': self.labels,
            'status': self.status,
        }

    def __repr__(self):
        # Only include the optional fields that carry information.
        shown = [self.hostname]  # type: List[Any]
        if self.addr is not None:
            shown.append(self.addr)
        if self.labels:
            shown.append(self.labels)
        if self.status:
            shown.append(self.status)
        return "<HostSpec>({})".format(', '.join(repr(item) for item in shown))

    def __eq__(self, other):
        # Let's omit `status` for the moment, as it is still the very same host.
        return (self.hostname == other.hostname
                and self.addr == other.addr
                and self.labels == other.labels)
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self):
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image = None
        # Which daemon types are fully updated?
        self.services_complete = []
        # Freeform description
        self.message = ""
def handle_type_error(method):
    """
    Decorator: translate a ``TypeError`` raised by ``method`` (typically a
    ``from_json``-style classmethod fed bad input) into an
    ``OrchestratorValidationError`` that names the class.
    """
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return inner
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 last_deployed=None):
        # Host is at the same granularity as InventoryHost
        self.hostname = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]

        self.created = created  # type: Optional[datetime.datetime]
        self.started = started  # type: Optional[datetime.datetime]
        self.last_configured = last_configured  # type: Optional[datetime.datetime]
        self.last_deployed = last_deployed  # type: Optional[datetime.datetime]

    def name(self):
        """Unique daemon instance name: ``<daemon_type>.<daemon_id>``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name):
        # type: (Optional[str]) -> bool
        """True if this daemon's name falls under the given service name."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_name(self):
        """Name of the logical service this daemon belongs to.

        rgw daemon ids carry '<realm>.<zone>...' so the service is
        'rgw.<realm>.<zone>'; mds/nfs ids carry '<service_id>.<suffix>'
        so the service is '<daemon_type>.<service_id>'.
        """
        if self.daemon_type == 'rgw':
            v = self.daemon_id.split('.')
            s_name = '.'.join(v[0:2])
            return 'rgw.%s' % s_name
        if self.daemon_type in ['mds', 'nfs']:
            _s_name = self.daemon_id.split('.')[0]
            # BUGFIX: this previously returned 'mds.%s' unconditionally,
            # mislabeling nfs daemons as belonging to an mds service.
            return '%s.%s' % (self.daemon_type, _s_name)
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """JSON-serializable dict; ``None`` values are dropped and set
        timestamps are rendered with ``DATEFMT``."""
        out = {
            'hostname': self.hostname,
            'container_id': self.container_id,
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'daemon_id': self.daemon_id,
            'daemon_type': self.daemon_type,
            'version': self.version,
            'status': self.status,
            'status_desc': self.status_desc,
        }
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
        return {k: v for (k, v) in out.items() if v is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of :meth:`to_json`; unexpected keys surface as
        ``OrchestratorValidationError`` via ``@handle_type_error``."""
        c = data.copy()
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        return cls(**c)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 container_image_id=None,
                 container_image_name=None,
                 service_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 spec=None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The service_name is either a bare type (e.g., 'mgr') or
        # type.id combination (e.g., 'mds.fsname' or 'rgw.realm.zone').
        self.service_name = service_name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons expected
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
        self.created = created  # type: Optional[datetime.datetime]

        # The spec used to create this service, if any.
        self.spec = spec  # type: Optional[ServiceSpec]

    def service_type(self):
        # Bare type portion of service_name, e.g. 'mds' for 'mds.fsname';
        # None when no service_name is set.
        if self.service_name:
            return self.service_name.split('.')[0]
        return None

    def __repr__(self):
        return "<ServiceDescription>({name})".format(name=self.service_name)

    def to_json(self):
        # JSON-serializable dict; None values are dropped and set
        # timestamps are rendered with DATEFMT.
        out = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_name': self.service_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'spec': self.spec.to_json() if self.spec is not None else None
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
        return {k: v for (k, v) in out.items() if v is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        # Inverse of to_json(); unexpected keys surface as
        # OrchestratorValidationError via @handle_type_error.
        c = data.copy()
        for k in ['last_refresh', 'created']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        return cls(**c)
class InventoryFilter(object):
    """
    Filter passed along with an inventory request, so a backend can
    avoid scanning the whole estate unnecessarily.

    Typical uses: restrict to one host when driving a UI workflow for
    configuring a particular server; restrict by label when only part
    of the estate runs Ceph, or when only e.g. OSD servers are of
    interest.
    """

    def __init__(self, labels=None, hosts=None):
        # type: (Optional[List[str]], Optional[List[str]]) -> None

        #: Optional: only report hosts matching these labels
        self.labels = labels

        #: Optional: only report these named hosts
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """
    def __init__(self, name, devices=None, labels=None, addr=None):
        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """JSON-serializable dict of this host and its devices."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Build an InventoryHost from its JSON form.

        :raises OrchestratorValidationError: on missing/unknown keys or
            otherwise malformed input.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            # BUGFIX: 'labels' must be *popped* before the unknown-key
            # check below; previously it was read with .get() afterwards,
            # so any inventory that contained labels was rejected as
            # having an unknown key.
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        # Convenience constructor for (hostname, wrapped-devices) pairs.
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts):
        # type: (List[InventoryHost]) -> List[str]
        return [host.name for host in hosts]

    def __eq__(self, other):
        # Labels and address are not part of host identity here.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    host: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    dev: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``

    path: device path on the host.
    """
def _mk_orch_methods(cls):
    """
    Class decorator: install, for every public method declared on
    ``Orchestrator``, a stub on *cls* that forwards the call through
    ``self._oremote`` and returns the resulting completion.
    """
    # shim() must live outside the for-loop so that each stub captures
    # its own method name rather than the loop variable's final value.
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for name in Orchestrator.__dict__:
        if not name.startswith('_') and name not in ['is_orchestrator_module']:
            setattr(cls, name, shim(name))
    return cls
1521 class OrchestratorClientMixin(Orchestrator
):
1523 A module that inherents from `OrchestratorClientMixin` can directly call
1524 all :class:`Orchestrator` methods without manually calling remote.
1526 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1527 calls :func:`OrchestratorClientMixin._oremote`
1529 >>> class MyModule(OrchestratorClientMixin):
1531 ... completion = self.add_host('somehost') # calls `_oremote()`
1532 ... self._orchestrator_wait([completion])
1533 ... self.log.debug(completion.result)
1535 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1536 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1537 "real" implementation of the orchestrator.
1540 >>> import mgr_module
1541 >>> class MyImplentation(mgr_module.MgrModule, Orchestrator):
1542 ... def __init__(self, ...):
1543 ... self.orch_client = OrchestratorClientMixin()
1544 ... self.orch_client.set_mgr(self.mgr))
    def set_mgr(self, mgr):
        # type: (MgrModule) -> None
        """
        Inject the active MgrModule this mixin should proxy through.

        Usable in the Dashboard, which uses a global ``mgr``.
        """
        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
    def __get_mgr(self):
        """
        Return the MgrModule previously injected via ``set_mgr()``.

        :raises NoOrchestrator: if ``set_mgr()`` was never called.
        """
        try:
            return self.__mgr
        except AttributeError:
            raise NoOrchestrator("Have you called `set_mgr()`?")
    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            # Running inside the orchestrator module itself: ask directly.
            o = mgr._select_orchestrator()
        except AttributeError:
            # Running in another module: fall back to a remote call.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Distinguish "backend doesn't implement this" from a genuine
            # failure by consulting the backend's advertised feature set.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise
1590 def _orchestrator_wait(self
, completions
):
1591 # type: (List[Completion]) -> None
1593 Wait for completions to complete (reads) or
1594 become persistent (writes).
1596 Waits for writes to be *persistent* but not *effective*.
1598 :param completions: List of Completions
1599 :raises NoOrchestrator:
1600 :raises RuntimeError: something went wrong while calling the process method.
1601 :raises ImportError: no `orchestrator` module or backend not found.
1603 while any(not c
.has_result
for c
in completions
):
1604 self
.process(completions
)
1605 self
.__get
_mgr
().log
.info("Operations pending: %s",
1606 sum(1 for c
in completions
if not c
.has_result
))
1607 if any(c
.needs_result
for c
in completions
):