3 ceph-mgr orchestrator interface
5 Please see the ceph-mgr module developer's guide for more information.
17 from collections
import namedtuple
, OrderedDict
18 from contextlib
import contextmanager
19 from functools
import wraps
23 from ceph
.deployment
import inventory
24 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
25 ServiceSpecValidationError
, IscsiServiceSpec
26 from ceph
.deployment
.drive_group
import DriveGroupSpec
27 from ceph
.deployment
.hostspec
import HostSpec
29 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
32 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
33 Type
, Sequence
, Dict
, cast
37 logger
= logging
.getLogger(__name__
)
39 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%f'
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        super(Exception, self).__init__(msg)
        # negative errno, surfaced to the CLI by handle_exception()
        self.errno = errno

        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """Raised when no orchestrator backend has been configured."""

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super().__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Context manager: tag any ``OrchestratorError`` raised inside the ``with``
    body with ``(kind, subject)`` (see OrchestratorEvent.subject), then re-raise.

    :param overwrite: also overwrite an already-set subject on the exception.
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap a CLI handler *func* so that expected failures are turned into
    ``HandleCommandResult`` errors (no tracebacks for the user), and attach
    the ``CLICommand`` metadata that ``CLICommandMeta`` later collects.

    :param prefix: CLI command prefix (e.g. ``"orch host add"``)
    :param cmd_args: argument description string for CLICommand
    :param desc: human-readable command description
    :param perm: 'r' or 'rw' permission string
    :param func: the handler to wrap
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse partial to copy `wrapper`
    # (a distinct callable object is needed so each command gets its own
    # _prefix/_cli_command attributes)
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
def _cli_command(perm):
    """Build a decorator factory for CLI commands with permission *perm*."""
    def inner_cli_command(prefix, cmd_args="", desc=""):
        def decorate(func):
            return handle_exception(prefix, cmd_args, desc, perm, func)
        return decorate
    return inner_cli_command
# Convenience decorator factories: read-only ('r') vs. read-write ('rw') CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # collect all methods decorated via handle_exception()
        # (they carry _prefix/_cli_command attributes)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self, inbuf, cmd):
            # fall back to the class's own handle_command for unknown prefixes
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    # state machine: INITIALIZED -> RUNNING -> FINISHED
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # sentinel for "no value set yet" (distinct from a legitimate None result)
    NO_RESULT: None = _no_result()
    # sentinel returned by on_complete to signal "result arrives asynchronously"
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise: Optional["_Promise"] = None,
                 value: Optional[Any] = NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise: Optional[_Promise] = None

        self._state = self.INITIALIZED
        self._exception: Optional[Exception] = None

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise: '_Promise' = _first_promise or self

    @property
    def _exception(self) -> Optional[Exception]:
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        try:
            # keep a pickled copy so the exception can cross module boundaries
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self) -> Optional[bytes]:
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self) -> Optional[Callable]:
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val: Optional[Callable]) -> None:
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__',
                                     '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
                # NOTE(review): `next` here is the *builtin*, not
                # self._next_promise — looks like a bug; left as-is.
                next, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        # single-line, human-readable rendering of this step, used by
        # Completion.pretty_print()
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self: Any, on_complete: Callable) -> Any:
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # always chain onto the tail of the promise list
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # this step already has a callback: append a new step
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise
        else:
            # this step is still free: use it, and add an empty tail
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next: '_Promise') -> None:
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # keep _first_promise consistent across the whole chain
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: the orchestrator will finalize later
            pass

    def propagate_to_next(self):
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e: Exception) -> None:
        """
        Sets the whole completion to be faild with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        # failure propagates down the whole remaining chain
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # iterate this promise and everything chained after it
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self) -> '_Promise':
        return list(iter(self))[-1]
class ProgressReference(object):
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # used as an on_complete callback: record that the result arrived,
        # mark progress complete and pass the value through unchanged
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                self.mgr.remote("progress", "complete", self.progress_id)
                # drop ourselves from the mgr's list of live references
                self.mgr.all_progress_references = [
                    p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        # fully done: progress bar complete AND the completion produced a result
        return self.progress == 1 and self._completion_has_result

    def update(self):
        def progress_run(progress):
            self.progress = progress
        try:
            if self.completion:
                c = self.completion().then(progress_run)
                self.mgr.process([c._first_promise])
            else:
                self.progress = 1
        except Exception as e:
            logger.exception(e)

    def fail(self):
        # on failure, still mark the progress bar finished so it doesn't hang
        self._completion_has_result = True
        self.progress = 1
class Completion(_Promise, Generic[T]):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result = "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*x | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """
    def __init__(self,
                 _first_promise: Optional["Completion"] = None,
                 value: Any = _Promise.NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self) -> Optional[ProgressReference]:
        # a ProgressReference used as on_complete is recognized by duck-typing
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self) -> Optional[ProgressReference]:
        """
        ProgressReference. Marks this completion
        as a write completeion.
        """

        references = [c._progress_reference for c in iter(
            self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls: Any,
                      message: str,
                      mgr,
                      _first_promise: Optional["Completion"] = None,
                      value: Any = _Promise.NO_RESULT,
                      on_complete: Optional[Callable] = None,
                      calc_percent: Optional[Callable[[], Any]] = None
                      ) -> Any:
        # convenience constructor: a Completion with an attached ProgressReference
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message: str,
                     mgr,
                     calc_percent: Optional[Callable[[], Any]] = None
                     ):
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e: Exception):
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
        # only the head of the chain may be kicked off, and only once
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self) -> T:
        """
        The result of the operation that we were waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return cast(T, last._value)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self) -> Optional[Exception]:
        return self._last_promise()._exception

    @property
    def serialized_exception(self) -> Optional[bytes]:
        return self._last_promise()._serialized_exception

    @property
    def has_result(self) -> bool:
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self) -> bool:
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):

        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
def pretty_print(completions: Sequence[Completion]) -> str:
    """Render every completion via its ``pretty_print`` and join with commas."""
    rendered = (c.pretty_print() for c in completions)
    return ', '.join(rendered)
def raise_if_exception(c: Completion) -> None:
    """
    Re-raise the exception stored in a finished completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # exception type could not be unpickled: degrade to a plain Exception
            raise Exception('{}: {}'.format(type(c.exception), c.exception))
        raise e
