- self._serialized_exception_ = pickle.dumps(e)
-
- @property
- def _serialized_exception(self) -> Optional[bytes]:
- return getattr(self, '_serialized_exception_', None)
-
- @property
- def _on_complete(self) -> Optional[Callable]:
- # https://github.com/python/mypy/issues/4125
- return self._on_complete_
-
- @_on_complete.setter
- def _on_complete(self, val: Optional[Callable]) -> None:
- self._on_complete_ = val
-
- def __repr__(self):
- name = self._name or getattr(self._on_complete, '__name__',
- '??') if self._on_complete else 'None'
- val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
- return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
- self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
- next, '_progress_reference', 'NA'), repr(self._next_promise)
- )
-
- def pretty_print_1(self):
- if self._name:
- name = self._name
- elif self._on_complete is None:
- name = 'lambda x: x'
- elif hasattr(self._on_complete, '__name__'):
- name = getattr(self._on_complete, '__name__')
- else:
- name = self._on_complete.__class__.__name__
- val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
- prefix = {
- self.INITIALIZED: ' ',
- self.RUNNING: ' >>>',
- self.FINISHED: '(done)'
- }[self._state]
- return '{} {}({}),'.format(prefix, name, val)
-
- def then(self: Any, on_complete: Callable) -> Any:
- """
- Call ``on_complete`` as soon as this promise is finalized.
- """
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- if self._next_promise is not None:
- return self._next_promise.then(on_complete)
-
- if self._on_complete is not None:
- self._set_next_promise(self.__class__(
- _first_promise=self._first_promise,
- on_complete=on_complete
- ))
- return self._next_promise
-
- else:
- self._on_complete = on_complete
- self._set_next_promise(self.__class__(_first_promise=self._first_promise))
- return self._next_promise
-
- def _set_next_promise(self, next: '_Promise') -> None:
- assert self is not next
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- self._next_promise = next
- assert self._next_promise is not None
- for p in iter(self._next_promise):
- p._first_promise = self._first_promise
-
- def _finalize(self, value=NO_RESULT):
- """
- Sets this promise to complete.
-
- Orchestrators may choose to use this helper function.
-
- :param value: new value.
- """
- if self._state not in (self.INITIALIZED, self.RUNNING):
- raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
- self._state = self.RUNNING
-
- if value is not self.NO_RESULT:
- self._value = value
- assert self._value is not self.NO_RESULT, repr(self)
-
- if self._on_complete:
- try:
- next_result = self._on_complete(self._value)
- except Exception as e:
- self.fail(e)
- return
- else:
- next_result = self._value
-
- if isinstance(next_result, _Promise):
- # hack: _Promise is not a continuation monad.
- next_result = next_result._first_promise # type: ignore
- assert next_result not in self, repr(self._first_promise) + repr(next_result)
- assert self not in next_result
- next_result._append_promise(self._next_promise)
- self._set_next_promise(next_result)
- assert self._next_promise
- if self._next_promise._value is self.NO_RESULT:
- self._next_promise._value = self._value
- self.propagate_to_next()
- elif next_result is not self.ASYNC_RESULT:
- # simple map. simply forward
- if self._next_promise:
- self._next_promise._value = next_result
- else:
- # Hack: next_result is of type U, _value is of type T
- self._value = next_result # type: ignore
- self.propagate_to_next()
- else:
- # asynchronous promise
- pass
-
- def propagate_to_next(self):
- self._state = self.FINISHED
- logger.debug('finalized {}'.format(repr(self)))
- if self._next_promise:
- self._next_promise._finalize()
-
- def fail(self, e: Exception) -> None:
- """
- Sets the whole completion to be faild with this exception and end the
- evaluation.
- """
- if self._state == self.FINISHED:
- raise ValueError(
- 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
- assert self._state in (self.INITIALIZED, self.RUNNING)
- logger.exception('_Promise failed')
- self._exception = e
- self._value = f'_exception: {e}'
- if self._next_promise:
- self._next_promise.fail(e)
- self._state = self.FINISHED
-
- def __contains__(self, item):
- return any(item is p for p in iter(self._first_promise))
-
- def __iter__(self):
- yield self
- elem = self._next_promise
- while elem is not None:
- yield elem
- elem = elem._next_promise
-
- def _append_promise(self, other):
- if other is not None:
- assert self not in other
- assert other not in self
- self._last_promise()._set_next_promise(other)
-
- def _last_promise(self) -> '_Promise':
- return list(iter(self))[-1]
-
-
-class ProgressReference(object):
- def __init__(self,
- message: str,
- mgr,
- completion: Optional[Callable[[], 'Completion']] = None
- ):
- """
- ProgressReference can be used within Completions::
-
- +---------------+ +---------------------------------+
- | | then | |
- | My Completion | +--> | on_complete=ProgressReference() |
- | | | |
- +---------------+ +---------------------------------+
-
- See :func:`Completion.with_progress` for an easy way to create
- a progress reference
-
- """
- super(ProgressReference, self).__init__()
- self.progress_id = str(uuid.uuid4())
- self.message = message
- self.mgr = mgr
-
- #: The completion can already have a result, before the write
- #: operation is effective. progress == 1 means, the services are
- #: created / removed.
- self.completion: Optional[Callable[[], Completion]] = completion
-
- #: if a orchestrator module can provide a more detailed
- #: progress information, it needs to also call ``progress.update()``.
- self.progress = 0.0
-
- self._completion_has_result = False
- self.mgr.all_progress_references.append(self)
-
- def __str__(self):
- """
- ``__str__()`` is used for determining the message for progress events.
- """
- return self.message or super(ProgressReference, self).__str__()
-
- def __call__(self, arg):
- self._completion_has_result = True
- self.progress = 1.0
- return arg
-
- @property
- def progress(self):
- return self._progress
-
- @progress.setter
- def progress(self, progress):
- assert progress <= 1.0
- self._progress = progress
- try:
- if self.effective:
- self.mgr.remote("progress", "complete", self.progress_id)
- self.mgr.all_progress_references = [
- p for p in self.mgr.all_progress_references if p is not self]
- else:
- self.mgr.remote("progress", "update", self.progress_id, self.message,
- progress,
- [("origin", "orchestrator")])
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
-
- @property
- def effective(self):
- return self.progress == 1 and self._completion_has_result
-
- def update(self):
- def progress_run(progress):
- self.progress = progress
- if self.completion:
- c = self.completion().then(progress_run)
- self.mgr.process([c._first_promise])
- else:
- self.progress = 1
-
- def fail(self):
- self._completion_has_result = True
- self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
- """
- Combines multiple promises into one overall operation.
-
- Completions are composable by being able to
- call one completion from another completion. I.e. making them re-usable
- using Promises E.g.::
-
- >>> #doctest: +SKIP
- ... return Orchestrator().get_hosts().then(self._create_osd)
-
- where ``get_hosts`` returns a Completion of list of hosts and
- ``_create_osd`` takes a list of hosts.
-
- The concept behind this is to store the computation steps
- explicit and then explicitly evaluate the chain:
-
- >>> #doctest: +SKIP
- ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
- ... p.finalize(2)
- ... assert p.result = "4"
-
- or graphically::
-
- +---------------+ +-----------------+
- | | then | |
- | lambda x: x*x | +--> | lambda x: str(x)|
- | | | |
- +---------------+ +-----------------+
-
- """
-
- def __init__(self,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- name: Optional[str] = None,
- ):
- super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
- @property
- def _progress_reference(self) -> Optional[ProgressReference]:
- if hasattr(self._on_complete, 'progress_id'):
- return self._on_complete # type: ignore
- return None
-
- @property
- def progress_reference(self) -> Optional[ProgressReference]:
- """
- ProgressReference. Marks this completion
- as a write completeion.
- """
-
- references = [c._progress_reference for c in iter(
- self) if c._progress_reference is not None]
- if references:
- assert len(references) == 1
- return references[0]
- return None
-
- @classmethod
- def with_progress(cls: Any,
- message: str,
- mgr,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- calc_percent: Optional[Callable[[], Any]] = None
- ) -> Any:
-
- c = cls(
- _first_promise=_first_promise,
- value=value,
- on_complete=on_complete
- ).add_progress(message, mgr, calc_percent)
-
- return c._first_promise
-
- def add_progress(self,
- message: str,
- mgr,
- calc_percent: Optional[Callable[[], Any]] = None
- ):
- return self.then(
- on_complete=ProgressReference(
- message=message,
- mgr=mgr,
- completion=calc_percent
- )
- )
-
- def fail(self, e: Exception):
- super(Completion, self).fail(e)
- if self._progress_reference:
- self._progress_reference.fail()
-
- def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
- if self._first_promise._state == self.INITIALIZED:
- self._first_promise._finalize(result)
-
- @property
- def result(self) -> T:
- """
- The result of the operation that we were waited
- for. Only valid after calling Orchestrator.process() on this
- completion.
- """
- last = self._last_promise()
- assert last._state == _Promise.FINISHED
- return cast(T, last._value)