]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/orchestrator/_interface.py
import 15.2.9
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
1
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import errno
11 import logging
12 import pickle
13 import re
14 import time
15 import uuid
16
17 from collections import namedtuple, OrderedDict
18 from contextlib import contextmanager
19 from functools import wraps
20
21 import yaml
22
23 from ceph.deployment import inventory
24 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
25 ServiceSpecValidationError, IscsiServiceSpec
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.hostspec import HostSpec
28 from ceph.utils import datetime_to_str, str_to_datetime
29
30 from mgr_module import MgrModule, CLICommand, HandleCommandResult
31
32 try:
33 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
34 Type, Sequence, Dict, cast
35 except ImportError:
36 pass
37
# Module-level logger for the orchestrator interface.
logger = logging.getLogger(__name__)

# Generic result type parameter carried by Completion[T] below.
T = TypeVar('T')
41
42
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        super().__init__(msg)
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
        # Negative errno surfaced as the CLI return code.
        self.errno = errno
60
61
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured at all.
    """

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super().__init__(msg, errno=-errno.ENOENT)
69
70
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator backend doesn't support a requested feature.
    """
75
76
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Tag any :class:`OrchestratorError` raised inside the block with an
    event subject of ``(kind, subject)``, then re-raise it.
    """
    try:
        yield
    except OrchestratorError as e:
        # NOTE(review): hasattr() is True for every OrchestratorError (the
        # attribute is assigned in its __init__), so the subject is
        # effectively always overwritten — confirm `overwrite` semantics.
        should_tag = overwrite or hasattr(e, 'event_subject')
        if should_tag:
            e.event_subject = (kind, subject)
        raise
85
86
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap *func* so that expected failures become CLI return codes (no
    traceback), and attach the CLI metadata consumed by CLICommandMeta.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            # NOTE(review): assumes `e.errno` also exists on
            # ServiceSpecValidationError — confirm.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # Each command needs its own callable to carry its own CLI metadata;
    # a trivial pass-through def gives us a distinct copy of `wrapper`.
    def wrapper_copy(*l_args, **l_kwargs):
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
108
109
110 def _cli_command(perm):
111 def inner_cli_command(prefix, cmd_args="", desc=""):
112 return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
113 return inner_cli_command
114
115
# Decorator factories for read-only ('r') and read-write ('rw') CLI commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
118
119
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        # Collect every member that was decorated with @_cli_read_command /
        # @_cli_write_command: those carry _prefix and _cli_command attributes.
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Not a CLI-decorated member; ignore it.
                pass

        def handle_command(self, inbuf, cmd):
            # NOTE(review): for an unknown prefix this calls
            # self.handle_command, which resolves back to this very function
            # (installed on the class below) — confirm this cannot recurse
            # indefinitely for prefixes missing from `dispatch`.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Advertise collected commands and install the dispatcher on the
        # class being created.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
144
145
146 def _no_result():
147 return object()
148
149
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Sentinel meaning "no value yet". Deliberately not None, as None is a
    # valid result value. (Fixed: dropped the bogus `: None` annotation —
    # the value is a plain object, never None.)
    NO_RESULT = _no_result()
    # Sentinel returned by on_complete callbacks that will finalize this
    # promise asynchronously at some later point.
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise: Optional["_Promise"] = None,
                 value: Optional[Any] = NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise: Optional[_Promise] = None

        self._state = self.INITIALIZED
        # Assignment goes through the property setter below, which also
        # maintains the pickled form of the exception.
        self._exception: Optional[Exception] = None

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise: '_Promise' = _first_promise or self

    @property
    def _exception(self) -> Optional[Exception]:
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        # Keep both the live exception and a pickled copy (the pickled copy
        # survives transport between mgr modules).
        self._exception_ = e
        try:
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self) -> Optional[bytes]:
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self) -> Optional[Callable]:
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val: Optional[Callable]) -> None:
        self._on_complete_ = val

    def __repr__(self):
        # NOTE(review): operator precedence makes this
        # `(self._name or getattr(...)) if self._on_complete else 'None'`,
        # i.e. a set _name is ignored whenever _on_complete is None —
        # confirm whether that is intended.
        name = self._name or getattr(self._on_complete, '__name__',
                                     '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
                # BUGFIX: was `getattr(next, ...)` — the *builtin* `next`,
                # which never has `_progress_reference` and always yielded
                # 'NA'. `self` is the object whose progress reference we
                # want to show (defined on Completion subclasses).
                self, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        """Return a one-line, human-readable rendering of this single step."""
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self: Any, on_complete: Callable) -> Any:
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # Delegate to the end of the chain, so `then` always appends.
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: append a new step.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            # This step is still a no-op: attach the callback here and add a
            # trailing identity step to keep the chain extendable.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next: '_Promise') -> None:
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # All downstream steps must agree on the head of the chain.
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            # The callback returned a whole sub-chain: splice it in before
            # the remainder of this chain.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: the callback will finalize us later.
            pass

    def propagate_to_next(self):
        """Mark this step finished and drive the next step in the chain."""
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e: Exception) -> None:
        """
        Sets the whole completion to be failed with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        # Propagate the failure through the rest of the chain.
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # Iterate from this step to the end of the chain.
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self) -> '_Promise':
        return list(iter(self))[-1]
