]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7 import logging
8 import pickle
9 import time
10 from collections import namedtuple
11 from functools import wraps
12 import uuid
13 import datetime
14 import copy
15 import errno
16
17 from ceph.deployment import inventory
18 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
19 ServiceSpecValidationError
20 from ceph.deployment.drive_group import DriveGroupSpec
21
22 from mgr_module import MgrModule, CLICommand, HandleCommandResult
23
24 try:
25 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
26 Type, Sequence, Dict, cast
27 except ImportError:
28 pass
29
logger = logging.getLogger(__name__)

# Timestamp format used when (de)serializing datetimes across the orchestrator API.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'

34
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Base class for the orchestrator exceptions defined in this module.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """
43
44
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super(NoOrchestrator, self).__init__(msg)
51
52
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
57
58
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap a CLI handler so that expected failures become clean CLI errors.

    :param prefix: CLI command prefix (e.g. ``"orch host add"``)
    :param cmd_args: argument description string for the CLI command
    :param desc: human readable description of the command
    :param perm: permission string, ``'r'`` or ``'rw'``
    :param func: the actual handler
    :return: a wrapped callable carrying ``_prefix`` and ``_cli_command``
        attributes, which ``CLICommandMeta`` uses to build the dispatch table.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # Create a distinct callable per decorated function, so that the
    # `_prefix`/`_cli_command` attributes attached below do not leak onto a
    # shared object. A named def (instead of the previous lambda, which the
    # old comment mislabeled as `partial`) also yields readable tracebacks.
    def wrapper_copy(*l_args, **l_kwargs):
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
78
79
80 def _cli_command(perm):
81 def inner_cli_command(prefix, cmd_args="", desc=""):
82 return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
83 return inner_cli_command
84
85
# Decorator factories for read-only ('r') and read-write ('rw') CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
88
89
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For each class created with this metaclass, every attribute produced by
    ``handle_exception`` (i.e. carrying ``_prefix``/``_cli_command``) is
    collected into a per-class dispatch table; ``COMMANDS`` and
    ``handle_command`` are then generated from that table.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch = {}  # type: Dict[str, CLICommand]
        # Collect all decorated CLI handlers defined on this class.
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Plain attribute without CLI metadata: skip.
                pass

        def handle_command(self, inbuf, cmd):
            # NOTE(review): for an unknown prefix this calls
            # `self.handle_command`, which resolves to this very function for
            # classes built by this metaclass — looks like it would recurse
            # unless a subclass overrides `handle_command`; confirm intent.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
114
115
116 def _no_result():
117 return object()
118
119
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    # Life cycle: INITIALIZED -> RUNNING -> FINISHED
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Sentinel: no value has been produced yet. (Typed as None to appease
    # mypy for the `value=NO_RESULT` default below.)
    NO_RESULT = _no_result()  # type: None
    # Sentinel an on_complete callback may return to signal that the result
    # will be delivered later by another call to `_finalize()`.
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise=None,  # type: Optional["_Promise"]
                 value=NO_RESULT,  # type: Optional[Any]
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise = None  # type: Optional[_Promise]

        self._state = self.INITIALIZED
        self._exception = None  # type: Optional[Exception]

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise = _first_promise or self  # type: '_Promise'

    @property
    def _exception(self):
        # type: () -> Optional[Exception]
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        try:
            # Keep a pickled copy so the exception can be transported
            # (see `_serialized_exception` / `raise_if_exception`).
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self):
        # type: () -> Optional[bytes]
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self):
        # type: () -> Optional[Callable]
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val):
        # type: (Optional[Callable]) -> None
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # Bug fix: `pr=` previously read `getattr(next, ...)` — the `next`
        # builtin — so the progress reference always rendered as 'NA'.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name,
            getattr(self, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        """Render this single promise as one line for ``pretty_print()``."""
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: ' ',
            self.RUNNING: ' >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self, on_complete):
        # type: (Any, Callable) -> Any
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        if self._next_promise is not None:
            # Already chained: delegate to the end of the chain.
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: append a fresh step.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            # This step is still vacant: take the callback and append an
            # empty successor so the chain stays extendable.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next):
        # type: (_Promise) -> None
        """Link *next* after this promise and fix its `_first_promise` links."""
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: a later `_finalize()` continues the chain.
            pass

    def propagate_to_next(self):
        """Mark this promise FINISHED and finalize the successor, if any."""
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e):
        # type: (Exception) -> None
        """
        Sets the whole completion to be failed with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        """True iff *item* is (identity-wise) one of the promises of this chain."""
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        """Iterate from this promise forward to the end of the chain."""
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        """Append the chain *other* after the last promise of this chain."""
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self):
        # type: () -> _Promise
        return list(iter(self))[-1]
