3 ceph-mgr orchestrator interface
5 Please see the ceph-mgr module developer's guide for more information.
17 from collections
import namedtuple
, OrderedDict
18 from contextlib
import contextmanager
19 from functools
import wraps
23 from ceph
.deployment
import inventory
24 from ceph
.deployment
.service_spec
import ServiceSpec
, NFSServiceSpec
, RGWSpec
, \
25 ServiceSpecValidationError
, IscsiServiceSpec
26 from ceph
.deployment
.drive_group
import DriveGroupSpec
27 from ceph
.deployment
.hostspec
import HostSpec
28 from ceph
.utils
import datetime_to_str
, str_to_datetime
30 from mgr_module
import MgrModule
, CLICommand
, HandleCommandResult
33 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
, Callable
, Any
, \
34 Type
, Sequence
, Dict
, cast
38 logger
= logging
.getLogger(__name__
)
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        super(Exception, self).__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator in configured.
    """

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Context manager that tags any ``OrchestratorError`` raised inside it
    with an event subject ``(kind, subject)`` before re-raising it.

    :param kind: event kind, see OrchestratorEvent.subject
    :param subject: event subject, see OrchestratorEvent.subject
    :param overwrite: when True, replace an already-set subject
    """
    try:
        yield
    except OrchestratorError as e:
        if overwrite or hasattr(e, 'event_subject'):
            e.event_subject = (kind, subject)
        raise
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap ``func`` so that expected failures become ``HandleCommandResult``
    errors instead of tracebacks, and attach the resulting CLICommand
    metadata to a fresh callable.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse partial to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
def _cli_command(perm):
    """
    Return a decorator factory whose decorators register a CLI command
    with the given permission string (e.g. ``'r'`` or ``'rw'``).
    """
    def inner_cli_command(prefix, cmd_args="", desc=""):
        return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
    return inner_cli_command
# Decorator factories for read-only and read-write orchestrator CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for attr in dct.values():
            try:
                dispatch[attr._prefix] = attr._cli_command
            except AttributeError:
                pass

        def handle_command(self, inbuf, cmd):
            # Fall through to the class's own handler for unknown prefixes.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