364
365
class ProgressReference(object):
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

              +---------------+      +---------------------------------+
              |               | then |                                 |
              | My Completion | +--> | on_complete=ProgressReference() |
              |               |      |                                 |
              +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        """
        super(ProgressReference, self).__init__()
        # Unique id passed to the mgr "progress" module.
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        # NOTE: order matters — assigning self.progress runs the setter
        # below, which short-circuits on `progress == 1` before touching
        # self._completion_has_result (assigned only afterwards).
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, result):
        # Used as an on_complete callback: record that the underlying write
        # produced a result, mark progress complete, and pass the value on.
        self._completion_has_result = True
        self.progress = 1.0
        return result

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, value):
        assert value <= 1.0
        self._progress = value
        try:
            if self.effective:
                # Done: close out the progress event and drop ourselves
                # from the mgr's bookkeeping list.
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [
                    ref for ref in self.mgr.all_progress_references if ref is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                value,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        return self.progress == 1 and self._completion_has_result

    def update(self):
        def _set_progress(value):
            self.progress = value
        if self.completion:
            chained = self.completion().then(_set_progress)
            self.mgr.process([chained._first_promise])
        else:
            self.progress = 1

    def fail(self):
        self._completion_has_result = True
        self.progress = 1
451
452
class Completion(_Promise, Generic[T]):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*2 | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """

    def __init__(self,
                 _first_promise: Optional["Completion"] = None,
                 value: Any = _Promise.NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self) -> Optional[ProgressReference]:
        # Duck-typed check: a ProgressReference used as on_complete carries
        # a progress_id attribute.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self) -> Optional[ProgressReference]:
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        references = [c._progress_reference for c in iter(
            self) if c._progress_reference is not None]
        if references:
            # At most one step of a chain may carry a progress reference.
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls: Any,
                      message: str,
                      mgr,
                      _first_promise: Optional["Completion"] = None,
                      value: Any = _Promise.NO_RESULT,
                      on_complete: Optional[Callable] = None,
                      calc_percent: Optional[Callable[[], Any]] = None
                      ) -> Any:
        """Create a Completion whose last step reports progress via *mgr*."""

        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message: str,
                     mgr,
                     calc_percent: Optional[Callable[[], Any]] = None
                     ):
        """Append a ProgressReference step to this completion chain."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e: Exception):
        super(Completion, self).fail(e)
        # Also close out any progress event attached to this chain.
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
        # Only kick off evaluation once; a chain already RUNNING/FINISHED
        # is left alone.
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self) -> T:
        """
        The result of the operation that we waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return cast(T, last._value)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self) -> Optional[Exception]:
        return self._last_promise()._exception

    @property
    def serialized_exception(self) -> Optional[bytes]:
        return self._last_promise()._serialized_exception

    @property
    def has_result(self) -> bool:
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self) -> bool:
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self) -> bool:
        """
        The completion is final: it either errored or produced a result.
        No further waiting is needed.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        """Render every step of this chain, one line per step."""

        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
626
627
def pretty_print(completions: Sequence[Completion]) -> str:
    """Render several completions as one comma-separated string."""
    rendered = [c.pretty_print() for c in completions]
    return ', '.join(rendered)
630
631
def raise_if_exception(c: Completion) -> None:
    """
    Re-raise the exception stored in *c*, if there is one.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    serialized = c.serialized_exception
    if serialized is None:
        return
    try:
        e = pickle.loads(serialized)
    except (KeyError, AttributeError):
        # The original exception type can't be reconstructed (e.g. it needs
        # constructor arguments pickle can't supply); degrade to a plain
        # Exception carrying type and message.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise e