344
345
class ProgressReference(object):
    def __init__(self,
                 message,  # type: str
                 mgr,
                 completion=None  # type: Optional[Callable[[], Completion]]
                 ):
        """
        ProgressReference can be used within Completions::

          +---------------+      +---------------------------------+
          |               | then |                                 |
          | My Completion | +--> | on_complete=ProgressReference() |
          |               |      |                                 |
          +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        :param message: text used for the progress event
        :param mgr: manager module; must provide ``remote()`` and
            ``all_progress_references`` (used below)
        :param completion: optional factory returning the underlying
            Completion; used by :meth:`update` to recompute progress
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion = completion  # type: Optional[Callable[[], Completion]]

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Used as an on_complete callback: records that the completion
        # produced a result and passes the value through unchanged.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                # Fully done: close the progress event and deregister.
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        # Effective only when fully progressed AND the completion produced
        # a result (see __call__).
        return self.progress == 1 and self._completion_has_result

    def update(self):
        """Recompute progress via the ``completion`` factory, if present."""
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        """Mark this progress item finished (used when the completion fails)."""
        self._completion_has_result = True
        self.progress = 1
430
431
class Completion(_Promise):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*2 | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """
    def __init__(self,
                 _first_promise=None,  # type: Optional["Completion"]
                 value=_Promise.NO_RESULT,  # type: Any
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self):
        # type: () -> Optional[ProgressReference]
        # Duck-typed: ProgressReference instances carry `progress_id`.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self):
        # type: () -> Optional[ProgressReference]
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls,  # type: Any
                      message,  # type: str
                      mgr,
                      _first_promise=None,  # type: Optional["Completion"]
                      value=_Promise.NO_RESULT,  # type: Any
                      on_complete=None,  # type: Optional[Callable]
                      calc_percent=None  # type: Optional[Callable[[], Any]]
                      ):
        # type: (...) -> Any
        """Create a Completion with a ProgressReference step already chained."""
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], Any]]
                     ):
        """Chain a :class:`ProgressReference` step onto this completion."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e):
        """Fail the chain; also mark any attached progress item as failed."""
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result=_Promise.NO_RESULT):
        """Start evaluation by finalizing the first promise of the chain."""
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self):
        """
        The result of the operation that we were waiting
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return last._value

    def result_str(self):
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        return self._last_promise()._exception

    @property
    def serialized_exception(self):
        # type: () -> Optional[bytes]
        return self._last_promise()._serialized_exception

    @property
    def has_result(self):
        # type: () -> bool
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        """Render the whole chain, one promise per line."""
        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
610
611
def pretty_print(completions):
    # type: (Sequence[Completion]) -> str
    """Render several completions as one comma-separated string."""
    rendered = [c.pretty_print() for c in completions]
    return ', '.join(rendered)
615
616
def raise_if_exception(c):
    # type: (Completion) -> None
    """
    Re-raise the exception carried by a completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    serialized = c.serialized_exception
    if serialized is None:
        return
    try:
        e = pickle.loads(serialized)
    except (KeyError, AttributeError):
        # The exception type could not be reconstructed here; fall back to
        # a plain Exception carrying the original type and message.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise e
629
630
class TrivialReadCompletion(Completion):
    """
    This is the trivial completion simply wrapping a result.

    NOTE(review): a falsy ``result`` (``None``, ``''``, ``[]``, ``0``) leaves
    the completion unfinalized — presumably deliberate, since finalizing
    without a value would be invalid; confirm with callers.
    """
    def __init__(self, result):
        super(TrivialReadCompletion, self).__init__()
        if result:
            self.finalize(result)
639
640
641 def _hide_in_features(f):
642 f._hide_in_features = True
643 return f
644
645
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.

    Methods here default to ``raise NotImplementedError()``; concrete
    backends override the subset they support (see ``get_feature_set``).
    """