class TrivialReadCompletion(Completion[T]):
    """
    This is the trivial completion simply wrapping a result.
    """
    def __init__(self, result: T):
        super(TrivialReadCompletion, self).__init__()
        if result:
            self.finalize(result)
657 def _hide_in_features(f
):
658 f
._hide
_in
_features
= True
662 class Orchestrator(object):
664 Calls in this class may do long running remote operations, with time
665 periods ranging from network latencies to package install latencies and large
666 internet downloads. For that reason, all are asynchronous, and return
667 ``Completion`` objects.
669 Methods should only return the completion and not directly execute
670 anything, like network calls. Otherwise the purpose of
671 those completions is defeated.
673 Implementations are not required to start work on an operation until
674 the caller waits on the relevant Completion objects. Callers making
675 multiple updates should not wait on Completions until they're done
676 sending operations: this enables implementations to batch up a series
677 of updates when wait() is called on a set of Completion objects.
679 Implementations are encouraged to keep reasonably fresh caches of
680 the status of the system: it is better to serve a stale-but-recent
681 result read of e.g. device inventory than it is to keep the caller waiting
682 while you scan hosts every time.
686 def is_orchestrator_module(self
):
688 Enable other modules to interrogate this module to discover
689 whether it's usable as an orchestrator module.
691 Subclasses do not need to override this.
696 def available(self
) -> Tuple
[bool, str]:
698 Report whether we can talk to the orchestrator. This is the
699 place to give the user a meaningful message if the orchestrator
700 isn't running or can't be contacted.
702 This method may be called frequently (e.g. every page load
703 to conditionally display a warning banner), so make sure it's
704 not too expensive. It's okay to give a slightly stale status
705 (e.g. based on a periodic background ping of the orchestrator)
706 if that's necessary to make this method fast.
709 `True` doesn't mean that the desired functionality
710 is actually available in the orchestrator. I.e. this
711 won't work as expected::
714 ... if OrchestratorClientMixin().available()[0]: # wrong.
715 ... OrchestratorClientMixin().get_hosts()
717 :return: two-tuple of boolean, string
719 raise NotImplementedError()
722 def process(self
, completions
: List
[Completion
]) -> None:
724 Given a list of Completion instances, process any which are
727 Callers should inspect the detail of each completion to identify
728 partial completion/progress information, and present that information
731 This method should not block, as this would make it slow to query
732 a status, while other long running operations are in progress.
734 raise NotImplementedError()
737 def get_feature_set(self
):
738 """Describes which methods this orchestrator implements
741 `True` doesn't mean that the desired functionality
742 is actually possible in the orchestrator. I.e. this
743 won't work as expected::
746 ... api = OrchestratorClientMixin()
747 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
750 It's better to ask for forgiveness instead::
754 ... OrchestratorClientMixin().get_hosts()
755 ... except (OrchestratorError, NotImplementedError):
758 :returns: Dict of API method names to ``{'available': True or False}``
760 module
= self
.__class
__
761 features
= {a
: {'available': getattr(Orchestrator
, a
, None) != getattr(module
, a
)}
762 for a
in Orchestrator
.__dict
__
763 if not a
.startswith('_') and not getattr(getattr(Orchestrator
, a
), '_hide_in_features', False)
767 def cancel_completions(self
) -> None:
769 Cancels ongoing completions. Unstuck the mgr.
771 raise NotImplementedError()
773 def pause(self
) -> None:
774 raise NotImplementedError()
776 def resume(self
) -> None:
777 raise NotImplementedError()
779 def add_host(self
, host_spec
: HostSpec
) -> Completion
[str]:
781 Add a host to the orchestrator inventory.
783 :param host: hostname
785 raise NotImplementedError()
787 def remove_host(self
, host
: str) -> Completion
[str]:
789 Remove a host from the orchestrator inventory.
791 :param host: hostname
793 raise NotImplementedError()
795 def update_host_addr(self
, host
: str, addr
: str) -> Completion
[str]:
797 Update a host's address
799 :param host: hostname
800 :param addr: address (dns name or IP)
802 raise NotImplementedError()
804 def get_hosts(self
) -> Completion
[List
[HostSpec
]]:
806 Report the hosts in the cluster.
808 :return: list of HostSpec
810 raise NotImplementedError()
812 def add_host_label(self
, host
: str, label
: str) -> Completion
[str]:
816 raise NotImplementedError()
818 def remove_host_label(self
, host
: str, label
: str) -> Completion
[str]:
822 raise NotImplementedError()
824 def host_ok_to_stop(self
, hostname
: str) -> Completion
:
826 Check if the specified host can be safely stopped without reducing availability
828 :param host: hostname
830 raise NotImplementedError()
832 def get_inventory(self
, host_filter
: Optional
['InventoryFilter'] = None, refresh
: bool = False) -> Completion
[List
['InventoryHost']]:
834 Returns something that was created by `ceph-volume inventory`.
836 :return: list of InventoryHost
838 raise NotImplementedError()
840 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None, refresh
: bool = False) -> Completion
[List
['ServiceDescription']]:
842 Describe a service (of any kind) that is already configured in
843 the orchestrator. For example, when viewing an OSD in the dashboard
844 we might like to also display information about the orchestrator's
845 view of the service (like the kubernetes pod ID).
847 When viewing a CephFS filesystem in the dashboard, we would use this
848 to display the pods being currently run for MDS daemons.
850 :return: list of ServiceDescription objects.
852 raise NotImplementedError()
854 def list_daemons(self
, service_name
: Optional
[str] = None, daemon_type
: Optional
[str] = None, daemon_id
: Optional
[str] = None, host
: Optional
[str] = None, refresh
: bool = False) -> Completion
[List
['DaemonDescription']]:
856 Describe a daemon (of any kind) that is already configured in
859 :return: list of DaemonDescription objects.
861 raise NotImplementedError()
863 def apply(self
, specs
: List
["GenericSpec"]) -> Completion
[List
[str]]:
867 fns
: Dict
[str, function
] = {
868 'alertmanager': self
.apply_alertmanager
,
869 'crash': self
.apply_crash
,
870 'grafana': self
.apply_grafana
,
871 'iscsi': self
.apply_iscsi
,
872 'mds': self
.apply_mds
,
873 'mgr': self
.apply_mgr
,
874 'mon': self
.apply_mon
,
875 'nfs': self
.apply_nfs
,
876 'node-exporter': self
.apply_node_exporter
,
877 'osd': lambda dg
: self
.apply_drivegroups([dg
]),
878 'prometheus': self
.apply_prometheus
,
879 'rbd-mirror': self
.apply_rbd_mirror
,
880 'rgw': self
.apply_rgw
,
881 'host': self
.add_host
,
885 if isinstance(ls
, list):
891 fn
= cast(Callable
[["GenericSpec"], Completion
], fns
[spec
.service_type
])
892 completion
= fn(spec
)
895 fn
= cast(Callable
[["GenericSpec"], Completion
], fns
[spec
.service_type
])
896 return fn(s
).then(lambda r
: merge(ls
, r
))
897 completion
= completion
.then(next
)
900 def plan(self
, spec
: List
["GenericSpec"]) -> Completion
[List
]:
902 Plan (Dry-run, Preview) a List of Specs.
904 raise NotImplementedError()
906 def remove_daemons(self
, names
: List
[str]) -> Completion
[List
[str]]:
908 Remove specific daemon(s).
912 raise NotImplementedError()
914 def remove_service(self
, service_name
: str) -> Completion
[str]:
916 Remove a service (a collection of daemons).
920 raise NotImplementedError()
922 def service_action(self
, action
: str, service_name
: str) -> Completion
[List
[str]]:
924 Perform an action (start/stop/reload) on a service (i.e., all daemons
925 providing the logical service).
927 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
928 :param service_name: service_type + '.' + service_id
929 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
932 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
933 raise NotImplementedError()
935 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str]=None) -> Completion
[str]:
937 Perform an action (start/stop/reload) on a daemon.
939 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
940 :param daemon_name: name of daemon
941 :param image: Container image when redeploying that daemon
944 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
945 raise NotImplementedError()
947 def create_osds(self
, drive_group
: DriveGroupSpec
) -> Completion
[str]:
949 Create one or more OSDs within a single Drive Group.
951 The principal argument here is the drive_group member
952 of OsdSpec: other fields are advisory/extensible for any
953 finer-grained OSD feature enablement (choice of backing store,
954 compression/encryption, etc).
956 raise NotImplementedError()
958 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]) -> Completion
[List
[str]]:
959 """ Update OSD cluster """
960 raise NotImplementedError()
962 def set_unmanaged_flag(self
,
963 unmanaged_flag
: bool,
964 service_type
: str = 'osd',
966 ) -> HandleCommandResult
:
967 raise NotImplementedError()
969 def preview_osdspecs(self
,
970 osdspec_name
: Optional
[str] = 'osd',
971 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
972 ) -> Completion
[str]:
973 """ Get a preview for OSD deployments """
974 raise NotImplementedError()
976 def remove_osds(self
, osd_ids
: List
[str],
977 replace
: bool = False,
978 force
: bool = False) -> Completion
[str]:
980 :param osd_ids: list of OSD IDs
981 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
982 :param force: Forces the OSD removal process without waiting for the data to be drained first.
983 Note that this can only remove OSDs that were successfully
984 created (i.e. got an OSD ID).
986 raise NotImplementedError()
988 def stop_remove_osds(self
, osd_ids
: List
[str]) -> Completion
:
992 raise NotImplementedError()
994 def remove_osds_status(self
) -> Completion
:
996 Returns a status of the ongoing OSD removal operations.
998 raise NotImplementedError()
1000 def blink_device_light(self
, ident_fault
: str, on
: bool, locations
: List
['DeviceLightLoc']) -> Completion
[List
[str]]:
1002 Instructs the orchestrator to enable or disable either the ident or the fault LED.
1004 :param ident_fault: either ``"ident"`` or ``"fault"``
1005 :param on: ``True`` = on.
1006 :param locations: See :class:`orchestrator.DeviceLightLoc`
1008 raise NotImplementedError()
1010 def zap_device(self
, host
: str, path
: str) -> Completion
[str]:
1011 """Zap/Erase a device (DESTROYS DATA)"""
1012 raise NotImplementedError()
1014 def add_mon(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1015 """Create mon daemon(s)"""
1016 raise NotImplementedError()
1018 def apply_mon(self
, spec
: ServiceSpec
) -> Completion
[str]:
1019 """Update mon cluster"""
1020 raise NotImplementedError()
1022 def add_mgr(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1023 """Create mgr daemon(s)"""
1024 raise NotImplementedError()
1026 def apply_mgr(self
, spec
: ServiceSpec
) -> Completion
[str]:
1027 """Update mgr cluster"""
1028 raise NotImplementedError()
1030 def add_mds(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1031 """Create MDS daemon(s)"""
1032 raise NotImplementedError()
1034 def apply_mds(self
, spec
: ServiceSpec
) -> Completion
[str]:
1035 """Update MDS cluster"""
1036 raise NotImplementedError()
1038 def add_rgw(self
, spec
: RGWSpec
) -> Completion
[List
[str]]:
1039 """Create RGW daemon(s)"""
1040 raise NotImplementedError()
1042 def apply_rgw(self
, spec
: RGWSpec
) -> Completion
[str]:
1043 """Update RGW cluster"""
1044 raise NotImplementedError()
1046 def add_rbd_mirror(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1047 """Create rbd-mirror daemon(s)"""
1048 raise NotImplementedError()
1050 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> Completion
[str]:
1051 """Update rbd-mirror cluster"""
1052 raise NotImplementedError()
1054 def add_nfs(self
, spec
: NFSServiceSpec
) -> Completion
[List
[str]]:
1055 """Create NFS daemon(s)"""
1056 raise NotImplementedError()
1058 def apply_nfs(self
, spec
: NFSServiceSpec
) -> Completion
[str]:
1059 """Update NFS cluster"""
1060 raise NotImplementedError()
1062 def add_iscsi(self
, spec
: IscsiServiceSpec
) -> Completion
[List
[str]]:
1063 """Create iscsi daemon(s)"""
1064 raise NotImplementedError()
1066 def apply_iscsi(self
, spec
: IscsiServiceSpec
) -> Completion
[str]:
1067 """Update iscsi cluster"""
1068 raise NotImplementedError()
1070 def add_prometheus(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1071 """Create new prometheus daemon"""
1072 raise NotImplementedError()
1074 def apply_prometheus(self
, spec
: ServiceSpec
) -> Completion
[str]:
1075 """Update prometheus cluster"""
1076 raise NotImplementedError()
1078 def add_node_exporter(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1079 """Create a new Node-Exporter service"""
1080 raise NotImplementedError()
1082 def apply_node_exporter(self
, spec
: ServiceSpec
) -> Completion
[str]:
1083 """Update existing a Node-Exporter daemon(s)"""
1084 raise NotImplementedError()
1086 def add_crash(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1087 """Create a new crash service"""
1088 raise NotImplementedError()
1090 def apply_crash(self
, spec
: ServiceSpec
) -> Completion
[str]:
1091 """Update existing a crash daemon(s)"""
1092 raise NotImplementedError()
1094 def add_grafana(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1095 """Create a new Node-Exporter service"""
1096 raise NotImplementedError()
1098 def apply_grafana(self
, spec
: ServiceSpec
) -> Completion
[str]:
1099 """Update existing a Node-Exporter daemon(s)"""
1100 raise NotImplementedError()
1102 def add_alertmanager(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1103 """Create a new AlertManager service"""
1104 raise NotImplementedError()
1106 def apply_alertmanager(self
, spec
: ServiceSpec
) -> Completion
[str]:
1107 """Update an existing AlertManager daemon(s)"""
1108 raise NotImplementedError()
1110 def upgrade_check(self
, image
: Optional
[str], version
: Optional
[str]) -> Completion
[str]:
1111 raise NotImplementedError()
1113 def upgrade_start(self
, image
: Optional
[str], version
: Optional
[str]) -> Completion
[str]:
1114 raise NotImplementedError()
1116 def upgrade_pause(self
) -> Completion
[str]:
1117 raise NotImplementedError()
1119 def upgrade_resume(self
) -> Completion
[str]:
1120 raise NotImplementedError()
1122 def upgrade_stop(self
) -> Completion
[str]:
1123 raise NotImplementedError()
    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()
    def upgrade_available(self) -> Completion:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
# A spec that may describe either a Ceph service or a bare host entry.
GenericSpec = Union[ServiceSpec, HostSpec]
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize *spec* into the matching spec class.

    A payload whose ``service_type`` is ``'host'`` becomes a
    :class:`HostSpec`; anything else is parsed as a :class:`ServiceSpec`.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self):
        self.in_progress = False  # Is an upgrade underway?
        self.target_image: Optional[str] = None  # image being upgraded to
        self.services_complete: List[str] = []  # Which daemon types are fully updated?
        self.message = ""  # Freeform description
def handle_type_error(method):
    """Decorator: convert a TypeError raised by *method* into an
    OrchestratorValidationError that names the class it occurred on.

    Used on ``from_json``-style classmethods below, where a TypeError
    typically means the payload had unexpected or missing keys.
    """
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return inner
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 last_deployed=None,
                 osdspec_affinity=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False):

        # Host is at the same granularity as InventoryHost
        self.hostname: str = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # fields.
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: str = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        self.is_active = is_active

    def name(self):
        """Return the daemon's full name, ``<daemon_type>.<daemon_id>``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """True when this daemon's name begins with ``<service_name>.``."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """Derive the id of the service this daemon belongs to.

        OSDs use their OSDSpec affinity when set; daemon types that
        require a service id recover it from daemon_id/hostname.
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        def _match():
            # daemon_id generally looks like "<service_id>.<host>[.<random>]";
            # strip the host part to recover the service id, or fail loudly.
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self):
        """Return the name of the service this daemon is part of."""
        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self) -> dict:
        """Serialize to a plain dict.

        Datetimes become DATEFMT strings; None-valued keys are dropped so
        round-tripping through from_json() keeps defaults.
        """
        out = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['version'] = self.version
        out['status'] = self.status
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of to_json(): parse DATEFMT timestamps and events back."""
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(events=events, **c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        # Dump as the to_json() mapping rather than a python/object tag.
        return dumper.represent_dict(data.to_json().items())
# Register the representer so yaml.dump() emits DaemonDescription as a mapping.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # fields.
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons expected
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        # The spec this service was deployed from
        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

    def service_type(self):
        """Shortcut for the spec's service_type."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """Serialize: the spec's JSON plus a 'status' sub-dict and events.

        Datetimes become DATEFMT strings; None-valued status keys are dropped.
        """
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = getattr(self, k).strftime(DATEFMT)
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """Inverse of to_json(): rebuild the spec, status fields and events."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        # Dump as the to_json() mapping rather than a python/object tag.
        return dumper.represent_dict(data.to_json().items())