643
644
class TrivialReadCompletion(Completion[T]):
    """
    This is the trivial completion simply wrapping a result.
    """

    def __init__(self, result: T):
        super(TrivialReadCompletion, self).__init__()
        # NOTE(review): falsy results ([], '', 0, None) leave the completion
        # unfinalized — confirm callers only ever wrap truthy results here.
        if result:
            self.finalize(result)
654
655
656 def _hide_in_features(f):
657 f._hide_in_features = True
658 return f
659
660
661 class Orchestrator(object):
662 """
663 Calls in this class may do long running remote operations, with time
664 periods ranging from network latencies to package install latencies and large
665 internet downloads. For that reason, all are asynchronous, and return
666 ``Completion`` objects.
667
668 Methods should only return the completion and not directly execute
669 anything, like network calls. Otherwise the purpose of
670 those completions is defeated.
671
672 Implementations are not required to start work on an operation until
673 the caller waits on the relevant Completion objects. Callers making
674 multiple updates should not wait on Completions until they're done
675 sending operations: this enables implementations to batch up a series
676 of updates when wait() is called on a set of Completion objects.
677
678 Implementations are encouraged to keep reasonably fresh caches of
679 the status of the system: it is better to serve a stale-but-recent
680 result read of e.g. device inventory than it is to keep the caller waiting
681 while you scan hosts every time.
682 """
683
    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: True
        """
        return True
693
    @_hide_in_features
    def available(self) -> Tuple[bool, str]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean (reachable?) and string (status detail)
        """
        raise NotImplementedError()
719
    @_hide_in_features
    def process(self, completions: List[Completion]) -> None:
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.

        :param completions: completions to drive forward
        """
        raise NotImplementedError()
734
735 @_hide_in_features
736 def get_feature_set(self):
737 """Describes which methods this orchestrator implements
738
739 .. note::
740 `True` doesn't mean that the desired functionality
741 is actually possible in the orchestrator. I.e. this
742 won't work as expected::
743
744 >>> #doctest: +SKIP
745 ... api = OrchestratorClientMixin()
746 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
747 ... api.get_hosts()
748
749 It's better to ask for forgiveness instead::
750
751 >>> #doctest: +SKIP
752 ... try:
753 ... OrchestratorClientMixin().get_hosts()
754 ... except (OrchestratorError, NotImplementedError):
755 ... ...
756
757 :returns: Dict of API method names to ``{'available': True or False}``
758 """
759 module = self.__class__
760 features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
761 for a in Orchestrator.__dict__
762 if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
763 }
764 return features
765
    def cancel_completions(self) -> None:
        """
        Cancel ongoing completions to unstick the mgr.
        """
        raise NotImplementedError()
771
    def pause(self) -> None:
        """Pause the orchestrator (semantics defined by the concrete implementation)."""
        raise NotImplementedError()
774
    def resume(self) -> None:
        """Resume a previously paused orchestrator (see :meth:`pause`)."""
        raise NotImplementedError()
777
    def add_host(self, host_spec: HostSpec) -> Completion[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host description (hostname, address, labels, ...)
        :return: Completion carrying a ``str`` result
        """
        raise NotImplementedError()