668
    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: always ``True``
        """
        return True
678
    @_hide_in_features
    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        # Must be provided by the concrete orchestrator backend.
        raise NotImplementedError()
704
    @_hide_in_features
    def process(self, completions):
        # type: (List[Completion]) -> None
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.

        :param completions: completions to drive forward
        """
        raise NotImplementedError()
720
    @_hide_in_features
    def get_feature_set(self):
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the concrete class overrides
        # the base implementation: the attribute looked up on the subclass
        # then differs from the one defined on Orchestrator itself.
        # Attributes marked with @_hide_in_features are excluded.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features
749
    def cancel_completions(self):
        # type: () -> None
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self):
        # type: () -> None
        """Pause background orchestrator activity (backend-defined)."""
        raise NotImplementedError()

    def resume(self):
        # type: () -> None
        """Resume background orchestrator activity after :meth:`pause`."""
        raise NotImplementedError()
764
    def add_host(self, host_spec):
        # type: (HostSpec) -> Completion
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host specification (``HostSpec``)
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> Completion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host, addr):
        # type: (str, str) -> Completion
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()
792
    def get_hosts(self):
        # type: () -> Completion
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Add a host label

        :param host: hostname
        :param label: label name
        """
        raise NotImplementedError()

    def remove_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Remove a host label

        :param host: hostname
        :param label: label name
        """
        raise NotImplementedError()
815
    def get_inventory(self, host_filter=None, refresh=False):
        # type: (Optional[InventoryFilter], bool) -> Completion
        """
        Returns something that was created by `ceph-volume inventory`.

        :param host_filter: optional filter restricting which hosts to query
        :param refresh: bypass any cached data
        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_name=None, refresh=False):
        # type: (Optional[str], Optional[str], bool) -> Completion
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, daemon_type=None, daemon_id=None, host=None, refresh=False):
        # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()
849
850 def apply(self, specs: List[ServiceSpec]) -> Completion:
851 """
852 Applies any spec
853 """
854 fns: Dict[str, Callable[[ServiceSpec], Completion]] = {
855 'alertmanager': self.apply_alertmanager,
856 'crash': self.apply_crash,
857 'grafana': self.apply_grafana,
858 'mds': self.apply_mds,
859 'mgr': self.apply_mgr,
860 'mon': self.apply_mon,
861 'nfs': cast(Callable[[ServiceSpec], Completion], self.apply_nfs),
862 'node-exporter': self.apply_node_exporter,
863 'osd': cast(Callable[[ServiceSpec], Completion], lambda dg: self.apply_drivegroups([dg])),
864 'prometheus': self.apply_prometheus,
865 'rbd-mirror': self.apply_rbd_mirror,
866 'rgw': cast(Callable[[ServiceSpec], Completion], self.apply_rgw),
867 }
868
869 def merge(ls, r):
870 if isinstance(ls, list):
871 return ls + [r]
872 return [ls, r]
873
874 spec, *specs = specs
875
876 completion = fns[spec.service_type](spec)
877 for s in specs:
878 def next(ls):
879 return fns[s.service_type](s).then(lambda r: merge(ls, r))
880 completion = completion.then(next)
881 return completion
882
    def remove_daemons(self, names):
        # type: (List[str]) -> Completion
        """
        Remove specific daemon(s).

        :param names: daemon names to remove
        :return: None
        """
        raise NotImplementedError()

    def list_specs(self, service_name=None):
        # type: (Optional[str]) -> Completion
        """
        Lists saved service specs

        :param service_name: optionally restrict to one service
        """
        raise NotImplementedError()

    def remove_service(self, service_name):
        # type: (str) -> Completion
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()
907
    def service_action(self, action, service_name):
        # type: (str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action, daemon_type, daemon_id):
        # type: (str, str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_type: daemon type (e.g. "mds", "rgw", ...)
        :param daemon_id: daemon identifier
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()
933
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> Completion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
        """ Update OSD cluster """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def remove_osds_status(self):
        # type: () -> Completion
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()
968
    def blink_device_light(self, ident_fault, on, locations):
        # type: (str, bool, List[DeviceLightLoc]) -> Completion
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host, path):
        # type: (str, str) -> Completion
        """Zap/Erase a device (DESTROYS DATA)

        :param host: hostname holding the device
        :param path: device path on that host
        """
        raise NotImplementedError()
984
    # --- per-service add_*/apply_* capability stubs; implemented by
    # --- concrete backends and dispatched to by Orchestrator.apply() ---

    def add_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mon daemon(s)"""
        raise NotImplementedError()

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mon cluster"""
        raise NotImplementedError()

    def add_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mgr daemon(s)"""
        raise NotImplementedError()

    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mgr cluster"""
        raise NotImplementedError()

    def add_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create MDS daemon(s)"""
        raise NotImplementedError()

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update MDS cluster"""
        raise NotImplementedError()

    def add_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create rbd-mirror daemon(s)"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Create NFS daemon(s)"""
        raise NotImplementedError()

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Update NFS cluster"""
        raise NotImplementedError()

    def add_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Create RGW daemon(s)"""
        raise NotImplementedError()

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Update RGW cluster"""
        raise NotImplementedError()
1044
1045 def add_prometheus(self, spec):
1046 # type: (ServiceSpec) -> Completion
1047 """Create new prometheus daemon"""
1048 raise NotImplementedError()
1049
    def apply_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update the prometheus cluster to match *spec*."""
        raise NotImplementedError()