150 class _Promise(object):
152 A completion may need multiple promises to be fulfilled. `_Promise` is one
155 Typically ``Orchestrator`` implementations inherit from this class to
156 build their own way of finishing a step to fulfil a future.
158 They are not exposed in the orchestrator interface and can be seen as a
159 helper to build orchestrator modules.
161 INITIALIZED
= 1 # We have a parent completion and a next completion
163 FINISHED
= 3 # we have a final result
165 NO_RESULT
: None = _no_result()
166 ASYNC_RESULT
= object()
169 _first_promise
: Optional
["_Promise"] = None,
170 value
: Optional
[Any
] = NO_RESULT
,
171 on_complete
: Optional
[Callable
] = None,
172 name
: Optional
[str] = None,
174 self
._on
_complete
_ = on_complete
176 self
._next
_promise
: Optional
[_Promise
] = None
178 self
._state
= self
.INITIALIZED
179 self
._exception
: Optional
[Exception] = None
181 # Value of this _Promise. may be an intermediate result.
184 # _Promise is not a continuation monad, as `_result` is of type
185 # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
186 self
._first
_promise
: '_Promise' = _first_promise
or self
189 def _exception(self
) -> Optional
[Exception]:
190 return getattr(self
, '_exception_', None)
193 def _exception(self
, e
):
196 self
._serialized
_exception
_ = pickle
.dumps(e
) if e
is not None else None
197 except pickle
.PicklingError
:
198 logger
.error(f
"failed to pickle {e}")
199 if isinstance(e
, Exception):
200 e
= Exception(*e
.args
)
202 e
= Exception(str(e
))
203 # degenerate to a plain Exception
204 self
._serialized
_exception
_ = pickle
.dumps(e
)
207 def _serialized_exception(self
) -> Optional
[bytes
]:
208 return getattr(self
, '_serialized_exception_', None)
211 def _on_complete(self
) -> Optional
[Callable
]:
212 # https://github.com/python/mypy/issues/4125
213 return self
._on
_complete
_
216 def _on_complete(self
, val
: Optional
[Callable
]) -> None:
217 self
._on
_complete
_ = val
220 name
= self
._name
or getattr(self
._on
_complete
, '__name__',
221 '??') if self
._on
_complete
else 'None'
222 val
= repr(self
._value
) if self
._value
is not self
.NO_RESULT
else 'NA'
223 return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
224 self
.__class
__, self
._state
, val
, self
._on
_complete
, id(self
), name
, getattr(
225 next
, '_progress_reference', 'NA'), repr(self
._next
_promise
)
228 def pretty_print_1(self
):
231 elif self
._on
_complete
is None:
233 elif hasattr(self
._on
_complete
, '__name__'):
234 name
= getattr(self
._on
_complete
, '__name__')
236 name
= self
._on
_complete
.__class
__.__name
__
237 val
= repr(self
._value
) if self
._value
not in (self
.NO_RESULT
, self
.ASYNC_RESULT
) else '...'
239 self
.INITIALIZED
: ' ',
240 self
.RUNNING
: ' >>>',
241 self
.FINISHED
: '(done)'
243 return '{} {}({}),'.format(prefix
, name
, val
)
245 def then(self
: Any
, on_complete
: Callable
) -> Any
:
247 Call ``on_complete`` as soon as this promise is finalized.
249 assert self
._state
in (self
.INITIALIZED
, self
.RUNNING
)
251 if self
._next
_promise
is not None:
252 return self
._next
_promise
.then(on_complete
)
254 if self
._on
_complete
is not None:
255 self
._set
_next
_promise
(self
.__class
__(
256 _first_promise
=self
._first
_promise
,
257 on_complete
=on_complete
259 return self
._next
_promise
262 self
._on
_complete
= on_complete
263 self
._set
_next
_promise
(self
.__class
__(_first_promise
=self
._first
_promise
))
264 return self
._next
_promise
266 def _set_next_promise(self
, next
: '_Promise') -> None:
267 assert self
is not next
268 assert self
._state
in (self
.INITIALIZED
, self
.RUNNING
)
270 self
._next
_promise
= next
271 assert self
._next
_promise
is not None
272 for p
in iter(self
._next
_promise
):
273 p
._first
_promise
= self
._first
_promise
275 def _finalize(self
, value
=NO_RESULT
):
277 Sets this promise to complete.
279 Orchestrators may choose to use this helper function.
281 :param value: new value.
283 if self
._state
not in (self
.INITIALIZED
, self
.RUNNING
):
284 raise ValueError('finalize: {} already finished. {}'.format(repr(self
), value
))
286 self
._state
= self
.RUNNING
288 if value
is not self
.NO_RESULT
:
290 assert self
._value
is not self
.NO_RESULT
, repr(self
)
292 if self
._on
_complete
:
294 next_result
= self
._on
_complete
(self
._value
)
295 except Exception as e
:
299 next_result
= self
._value
301 if isinstance(next_result
, _Promise
):
302 # hack: _Promise is not a continuation monad.
303 next_result
= next_result
._first
_promise
# type: ignore
304 assert next_result
not in self
, repr(self
._first
_promise
) + repr(next_result
)
305 assert self
not in next_result
306 next_result
._append
_promise
(self
._next
_promise
)
307 self
._set
_next
_promise
(next_result
)
308 assert self
._next
_promise
309 if self
._next
_promise
._value
is self
.NO_RESULT
:
310 self
._next
_promise
._value
= self
._value
311 self
.propagate_to_next()
312 elif next_result
is not self
.ASYNC_RESULT
:
313 # simple map. simply forward
314 if self
._next
_promise
:
315 self
._next
_promise
._value
= next_result
317 # Hack: next_result is of type U, _value is of type T
318 self
._value
= next_result
# type: ignore
319 self
.propagate_to_next()
321 # asynchronous promise
324 def propagate_to_next(self
):
325 self
._state
= self
.FINISHED
326 logger
.debug('finalized {}'.format(repr(self
)))
327 if self
._next
_promise
:
328 self
._next
_promise
._finalize
()
330 def fail(self
, e
: Exception) -> None:
332 Sets the whole completion to be faild with this exception and end the
335 if self
._state
== self
.FINISHED
:
337 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e
)))
338 assert self
._state
in (self
.INITIALIZED
, self
.RUNNING
)
339 logger
.exception('_Promise failed')
341 self
._value
= f
'_exception: {e}'
342 if self
._next
_promise
:
343 self
._next
_promise
.fail(e
)
344 self
._state
= self
.FINISHED
346 def __contains__(self
, item
):
347 return any(item
is p
for p
in iter(self
._first
_promise
))
351 elem
= self
._next
_promise
352 while elem
is not None:
354 elem
= elem
._next
_promise
356 def _append_promise(self
, other
):
357 if other
is not None:
358 assert self
not in other
359 assert other
not in self
360 self
._last
_promise
()._set
_next
_promise
(other
)
362 def _last_promise(self
) -> '_Promise':
363 return list(iter(self
))[-1]
class ProgressReference(object):
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [
                    p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        return self.progress == 1 and self._completion_has_result

    def update(self):
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        self._completion_has_result = True
        self.progress = 1
453 class Completion(_Promise
, Generic
[T
]):
455 Combines multiple promises into one overall operation.
457 Completions are composable by being able to
458 call one completion from another completion. I.e. making them re-usable
459 using Promises E.g.::
462 ... return Orchestrator().get_hosts().then(self._create_osd)
464 where ``get_hosts`` returns a Completion of list of hosts and
465 ``_create_osd`` takes a list of hosts.
467 The concept behind this is to store the computation steps
468 explicit and then explicitly evaluate the chain:
471 ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
473 ... assert p.result = "4"
477 +---------------+ +-----------------+
479 | lambda x: x*x | +--> | lambda x: str(x)|
481 +---------------+ +-----------------+
486 _first_promise
: Optional
["Completion"] = None,
487 value
: Any
= _Promise
.NO_RESULT
,
488 on_complete
: Optional
[Callable
] = None,
489 name
: Optional
[str] = None,
491 super(Completion
, self
).__init
__(_first_promise
, value
, on_complete
, name
)
494 def _progress_reference(self
) -> Optional
[ProgressReference
]:
495 if hasattr(self
._on
_complete
, 'progress_id'):
496 return self
._on
_complete
# type: ignore
500 def progress_reference(self
) -> Optional
[ProgressReference
]:
502 ProgressReference. Marks this completion
503 as a write completeion.
506 references
= [c
._progress
_reference
for c
in iter(
507 self
) if c
._progress
_reference
is not None]
509 assert len(references
) == 1
514 def with_progress(cls
: Any
,
517 _first_promise
: Optional
["Completion"] = None,
518 value
: Any
= _Promise
.NO_RESULT
,
519 on_complete
: Optional
[Callable
] = None,
520 calc_percent
: Optional
[Callable
[[], Any
]] = None
524 _first_promise
=_first_promise
,
526 on_complete
=on_complete
527 ).add_progress(message
, mgr
, calc_percent
)
529 return c
._first
_promise
531 def add_progress(self
,
534 calc_percent
: Optional
[Callable
[[], Any
]] = None
537 on_complete
=ProgressReference(
540 completion
=calc_percent
544 def fail(self
, e
: Exception):
545 super(Completion
, self
).fail(e
)
546 if self
._progress
_reference
:
547 self
._progress
_reference
.fail()
549 def finalize(self
, result
: Union
[None, object, T
] = _Promise
.NO_RESULT
):
550 if self
._first
_promise
._state
== self
.INITIALIZED
:
551 self
._first
_promise
._finalize
(result
)
554 def result(self
) -> T
:
556 The result of the operation that we were waited
557 for. Only valid after calling Orchestrator.process() on this
560 last
= self
._last
_promise
()
561 assert last
._state
== _Promise
.FINISHED
562 return cast(T
, last
._value
)
564 def result_str(self
) -> str:
565 """Force a string."""
566 if self
.result
is None:
568 if isinstance(self
.result
, list):
569 return '\n'.join(str(x
) for x
in self
.result
)
570 return str(self
.result
)
573 def exception(self
) -> Optional
[Exception]:
574 return self
._last
_promise
()._exception
577 def serialized_exception(self
) -> Optional
[bytes
]:
578 return self
._last
_promise
()._serialized
_exception
581 def has_result(self
) -> bool:
583 Has the operation already a result?
585 For Write operations, it can already have a
586 result, if the orchestrator's configuration is
587 persistently written. Typically this would
588 indicate that an update had been written to
589 a manifest, but that the update had not
590 necessarily been pushed out to the cluster.
594 return self
._last
_promise
()._state
== _Promise
.FINISHED
597 def is_errored(self
) -> bool:
599 Has the completion failed. Default implementation looks for
600 self.exception. Can be overwritten.
602 return self
.exception
is not None
605 def needs_result(self
) -> bool:
607 Could the external operation be deemed as complete,
609 We must wait for a read operation only if it is not complete.
611 return not self
.is_errored
and not self
.has_result
614 def is_finished(self
) -> bool:
616 Could the external operation be deemed as complete,
618 We must wait for a read operation only if it is not complete.
620 return self
.is_errored
or (self
.has_result
)
622 def pretty_print(self
):
624 reprs
= '\n'.join(p
.pretty_print_1() for p
in iter(self
._first
_promise
))
625 return """<{}>[\n{}\n]""".format(self
.__class
__.__name
__, reprs
)
def pretty_print(completions: Sequence[Completion]) -> str:
    """Render several completions as one comma-separated debug string."""
    return ', '.join(c.pretty_print() for c in completions)
def raise_if_exception(c: Completion) -> None:
    """
    Re-raise the exception stored in a finished Completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            # The pickled exception could not be reconstructed; degrade to
            # a generic Exception carrying the original type and message.
            raise Exception('{}: {}'.format(type(c.exception), c.exception))
        raise e
class TrivialReadCompletion(Completion[T]):
    """
    This is the trivial completion simply wrapping a result.
    """
    def __init__(self, result: T):
        super(TrivialReadCompletion, self).__init__()
        if result:
            self.finalize(result)
656 def _hide_in_features(f
):
657 f
._hide
_in
_features
= True
661 class Orchestrator(object):
663 Calls in this class may do long running remote operations, with time
664 periods ranging from network latencies to package install latencies and large
665 internet downloads. For that reason, all are asynchronous, and return
666 ``Completion`` objects.
668 Methods should only return the completion and not directly execute
669 anything, like network calls. Otherwise the purpose of
670 those completions is defeated.
672 Implementations are not required to start work on an operation until
673 the caller waits on the relevant Completion objects. Callers making
674 multiple updates should not wait on Completions until they're done
675 sending operations: this enables implementations to batch up a series
676 of updates when wait() is called on a set of Completion objects.
678 Implementations are encouraged to keep reasonably fresh caches of
679 the status of the system: it is better to serve a stale-but-recent
680 result read of e.g. device inventory than it is to keep the caller waiting
681 while you scan hosts every time.
685 def is_orchestrator_module(self
):
687 Enable other modules to interrogate this module to discover
688 whether it's usable as an orchestrator module.
690 Subclasses do not need to override this.
695 def available(self
) -> Tuple
[bool, str]:
697 Report whether we can talk to the orchestrator. This is the
698 place to give the user a meaningful message if the orchestrator
699 isn't running or can't be contacted.
701 This method may be called frequently (e.g. every page load
702 to conditionally display a warning banner), so make sure it's
703 not too expensive. It's okay to give a slightly stale status
704 (e.g. based on a periodic background ping of the orchestrator)
705 if that's necessary to make this method fast.
708 `True` doesn't mean that the desired functionality
709 is actually available in the orchestrator. I.e. this
710 won't work as expected::
713 ... if OrchestratorClientMixin().available()[0]: # wrong.
714 ... OrchestratorClientMixin().get_hosts()
716 :return: two-tuple of boolean, string
718 raise NotImplementedError()
721 def process(self
, completions
: List
[Completion
]) -> None:
723 Given a list of Completion instances, process any which are
726 Callers should inspect the detail of each completion to identify
727 partial completion/progress information, and present that information
730 This method should not block, as this would make it slow to query
731 a status, while other long running operations are in progress.
733 raise NotImplementedError()
736 def get_feature_set(self
):
737 """Describes which methods this orchestrator implements
740 `True` doesn't mean that the desired functionality
741 is actually possible in the orchestrator. I.e. this
742 won't work as expected::
745 ... api = OrchestratorClientMixin()
746 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
749 It's better to ask for forgiveness instead::
753 ... OrchestratorClientMixin().get_hosts()
754 ... except (OrchestratorError, NotImplementedError):
757 :returns: Dict of API method names to ``{'available': True or False}``
759 module
= self
.__class
__
760 features
= {a
: {'available': getattr(Orchestrator
, a
, None) != getattr(module
, a
)}
761 for a
in Orchestrator
.__dict
__
762 if not a
.startswith('_') and not getattr(getattr(Orchestrator
, a
), '_hide_in_features', False)
766 def cancel_completions(self
) -> None:
768 Cancels ongoing completions. Unstuck the mgr.
770 raise NotImplementedError()
772 def pause(self
) -> None:
773 raise NotImplementedError()
775 def resume(self
) -> None:
776 raise NotImplementedError()
778 def add_host(self
, host_spec
: HostSpec
) -> Completion
[str]:
780 Add a host to the orchestrator inventory.
782 :param host: hostname
784 raise NotImplementedError()
786 def remove_host(self
, host
: str) -> Completion
[str]:
788 Remove a host from the orchestrator inventory.
790 :param host: hostname
792 raise NotImplementedError()
794 def update_host_addr(self
, host
: str, addr
: str) -> Completion
[str]:
796 Update a host's address
798 :param host: hostname
799 :param addr: address (dns name or IP)
801 raise NotImplementedError()
803 def get_hosts(self
) -> Completion
[List
[HostSpec
]]:
805 Report the hosts in the cluster.
807 :return: list of HostSpec
809 raise NotImplementedError()
811 def add_host_label(self
, host
: str, label
: str) -> Completion
[str]:
815 raise NotImplementedError()
817 def remove_host_label(self
, host
: str, label
: str) -> Completion
[str]:
821 raise NotImplementedError()
823 def host_ok_to_stop(self
, hostname
: str) -> Completion
:
825 Check if the specified host can be safely stopped without reducing availability
827 :param host: hostname
829 raise NotImplementedError()
831 def get_inventory(self
, host_filter
: Optional
['InventoryFilter'] = None, refresh
: bool = False) -> Completion
[List
['InventoryHost']]:
833 Returns something that was created by `ceph-volume inventory`.
835 :return: list of InventoryHost
837 raise NotImplementedError()
839 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None, refresh
: bool = False) -> Completion
[List
['ServiceDescription']]:
841 Describe a service (of any kind) that is already configured in
842 the orchestrator. For example, when viewing an OSD in the dashboard
843 we might like to also display information about the orchestrator's
844 view of the service (like the kubernetes pod ID).
846 When viewing a CephFS filesystem in the dashboard, we would use this
847 to display the pods being currently run for MDS daemons.
849 :return: list of ServiceDescription objects.
851 raise NotImplementedError()
853 def list_daemons(self
, service_name
: Optional
[str] = None, daemon_type
: Optional
[str] = None, daemon_id
: Optional
[str] = None, host
: Optional
[str] = None, refresh
: bool = False) -> Completion
[List
['DaemonDescription']]:
855 Describe a daemon (of any kind) that is already configured in
858 :return: list of DaemonDescription objects.
860 raise NotImplementedError()
862 def apply(self
, specs
: List
["GenericSpec"]) -> Completion
[List
[str]]:
866 fns
: Dict
[str, function
] = {
867 'alertmanager': self
.apply_alertmanager
,
868 'crash': self
.apply_crash
,
869 'grafana': self
.apply_grafana
,
870 'iscsi': self
.apply_iscsi
,
871 'mds': self
.apply_mds
,
872 'mgr': self
.apply_mgr
,
873 'mon': self
.apply_mon
,
874 'nfs': self
.apply_nfs
,
875 'node-exporter': self
.apply_node_exporter
,
876 'osd': lambda dg
: self
.apply_drivegroups([dg
]),
877 'prometheus': self
.apply_prometheus
,
878 'rbd-mirror': self
.apply_rbd_mirror
,
879 'rgw': self
.apply_rgw
,
880 'host': self
.add_host
,
884 if isinstance(ls
, list):
890 fn
= cast(Callable
[["GenericSpec"], Completion
], fns
[spec
.service_type
])
891 completion
= fn(spec
)
894 fn
= cast(Callable
[["GenericSpec"], Completion
], fns
[spec
.service_type
])
895 return fn(s
).then(lambda r
: merge(ls
, r
))
896 completion
= completion
.then(next
)
899 def plan(self
, spec
: List
["GenericSpec"]) -> Completion
[List
]:
901 Plan (Dry-run, Preview) a List of Specs.
903 raise NotImplementedError()
905 def remove_daemons(self
, names
: List
[str]) -> Completion
[List
[str]]:
907 Remove specific daemon(s).
911 raise NotImplementedError()
913 def remove_service(self
, service_name
: str) -> Completion
[str]:
915 Remove a service (a collection of daemons).
919 raise NotImplementedError()
921 def service_action(self
, action
: str, service_name
: str) -> Completion
[List
[str]]:
923 Perform an action (start/stop/reload) on a service (i.e., all daemons
924 providing the logical service).
926 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
927 :param service_name: service_type + '.' + service_id
928 (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
931 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
932 raise NotImplementedError()
934 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str] = None) -> Completion
[str]:
936 Perform an action (start/stop/reload) on a daemon.
938 :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
939 :param daemon_name: name of daemon
940 :param image: Container image when redeploying that daemon
943 # assert action in ["start", "stop", "reload, "restart", "redeploy"]
944 raise NotImplementedError()
946 def create_osds(self
, drive_group
: DriveGroupSpec
) -> Completion
[str]:
948 Create one or more OSDs within a single Drive Group.
950 The principal argument here is the drive_group member
951 of OsdSpec: other fields are advisory/extensible for any
952 finer-grained OSD feature enablement (choice of backing store,
953 compression/encryption, etc).
955 raise NotImplementedError()
957 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]) -> Completion
[List
[str]]:
958 """ Update OSD cluster """
959 raise NotImplementedError()
961 def set_unmanaged_flag(self
,
962 unmanaged_flag
: bool,
963 service_type
: str = 'osd',
965 ) -> HandleCommandResult
:
966 raise NotImplementedError()
968 def preview_osdspecs(self
,
969 osdspec_name
: Optional
[str] = 'osd',
970 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
971 ) -> Completion
[str]:
972 """ Get a preview for OSD deployments """
973 raise NotImplementedError()
975 def remove_osds(self
, osd_ids
: List
[str],
976 replace
: bool = False,
977 force
: bool = False) -> Completion
[str]:
979 :param osd_ids: list of OSD IDs
980 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
981 :param force: Forces the OSD removal process without waiting for the data to be drained first.
982 Note that this can only remove OSDs that were successfully
983 created (i.e. got an OSD ID).
985 raise NotImplementedError()
987 def stop_remove_osds(self
, osd_ids
: List
[str]) -> Completion
:
991 raise NotImplementedError()
993 def remove_osds_status(self
) -> Completion
:
995 Returns a status of the ongoing OSD removal operations.
997 raise NotImplementedError()
999 def blink_device_light(self
, ident_fault
: str, on
: bool, locations
: List
['DeviceLightLoc']) -> Completion
[List
[str]]:
1001 Instructs the orchestrator to enable or disable either the ident or the fault LED.
1003 :param ident_fault: either ``"ident"`` or ``"fault"``
1004 :param on: ``True`` = on.
1005 :param locations: See :class:`orchestrator.DeviceLightLoc`
1007 raise NotImplementedError()
1009 def zap_device(self
, host
: str, path
: str) -> Completion
[str]:
1010 """Zap/Erase a device (DESTROYS DATA)"""
1011 raise NotImplementedError()
1013 def add_mon(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1014 """Create mon daemon(s)"""
1015 raise NotImplementedError()
1017 def apply_mon(self
, spec
: ServiceSpec
) -> Completion
[str]:
1018 """Update mon cluster"""
1019 raise NotImplementedError()
1021 def add_mgr(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1022 """Create mgr daemon(s)"""
1023 raise NotImplementedError()
1025 def apply_mgr(self
, spec
: ServiceSpec
) -> Completion
[str]:
1026 """Update mgr cluster"""
1027 raise NotImplementedError()
1029 def add_mds(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1030 """Create MDS daemon(s)"""
1031 raise NotImplementedError()
1033 def apply_mds(self
, spec
: ServiceSpec
) -> Completion
[str]:
1034 """Update MDS cluster"""
1035 raise NotImplementedError()
1037 def add_rgw(self
, spec
: RGWSpec
) -> Completion
[List
[str]]:
1038 """Create RGW daemon(s)"""
1039 raise NotImplementedError()
1041 def apply_rgw(self
, spec
: RGWSpec
) -> Completion
[str]:
1042 """Update RGW cluster"""
1043 raise NotImplementedError()
1045 def add_rbd_mirror(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1046 """Create rbd-mirror daemon(s)"""
1047 raise NotImplementedError()
1049 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> Completion
[str]:
1050 """Update rbd-mirror cluster"""
1051 raise NotImplementedError()
1053 def add_nfs(self
, spec
: NFSServiceSpec
) -> Completion
[List
[str]]:
1054 """Create NFS daemon(s)"""
1055 raise NotImplementedError()
1057 def apply_nfs(self
, spec
: NFSServiceSpec
) -> Completion
[str]:
1058 """Update NFS cluster"""
1059 raise NotImplementedError()
1061 def add_iscsi(self
, spec
: IscsiServiceSpec
) -> Completion
[List
[str]]:
1062 """Create iscsi daemon(s)"""
1063 raise NotImplementedError()
1065 def apply_iscsi(self
, spec
: IscsiServiceSpec
) -> Completion
[str]:
1066 """Update iscsi cluster"""
1067 raise NotImplementedError()
1069 def add_prometheus(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1070 """Create new prometheus daemon"""
1071 raise NotImplementedError()
1073 def apply_prometheus(self
, spec
: ServiceSpec
) -> Completion
[str]:
1074 """Update prometheus cluster"""
1075 raise NotImplementedError()
1077 def add_node_exporter(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1078 """Create a new Node-Exporter service"""
1079 raise NotImplementedError()
1081 def apply_node_exporter(self
, spec
: ServiceSpec
) -> Completion
[str]:
1082 """Update existing a Node-Exporter daemon(s)"""
1083 raise NotImplementedError()
1085 def add_crash(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1086 """Create a new crash service"""
1087 raise NotImplementedError()
1089 def apply_crash(self
, spec
: ServiceSpec
) -> Completion
[str]:
1090 """Update existing a crash daemon(s)"""
1091 raise NotImplementedError()
1093 def add_grafana(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1094 """Create a new Node-Exporter service"""
1095 raise NotImplementedError()
1097 def apply_grafana(self
, spec
: ServiceSpec
) -> Completion
[str]:
1098 """Update existing a Node-Exporter daemon(s)"""
1099 raise NotImplementedError()
1101 def add_alertmanager(self
, spec
: ServiceSpec
) -> Completion
[List
[str]]:
1102 """Create a new AlertManager service"""
1103 raise NotImplementedError()
1105 def apply_alertmanager(self
, spec
: ServiceSpec
) -> Completion
[str]:
1106 """Update an existing AlertManager daemon(s)"""
1107 raise NotImplementedError()
1109 def upgrade_check(self
, image
: Optional
[str], version
: Optional
[str]) -> Completion
[str]:
1110 raise NotImplementedError()
1112 def upgrade_start(self
, image
: Optional
[str], version
: Optional
[str]) -> Completion
[str]:
1113 raise NotImplementedError()
1115 def upgrade_pause(self
) -> Completion
[str]:
1116 raise NotImplementedError()
1118 def upgrade_resume(self
) -> Completion
[str]:
1119 raise NotImplementedError()
1121 def upgrade_stop(self
) -> Completion
[str]:
1122 raise NotImplementedError()
1124 def upgrade_status(self
) -> Completion
['UpgradeStatusSpec']:
1126 If an upgrade is currently underway, report on where
1127 we are in the process, or if some error has occurred.
1129 :return: UpgradeStatusSpec instance
1131 raise NotImplementedError()
1134 def upgrade_available(self
) -> Completion
:
1136 Report on what versions are available to upgrade to
1138 :return: List of strings
1140 raise NotImplementedError()
# A spec in the generic sense: either a host description or a service
# description.
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into the matching spec type.

    A dict whose ``service_type`` is ``'host'`` is a host description;
    everything else is parsed as a service description.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self):
        self.in_progress = False  # Is an upgrade underway?
        self.target_image = None  # image being upgraded to, when in progress
        self.services_complete = []  # Which daemon types are fully updated?
        self.message = ""  # Freeform description
def handle_type_error(method):
    """Decorator: convert a ``TypeError`` raised by ``method`` into an
    ``OrchestratorValidationError``.

    The wrapped callable's first argument is expected to be a class
    (``cls``), whose name is prefixed to the error message — used on
    ``from_json``-style alternate constructors so malformed input
    surfaces as a validation error.
    """
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            # Chain the original TypeError so the root cause stays
            # visible in tracebacks (was previously dropped).
            raise OrchestratorValidationError(error_msg) from e
    return inner
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_deployed=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False):

        # Host is at the same granularity as InventoryHost
        self.hostname: str = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: str = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List['OrchestratorEvent'] = events or []

        self.is_active = is_active

    def name(self) -> str:
        """The daemon's full name, e.g. ``mds.a``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """True if this daemon belongs to the named service."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """Derive the service id this daemon belongs to.

        OSDs use their OSDSpec affinity; other daemon types that
        require a service id reverse-engineer it from ``daemon_id``
        and ``hostname``.
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        def _match():
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self):
        """The service name, e.g. ``mds.myfs`` — or just the daemon type
        for types that carry no service id."""
        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self) -> dict:
        """Serialize to a plain dict; ``None`` values are dropped and
        datetimes are rendered as strings."""
        out = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['version'] = self.version
        out['status'] = self.status
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of :meth:`to_json`."""
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = str_to_datetime(c[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(events=events, **c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        # Dump as a plain mapping (the to_json() dict).
        return dumper.represent_dict(data.to_json().items())
# Render DaemonDescription objects as plain mappings in YAML output.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image friendly name).
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        self.spec: ServiceSpec = spec

        self.events: List['OrchestratorEvent'] = events or []

    def service_type(self):
        """Shortcut for the underlying spec's service type."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """Serialize: the spec's JSON plus a ``status`` sub-dict;
        ``None`` status values are dropped, datetimes stringified."""
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """Inverse of :meth:`to_json`."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        # Fixed: annotation previously said 'DaemonDescription' (copy-paste).
        return dumper.represent_dict(data.to_json().items())
# Render ServiceDescription objects as plain mappings in YAML output.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.
      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []

        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a plain dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Inverse of :meth:`to_json`.

        :raises OrchestratorValidationError: on missing/unknown keys or
            malformed input.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Reject unexpected keys rather than silently dropping them.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        # hosts: iterable of (hostname, object-with-.data) pairs —
        # presumably as returned by a backend inventory call; confirm caller.
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): equality ignores addr and labels — confirm intended.
        return self.name == other.name and self.devices == other.devices
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``
    """
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """

    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name, or something like that
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: stringified event, as produced by :meth:`to_json`
        :return: the parsed event
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other):
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): 'level' is not part of equality — confirm intended.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
def _mk_orch_methods(cls):
    """Attach a remote-forwarding stub to ``cls`` for every public
    ``Orchestrator`` method; returns ``cls`` (usable as a class decorator).
    """
    # Needs to be defined outside of for.
    # Otherwise meth is always bound to last key
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls
1640 class OrchestratorClientMixin(Orchestrator
):
1642 A module that inherents from `OrchestratorClientMixin` can directly call
1643 all :class:`Orchestrator` methods without manually calling remote.
1645 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1646 calls :func:`OrchestratorClientMixin._oremote`
1648 >>> class MyModule(OrchestratorClientMixin):
1650 ... completion = self.add_host('somehost') # calls `_oremote()`
1651 ... self._orchestrator_wait([completion])
1652 ... self.log.debug(completion.result)
1654 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1655 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1656 "real" implementation of the orchestrator.
1659 >>> import mgr_module
1661 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1662 ... def __init__(self, ...):
1663 ... self.orch_client = OrchestratorClientMixin()
1664 ... self.orch_client.set_mgr(self.mgr))
1667 def set_mgr(self
, mgr
: MgrModule
) -> None:
1669 Useable in the Dashbord that uses a global ``mgr``
1672 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
1674 def __get_mgr(self
):
1677 except AttributeError:
1680 def _oremote(self
, meth
, args
, kwargs
):
1682 Helper for invoking `remote` on whichever orchestrator is enabled
1684 :raises RuntimeError: If the remote method failed.
1685 :raises OrchestratorError: orchestrator failed to perform
1686 :raises ImportError: no `orchestrator` module or backend not found.
1688 mgr
= self
.__get
_mgr
()
1691 o
= mgr
._select
_orchestrator
()
1692 except AttributeError:
1693 o
= mgr
.remote('orchestrator', '_select_orchestrator')
1696 raise NoOrchestrator()
1698 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
1700 return mgr
.remote(o
, meth
, *args
, **kwargs
)
1701 except Exception as e
:
1702 if meth
== 'get_feature_set':
1703 raise # self.get_feature_set() calls self._oremote()
1704 f_set
= self
.get_feature_set()
1705 if meth
not in f_set
or not f_set
[meth
]['available']:
1706 raise NotImplementedError(f
'{o} does not implement {meth}') from e
1709 def _orchestrator_wait(self
, completions
: List
[Completion
]) -> None:
1711 Wait for completions to complete (reads) or
1712 become persistent (writes).
1714 Waits for writes to be *persistent* but not *effective*.
1716 :param completions: List of Completions
1717 :raises NoOrchestrator:
1718 :raises RuntimeError: something went wrong while calling the process method.
1719 :raises ImportError: no `orchestrator` module or backend not found.
1721 while any(not c
.has_result
for c
in completions
):
1722 self
.process(completions
)
1723 self
.__get
_mgr
().log
.info("Operations pending: %s",
1724 sum(1 for c
in completions
if not c
.has_result
))
1725 if any(c
.needs_result
for c
in completions
):