785
    def remove_host(self, host: str) -> Completion[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        :return: Completion carrying a ``str`` result
        """
        raise NotImplementedError()
793
    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        :return: Completion carrying a ``str`` result
        """
        raise NotImplementedError()
802
    def get_hosts(self) -> Completion[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: Completion carrying a list of HostSpec
        """
        raise NotImplementedError()
810
    def add_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Add a host label

        :param host: hostname
        :param label: label to attach
        """
        raise NotImplementedError()
816
    def remove_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Remove a host label

        :param host: hostname
        :param label: label to remove
        """
        raise NotImplementedError()
822
    def host_ok_to_stop(self, hostname: str) -> Completion:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()
830
    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :param host_filter: optional restriction of the hosts to inspect
        :param refresh: when True, bypass cached data (implementation-defined)
        :return: Completion carrying a list of InventoryHost
        """
        raise NotImplementedError()
838
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :param refresh: when True, bypass cached data (implementation-defined)
        :return: Completion carrying a list of ServiceDescription objects.
        """
        raise NotImplementedError()
852
    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator. All parameters act as optional filters.

        :param refresh: when True, bypass cached data (implementation-defined)
        :return: Completion carrying a list of DaemonDescription objects.
        """
        raise NotImplementedError()
861
862 def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
863 """
864 Applies any spec
865 """
866 fns: Dict[str, function] = {
867 'alertmanager': self.apply_alertmanager,
868 'crash': self.apply_crash,
869 'grafana': self.apply_grafana,
870 'iscsi': self.apply_iscsi,
871 'mds': self.apply_mds,
872 'mgr': self.apply_mgr,
873 'mon': self.apply_mon,
874 'nfs': self.apply_nfs,
875 'node-exporter': self.apply_node_exporter,
876 'osd': lambda dg: self.apply_drivegroups([dg]),
877 'prometheus': self.apply_prometheus,
878 'rbd-mirror': self.apply_rbd_mirror,
879 'rgw': self.apply_rgw,
880 'host': self.add_host,
881 }
882
883 def merge(ls, r):
884 if isinstance(ls, list):
885 return ls + [r]
886 return [ls, r]
887
888 spec, *specs = specs
889
890 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
891 completion = fn(spec)
892 for s in specs:
893 def next(ls):
894 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
895 return fn(s).then(lambda r: merge(ls, r))
896 completion = completion.then(next)
897 return completion
898
    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.

        :param spec: specs to preview without applying them
        """
        raise NotImplementedError()
904
    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
        """
        Remove specific daemon(s).

        :param names: daemon names to remove
        :return: Completion carrying a list of ``str`` results
        """
        raise NotImplementedError()
912
    def remove_service(self, service_name: str) -> Completion[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: name of the service to remove
        :return: Completion carrying a ``str`` result
        """
        raise NotImplementedError()
920
    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()
933
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()
945
    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).

        :param drive_group: drive group describing the OSDs to create
        """
        raise NotImplementedError()
956
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
        """Update OSD cluster according to the given drive group specs."""
        raise NotImplementedError()
960
    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name=None
                           ) -> HandleCommandResult:
        """Set the ``unmanaged`` flag on a service (semantics defined by the concrete implementation)."""
        raise NotImplementedError()
967
    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> Completion[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()
974
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()
986
    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
        """
        Stop the ongoing removal of the given OSDs.

        :param osd_ids: list of OSD IDs
        """
        raise NotImplementedError()
992
    def remove_osds_status(self) -> Completion:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()
998
    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()
1008
    def zap_device(self, host: str, path: str) -> Completion[str]:
        """Zap/Erase a device (DESTROYS DATA).

        :param host: hostname owning the device
        :param path: device path on that host
        """
        raise NotImplementedError()
1012
    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
        """Create mon daemon(s) according to *spec*."""
        raise NotImplementedError()
1016
    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
        """Update the mon cluster to match *spec*."""
        raise NotImplementedError()
1020
    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
        """Create mgr daemon(s) according to *spec*."""
        raise NotImplementedError()
1024
    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
        """Update the mgr cluster to match *spec*."""
        raise NotImplementedError()
1028
1029 def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
1030 """Create MDS daemon(s)"""
1031 raise NotImplementedError()
1032
1033 def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
1034 """Update MDS cluster"""
1035 raise NotImplementedError()
1036
1037 def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
1038 """Create RGW daemon(s)"""
1039 raise NotImplementedError()
1040
1041 def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
1042 """Update RGW cluster"""
1043 raise NotImplementedError()
1044
1045 def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
1046 """Create rbd-mirror daemon(s)"""
1047 raise NotImplementedError()
1048
1049 def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
1050 """Update rbd-mirror cluster"""
1051 raise NotImplementedError()
1052
1053 def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
1054 """Create NFS daemon(s)"""
1055 raise NotImplementedError()
1056
1057 def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
1058 """Update NFS cluster"""
1059 raise NotImplementedError()
1060
1061 def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
1062 """Create iscsi daemon(s)"""
1063 raise NotImplementedError()
1064
1065 def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
1066 """Update iscsi cluster"""
1067 raise NotImplementedError()
1068
1069 def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
1070 """Create new prometheus daemon"""
1071 raise NotImplementedError()
1072
1073 def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
1074 """Update prometheus cluster"""
1075 raise NotImplementedError()
1076
1077 def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
1078 """Create a new Node-Exporter service"""
1079 raise NotImplementedError()
1080
1081 def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
1082 """Update existing a Node-Exporter daemon(s)"""
1083 raise NotImplementedError()
1084
1085 def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
1086 """Create a new crash service"""
1087 raise NotImplementedError()
1088
1089 def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
1090 """Update existing a crash daemon(s)"""
1091 raise NotImplementedError()
1092
1093 def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
1094 """Create a new Node-Exporter service"""
1095 raise NotImplementedError()
1096
1097 def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
1098 """Update existing a Node-Exporter daemon(s)"""
1099 raise NotImplementedError()
1100
1101 def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
1102 """Create a new AlertManager service"""
1103 raise NotImplementedError()
1104
1105 def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
1106 """Update an existing AlertManager daemon(s)"""
1107 raise NotImplementedError()
1108
    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
        """Check an upgrade to the given target ``image`` or ``version``
        (report only; nothing is started here)."""
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
        """Start an upgrade to the given target ``image`` or ``version``."""
        raise NotImplementedError()

    def upgrade_pause(self) -> Completion[str]:
        """Pause the ongoing upgrade."""
        raise NotImplementedError()

    def upgrade_resume(self) -> Completion[str]:
        """Resume a paused upgrade."""
        raise NotImplementedError()

    def upgrade_stop(self) -> Completion[str]:
        """Stop the ongoing upgrade."""
        raise NotImplementedError()

    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> Completion:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
1141
1142
# A spec that may describe either a service or a bare host.
GenericSpec = Union[ServiceSpec, HostSpec]
1144
1145
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into the matching spec type.

    A dict whose ``service_type`` is ``'host'`` denotes a host;
    anything else is treated as a service spec.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
1151
1152
class UpgradeStatusSpec(object):
    """The orchestrator's progress report on any ongoing upgrade."""

    def __init__(self):
        # Whether an upgrade is currently underway.
        self.in_progress = False
        # The container image being upgraded to (None when idle).
        self.target_image = None
        # Daemon types that are already fully updated.
        self.services_complete = []
        # Freeform human-readable description of the current state.
        self.message = ""
1160
1161
def handle_type_error(method):
    """Decorator turning a ``TypeError`` raised by *method* into an
    :class:`OrchestratorValidationError` prefixed with the class name.

    Used on ``from_json`` classmethods so malformed input surfaces as a
    validation error instead of a programming error.
    """
    @wraps(method)
    def wrapper(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return wrapper
1171
1172
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False):

        # Host is at the same granularity as InventoryHost
        self.hostname: str = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id  # runtime id
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: str = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        # "Important" log messages attached to this daemon
        # (see OrchestratorEvent).
        self.events: List[OrchestratorEvent] = events or []

        self.is_active = is_active

    def name(self):
        """Return ``<daemon_type>.<daemon_id>``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """Return True if this daemon's name starts with ``<service_name>.``.

        A ``None`` or empty service name never matches.
        """
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """Derive the service id this daemon belongs to.

        For daemon types in ``ServiceSpec.REQUIRES_SERVICE_ID`` this undoes
        the daemon naming scheme (see ``_match`` below); for all other
        types the daemon id is returned unchanged.

        :raises OrchestratorError: if the service id cannot be derived
            from daemon_id/hostname.
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        def _match():
            # Try to strip the host part out of daemon_id, which is expected
            # to look like "service_id", "service_id.host" or
            # "service_id.host.random".
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self):
        """Return the name of the service this daemon belongs to,
        e.g. ``mds.<service_id>`` or just ``mon``."""
        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """Serialize to a JSON-compatible OrderedDict.

        Datetime fields are stringified via ``datetime_to_str``; keys whose
        value is ``None`` are dropped. ``osdspec_affinity`` is only emitted
        for OSDs.
        """
        out = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['version'] = self.version
        out['status'] = self.status
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of :meth:`to_json`.

        :raises OrchestratorValidationError: on malformed input
            (``TypeError`` translated by :func:`handle_type_error`).
        """
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = str_to_datetime(c[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(events=events, **c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        """Represent as a plain mapping (the to_json form) when YAML-dumped."""
        return dumper.represent_dict(data.to_json().items())
1361
1362
# Register the representer so yaml.dump emits DaemonDescription as a mapping.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1364
1365
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        # The specification this service state refers to.
        self.spec: ServiceSpec = spec

        # "Important" log messages attached to this service
        # (see OrchestratorEvent).
        self.events: List[OrchestratorEvent] = events or []

    def service_type(self):
        """Shortcut for the service type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """Serialize: the spec's JSON with a ``status`` sub-dict added.

        Datetimes are stringified; ``None``-valued status keys are dropped;
        events, when present, are appended under ``events``.
        """
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """Inverse of :meth:`to_json`.

        :raises OrchestratorValidationError: on malformed input
            (``TypeError`` translated by :func:`handle_type_error`).
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        """Represent as a plain mapping (the to_json form) when YAML-dumped."""
        return dumper.represent_dict(data.to_json().items())
1463
1464
# Register the representer so yaml.dump emits ServiceDescription as a mapping.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1466
1467
class InventoryFilter(object):
    """
    Filter applied when fetching inventory, so implementations can avoid
    needlessly scanning the whole estate.

    Typical uses:

    * filter by host when presenting a UI workflow for configuring
      a particular server.
    * filter by label when only part of the estate consists of Ceph
      servers, or when we are interested particularly in e.g. the OSD
      servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: restrict to certain named hosts only
        self.hosts = hosts
        #: Optional: restrict to hosts matching these labels
        self.labels = labels