1054
    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new node-exporter service as described by *spec*."""
        raise NotImplementedError()
1059
    def apply_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing node-exporter daemon(s) to match *spec*."""
        raise NotImplementedError()
1064
    def add_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new crash service as described by *spec*."""
        raise NotImplementedError()
1069
    def apply_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing crash daemon(s) to match *spec*."""
        raise NotImplementedError()
1074
    def add_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Grafana service as described by *spec*."""
        raise NotImplementedError()
1079
    def apply_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing Grafana daemon(s) to match *spec*."""
        raise NotImplementedError()
1084
    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new AlertManager service as described by *spec*."""
        raise NotImplementedError()
1089
    def apply_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing AlertManager daemon(s) to match *spec*."""
        raise NotImplementedError()
1094
    def upgrade_check(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        """Check an upgrade to the given *image* or *version* without
        starting it (the report format is backend-defined).
        """
        raise NotImplementedError()
1098
    def upgrade_start(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        """Begin upgrading the cluster to the given *image* or *version*."""
        raise NotImplementedError()
1102
    def upgrade_pause(self):
        # type: () -> Completion
        """Pause an upgrade that is currently in progress."""
        raise NotImplementedError()
1106
    def upgrade_resume(self):
        # type: () -> Completion
        """Resume a previously paused upgrade."""
        raise NotImplementedError()
1110
    def upgrade_stop(self):
        # type: () -> Completion
        """Stop an in-progress upgrade."""
        raise NotImplementedError()
1114
    def upgrade_status(self):
        # type: () -> Completion
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance, resolved via the returned
            Completion
        """
        raise NotImplementedError()
1124
    @_hide_in_features
    def upgrade_available(self):
        # type: () -> Completion
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        # NOTE(review): excluded from the feature listing via
        # _hide_in_features -- presumably not implemented by any backend
        # yet; confirm before relying on it.
        raise NotImplementedError()
1134
1135
class HostSpec(object):
    """
    Information about hosts. Like e.g. ``kubectl get nodes``
    """
    def __init__(self,
                 hostname,    # type: str
                 addr=None,   # type: Optional[str]
                 labels=None,  # type: Optional[List[str]]
                 status=None,  # type: Optional[str]
                 ):
        #: the bare hostname on the host. Not the FQDN.
        self.hostname = hostname  # type: str

        #: DNS name or IP address to reach it
        self.addr = addr or hostname  # type: str

        #: label(s), if any
        self.labels = labels or []  # type: List[str]

        #: human readable status
        self.status = status or ''  # type: str

    def to_json(self):
        """Return a json-serializable dict of all host properties."""
        return {
            'hostname': self.hostname,
            'addr': self.addr,
            'labels': self.labels,
            'status': self.status,
        }

    def __repr__(self):
        args = [self.hostname]  # type: List[Any]
        if self.addr is not None:
            args.append(self.addr)
        if self.labels:
            args.append(self.labels)
        if self.status:
            args.append(self.status)

        return "<HostSpec>({})".format(', '.join(map(repr, args)))

    def __eq__(self, other):
        # Bug fix: comparing against a non-HostSpec (e.g. ``spec == None``)
        # used to raise AttributeError; defer to the other operand instead,
        # which makes such comparisons evaluate to False.
        if not isinstance(other, HostSpec):
            return NotImplemented
        # Let's omit `status` for the moment, as it is still the very same host.
        return self.hostname == other.hostname and \
               self.addr == other.addr and \
               self.labels == other.labels
1182
1183
class UpgradeStatusSpec(object):
    """The orchestrator's report on the state of any ongoing upgrade."""

    def __init__(self):
        # type: () -> None
        # Whether an upgrade is currently underway.
        self.in_progress = False
        # Image we are upgrading to (None when idle).
        self.target_image = None
        # Daemon types that have already been fully updated.
        self.services_complete = []
        # Freeform, human readable progress/error description.
        self.message = ""
1191
1192
def handle_type_error(method):
    """Decorator: convert a ``TypeError`` raised by *method* into an
    :class:`OrchestratorValidationError` prefixed with the class name.

    Used on ``from_json``-style classmethods so that malformed input
    surfaces as a validation error instead of a programming error.
    """
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            # Chain the original TypeError so the real cause stays visible
            # in tracebacks (the file already uses `raise ... from`).
            raise OrchestratorValidationError(error_msg) from e
    return inner
1202
1203
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 last_deployed=None):
        # Host is at the same granularity as InventoryHost
        self.hostname = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]

        self.created = created  # type: Optional[datetime.datetime]
        self.started = started  # type: Optional[datetime.datetime]
        self.last_configured = last_configured  # type: Optional[datetime.datetime]
        self.last_deployed = last_deployed  # type: Optional[datetime.datetime]

    def name(self):
        """Return this daemon's unique '<type>.<id>' name."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name):
        # type: (Optional[str]) -> bool
        """Return True if this daemon belongs to the named service."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_name(self):
        """Return the name of the service this daemon belongs to.

        For rgw, the service is identified by the first two components of
        the daemon id (e.g. 'rgw.realm.zone'); for mds and nfs, by the
        first component.  Otherwise the bare daemon type is the service.
        """
        if self.daemon_type == 'rgw':
            v = self.daemon_id.split('.')
            s_name = '.'.join(v[0:2])
            return 'rgw.%s' % s_name
        if self.daemon_type in ['mds', 'nfs']:
            _s_name = self.daemon_id.split('.')[0]
            # Bug fix: the prefix was hard-coded to 'mds', so nfs services
            # were wrongly reported as 'mds.<id>'.  Use the actual type.
            return '%s.%s' % (self.daemon_type, _s_name)
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """Serialize to a json-friendly dict; None-valued keys are dropped
        and datetimes are rendered with DATEFMT."""
        out = {
            'hostname': self.hostname,
            'container_id': self.container_id,
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'daemon_id': self.daemon_id,
            'daemon_type': self.daemon_type,
            'version': self.version,
            'status': self.status,
            'status_desc': self.status_desc,
        }
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
        return {k: v for (k, v) in out.items() if v is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Deserialize from a dict produced by :func:`to_json`."""
        c = data.copy()
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        return cls(**c)
1318
class ServiceDescription(object):
    """
    Describes the status of one logical service, stateful or stateless.

    This is not about health or performance monitoring of services: it is
    about letting the orchestrator tell Ceph whether and where a service
    is scheduled in the cluster. When an orchestrator says "it's running
    on host123", that is not a promise that the process is literally up
    this second, just a description of where the orchestrator has decided
    the service should run.
    """

    def __init__(self,
                 container_image_id=None,
                 container_image_name=None,
                 service_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 spec=None):
        # Container runtime details; not everyone runs containerized, but
        # enough do to track the image hash and the friendly image name.
        self.container_image_id = container_image_id
        self.container_image_name = container_image_name

        # Either a bare type (e.g. 'mgr') or a type.id combination
        # (e.g. 'mds.fsname' or 'rgw.realm.zone').
        self.service_name = service_name

        # Where the service configuration lives when stored in a rados
        # object: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # URL of the service's REST-like API, if it exposes one.
        self.service_url = service_url

        # Total number of daemons in the service.
        self.size = size

        # Number of those daemons that are up.
        self.running = running

        # When this information was last refreshed / service creation time.
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
        self.created = created  # type: Optional[datetime.datetime]

        self.spec = spec

    def service_type(self):
        """Return the bare type ('mds' from 'mds.a'), or None if unset."""
        return self.service_name.split('.')[0] if self.service_name else None

    def __repr__(self):
        return "<ServiceDescription>({name})".format(name=self.service_name)

    def to_json(self):
        """Serialize to a json-friendly dict; None-valued entries are
        dropped and datetimes are rendered with DATEFMT."""
        out = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_name': self.service_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'spec': self.spec.to_json() if self.spec is not None else None,
        }
        for field in ('last_refresh', 'created'):
            stamp = getattr(self, field)
            if stamp:
                out[field] = stamp.strftime(DATEFMT)
        return {key: value for (key, value) in out.items() if value is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Deserialize from a dict produced by :func:`to_json`."""
        kw = data.copy()
        for field in ('last_refresh', 'created'):
            if field in kw:
                kw[field] = datetime.datetime.strptime(kw[field], DATEFMT)
        return cls(**kw)
1405
1406
class InventoryFilter(object):
    """
    Restricts an inventory fetch so we don't scan the whole estate.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring a
      particular server;
    * filter by label when not all of the estate is Ceph servers and we
      only want to learn about the Ceph ones;
    * filter by label when we are interested in a particular class of
      server (e.g. OSD servers).
    """

    def __init__(self, labels=None, hosts=None):
        # type: (Optional[List[str]], Optional[List[str]]) -> None
        # Optional: only report hosts matching these labels.
        self.labels = labels
        # Optional: only report these named hosts.
        self.hosts = hosts
1428
1429
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """
    def __init__(self, name, devices=None, labels=None, addr=None):
        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Return a json-serializable dict representation."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Build an InventoryHost from the dict produced by :func:`to_json`.

        :raises OrchestratorValidationError: on missing, unknown or
            malformed keys.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            # Bug fix: 'labels' must be consumed *before* the unknown-key
            # check below; previously any input that legitimately contained
            # a 'labels' key was rejected as "Unknown key(s)".
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        """Build a list of InventoryHosts from (name, devices-json) pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts):
        # type: (List[InventoryHost]) -> List[str]
        """Return just the names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): addr and labels are deliberately ignored here,
        # matching the original behavior.
        return self.name == other.name and self.devices == other.devices
1490
1491
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Identifies one device on one host, for turning device LEDs on or off.

    ``host`` is a hostname as in :func:`orchestrator.Orchestrator.get_hosts`.

    ``dev`` is a device id such as ``ABC1234DEF567-1R1234_ABC8DE0Q``
    (see ``ceph osd metadata | jq '.[].device_ids'``).
    """
    __slots__ = ()