# Register the representer so yaml.dump() emits ServiceDescription as a mapping.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name  # network address; defaults to the name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a plain dict (devices via Devices.to_json())."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Build from a dict; unknown or missing keys raise
        OrchestratorValidationError instead of TypeError/KeyError."""
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # anything left over is a key we do not understand
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        # hosts: iterable of (hostname, wrapper-with-.data) pairs
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the host names of *hosts*."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): equality ignores addr and labels -- confirm intended.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
    __slots__ = ()
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # v1 wire format: '<created> <kind>:<subject> [<level>] "<message>"'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        if isinstance(created, str):
            created = datetime.datetime.strptime(created, DATEFMT)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon danem or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return '<kind>:<subject>', the identity the event is attached to."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = self.created.strftime(DATEFMT)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """Parse the single-line string produced by to_json().

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other):
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): `level` is excluded from equality -- confirm intended.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
def _mk_orch_methods(cls):
    """Class decorator: replace every public Orchestrator method on *cls*
    with a stub that forwards the call through ``self._oremote()``."""
    # Needs to be defined outside of for.
    # Otherwise meth is always bound to last key
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls
1637 class OrchestratorClientMixin(Orchestrator
):
1639 A module that inherents from `OrchestratorClientMixin` can directly call
1640 all :class:`Orchestrator` methods without manually calling remote.
1642 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1643 calls :func:`OrchestratorClientMixin._oremote`
1645 >>> class MyModule(OrchestratorClientMixin):
1647 ... completion = self.add_host('somehost') # calls `_oremote()`
1648 ... self._orchestrator_wait([completion])
1649 ... self.log.debug(completion.result)
1651 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1652 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1653 "real" implementation of the orchestrator.
1656 >>> import mgr_module
1658 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1659 ... def __init__(self, ...):
1660 ... self.orch_client = OrchestratorClientMixin()
1661 ... self.orch_client.set_mgr(self.mgr))
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``
        """
        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
    def __get_mgr(self):
        # Return the MgrModule attached via set_mgr(); the attribute is
        # name-mangled, so a missing attribute means set_mgr() never ran.
        try:
            return self.__mgr
        except AttributeError:
            raise RuntimeError('Execute `set_mgr()` first')
    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            # Direct call if we *are* the orchestrator module...
            o = mgr._select_orchestrator()
        except AttributeError:
            # ...otherwise go through the mgr's remote() machinery.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Translate "backend doesn't have that method" into a clearer error.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise
1706 def _orchestrator_wait(self
, completions
: List
[Completion
]) -> None:
1708 Wait for completions to complete (reads) or
1709 become persistent (writes).
1711 Waits for writes to be *persistent* but not *effective*.
1713 :param completions: List of Completions
1714 :raises NoOrchestrator:
1715 :raises RuntimeError: something went wrong while calling the process method.
1716 :raises ImportError: no `orchestrator` module or backend not found.
1718 while any(not c
.has_result
for c
in completions
):
1719 self
.process(completions
)
1720 self
.__get
_mgr
().log
.info("Operations pending: %s",
1721 sum(1 for c
in completions
if not c
.has_result
))
1722 if any(c
.needs_result
for c
in completions
):