1489
1490
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name  # address defaults to the name itself
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a JSON-compatible dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Inverse of :meth:`to_json`.

        :raises OrchestratorValidationError: on missing/unknown keys or
            when ``data`` is not dict-shaped.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        """Build InventoryHosts from an iterable of (hostname, data) pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): only name and devices are compared; addr and labels
        # are ignored -- confirm this is intentional.
        return self.name == other.name and self.devices == other.devices
1549
1550
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``

    ``path``: path of the device on the host (presumably a block-device
       path -- confirm against implementations).
    """
    __slots__ = ()
1562
1563
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # v1 wire format: '<created> <kind>:<subject> [<level>] "<message>"'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        if isinstance(created, str):
            created = str_to_datetime(created)
        # When the event happened.
        self.created: datetime.datetime = created

        # What the event is attached to: a 'service' or a 'daemon'.
        assert kind in "service daemon".split()
        self.kind: str = kind

        # Service name or daemon name, depending on `kind`.
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return ``<kind>:<subject>``, e.g. ``daemon:crash.ubuntu``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = datetime_to_str(self.created)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """
        Parse an event string as produced by :meth:`to_json`.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: event string in the v1 format
        :return: the parsed event
        :raises ValueError: if ``data`` doesn't match ``regex_v1``
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other):
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): `level` is not part of the comparison.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
1622
1623
def _mk_orch_methods(cls):
    """Class decorator: install, for every public ``Orchestrator``
    interface method, a stub on *cls* that forwards the call via
    ``self._oremote``.
    """
    def make_shim(method_name):
        # A factory is required here: closing over the loop variable
        # directly would leave every shim bound to the last name iterated.
        def forwarder(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return forwarder

    for name in Orchestrator.__dict__:
        if name.startswith('_') or name in ['is_orchestrator_module']:
            continue
        setattr(cls, name, make_shim(name))
    return cls
1637
1638
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self):
        # Return the mgr set via `set_mgr`, falling back to `self`
        # (i.e. this object is itself the mgr module) if none was set.
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            # `mgr` is not the orchestrator module itself: ask it remotely.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # If the backend simply doesn't implement `meth`, surface that
            # as NotImplementedError instead of a generic remote failure.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise

    def _orchestrator_wait(self, completions: List[Completion]) -> None:
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises RuntimeError: something went wrong while calling the process method.
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        while any(not c.has_result for c in completions):
            self.process(completions)
            self.__get_mgr().log.info("Operations pending: %s",
                                      sum(1 for c in completions if not c.has_result))
            # Sleep and retry only while some completion still needs a
            # result; otherwise stop polling.
            if any(c.needs_result for c in completions):
                time.sleep(1)
            else:
                break