1503
1504
def _mk_orch_methods(cls):
    """Class decorator: attach a remote-forwarding stub to *cls* for every
    public ``Orchestrator`` method, each delegating to ``self._oremote``.
    """
    def shim(method_name):
        # Bind method_name per shim: a plain closure over the loop variable
        # would leave every generated stub bound to the last name.
        def inner(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return inner

    for name in Orchestrator.__dict__:
        if name.startswith('_') or name == 'is_orchestrator_module':
            continue
        setattr(cls, name, shim(name))
    return cls
1518
1519
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr):
        # type: (MgrModule) -> None
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self):
        # Fall back to self when set_mgr() was never called, i.e. when this
        # mixin is inherited by a mgr module directly.
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # Prefer a locally available _select_orchestrator (when running
        # inside the orchestrator module itself); otherwise ask the
        # 'orchestrator' mgr module remotely.
        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            # Distinguish "backend doesn't implement this method" from a
            # genuine failure by consulting the backend's feature set.
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise

    def _orchestrator_wait(self, completions):
        # type: (List[Completion]) -> None
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises RuntimeError: something went wrong while calling the process method.
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        # Keep driving process() until every completion has a result;
        # sleep between rounds only while some completion still needs one.
        while any(not c.has_result for c in completions):
            self.process(completions)
            self.__get_mgr().log.info("Operations pending: %s",
                                      sum(1 for c in completions if not c.has_result))
            if any(c.needs_result for c in completions):
                time.sleep(1)
            else:
                break