# Source: ceph.git — ceph/src/pybind/mgr/orchestrator/_interface.py (as of v15.2.5)
2 """
3 ceph-mgr orchestrator interface
4
5 Please see the ceph-mgr module developer's guide for more information.
6 """
7
8 import copy
9 import datetime
10 import errno
11 import logging
12 import pickle
13 import re
14 import time
15 import uuid
16
17 from collections import namedtuple, OrderedDict
18 from contextlib import contextmanager
19 from functools import wraps
20
21 import yaml
22
23 from ceph.deployment import inventory
24 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
25 ServiceSpecValidationError, IscsiServiceSpec
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.hostspec import HostSpec
28
29 from mgr_module import MgrModule, CLICommand, HandleCommandResult
30
31 try:
32 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
33 Type, Sequence, Dict, cast
34 except ImportError:
35 pass
36
37 logger = logging.getLogger(__name__)
38
39 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
40
41 T = TypeVar('T')
42
43
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        # Fix: the original called `super(Exception, self).__init__(msg)`,
        # which starts the MRO lookup *after* Exception and therefore skips
        # Exception.__init__ entirely (it ran BaseException.__init__).
        # The zero-argument form initializes the base class normally.
        super().__init__(msg)
        # Negative errno returned to the CLI by handle_exception().
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
61
62
class NoOrchestrator(OrchestratorError):
    """Raised when no orchestrator backend is configured."""

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super().__init__(msg, errno=-errno.ENOENT)
70
71
class OrchestratorValidationError(OrchestratorError):
    """Raised when the orchestrator backend doesn't support a specific feature."""
76
77
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Tag any ``OrchestratorError`` escaping the managed block with an event
    subject ``(kind, subject)`` and re-raise it.

    :param overwrite: force-set the subject; otherwise it is only written if
        the exception already carries an ``event_subject`` attribute.
    """
    try:
        yield
    except OrchestratorError as exc:
        if overwrite or hasattr(exc, 'event_subject'):
            exc.event_subject = (kind, subject)
        raise
86
87
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap an orchestrator CLI handler so expected failures become
    ``HandleCommandResult`` error codes instead of tracebacks, and attach the
    CLI metadata (``_prefix``, ``_cli_command``) consumed by CLICommandMeta.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # A distinct callable object per command, so the per-command attributes
    # below never collide when one function backs several commands.
    def wrapper_copy(*l_args, **l_kwargs):
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
109
110
def _cli_command(perm):
    """Build a decorator factory producing CLI handlers with *perm* permissions."""
    def inner_cli_command(prefix, cmd_args="", desc=""):
        def decorator(func):
            return handle_exception(prefix, cmd_args, desc, perm, func)
        return decorator
    return inner_cli_command
115
116
# Read-only ('r') and read-write ('rw') flavours of the CLI handler decorator.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
119
120
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        # Collect every attribute decorated via _cli_read_command /
        # _cli_write_command: those carry `_prefix` and `_cli_command`.
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Plain attribute/method without CLI metadata: skip.
                pass

        def handle_command(self, inbuf, cmd):
            # NOTE(review): for an unknown prefix this calls
            # self.handle_command, which resolves back to this very function
            # unless a subclass overrides it — looks like it could recurse
            # indefinitely; confirm the intended fallback behaviour.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        # Expose the generated command table in the format MgrModule expects.
        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
145
146
147 def _no_result():
148 return object()
149
150
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    # State machine: INITIALIZED -> RUNNING -> FINISHED.
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Sentinel meaning "no value yet".
    # NOTE(review): annotated as `None`, but _no_result() returns a plain
    # object(); the annotation looks wrong — code kept byte-identical.
    NO_RESULT: None = _no_result()
    # Sentinel an on_complete callback may return to say "the result will be
    # delivered later by another _finalize() call" (see _finalize()).
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise: Optional["_Promise"] = None,
                 value: Optional[Any] = NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        # Callback executed when this step is finalized (see _on_complete).
        self._on_complete_ = on_complete
        # Optional human-readable name used by repr / pretty printing.
        self._name = name
        self._next_promise: Optional[_Promise] = None

        self._state = self.INITIALIZED
        # Goes through the `_exception` property setter below, which also
        # stores a pickled copy of the exception.
        self._exception: Optional[Exception] = None

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise: '_Promise' = _first_promise or self

    @property
    def _exception(self) -> Optional[Exception]:
        # getattr with default: the attribute may not exist before the first
        # setter call.
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        try:
            # Keep a pickled copy so the exception can cross module/process
            # boundaries (exposed via Completion.serialized_exception).
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self) -> Optional[bytes]:
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self) -> Optional[Callable]:
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val: Optional[Callable]) -> None:
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__',
                                     '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # NOTE(review): `next` below is the *builtin*, not self._next_promise,
        # so `pr=` always prints 'NA' — looks like a latent bug; confirm.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
                next, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        # One human-readable line for this step: state marker, name, value.
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: ' ',
            self.RUNNING: ' >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self: Any, on_complete: Callable) -> Any:
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # Always append at the tail of the chain.
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: chain a new promise carrying
            # `on_complete` behind it.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            # This step is still a no-op: attach the callback here and add an
            # empty successor so the chain stays extensible.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next: '_Promise') -> None:
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # Every downstream promise must share the same head of the chain.
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                # Propagate the failure down the chain and stop evaluating.
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # The callback returned another promise chain: splice it in
            # between this promise and our former successor.
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: a later _finalize() will resume the chain
            pass

    def propagate_to_next(self):
        # Mark this step done and trigger evaluation of the successor.
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e: Exception) -> None:
        """
        Sets the whole completion to be failed with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        # Goes through the _exception setter, which also pickles `e`.
        self._exception = e
        self._value = f'_exception: {e}'
        # Recursively fail the rest of the chain before finishing this step.
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        # Identity-based membership test over the *whole* chain.
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # Iterate from this promise to the tail of the chain.
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        # Attach `other` (head of another chain) after this chain's tail.
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self) -> '_Promise':
        # Tail of the chain, starting from this promise.
        return list(iter(self))[-1]
365
366
class ProgressReference(object):
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        :param message: human-readable text used for the progress event
        :param mgr: mgr module instance; used for ``remote()`` calls and the
            ``all_progress_references`` bookkeeping list
        :param completion: optional factory returning a Completion, used by
            :meth:`update` to recompute the current progress value
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        # NOTE: this assignment runs the `progress` setter below *before*
        # _completion_has_result exists; that is only safe because
        # `effective` short-circuits on progress == 0.0. Keep this ordering.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Used as an `on_complete` callback: mark the write as effective,
        # jump to 100% and pass the value through unchanged.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                # Done: complete the progress event and drop this reference
                # from the module's bookkeeping list.
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [
                    p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        # True once the operation has both a result and reached 100%.
        return self.progress == 1 and self._completion_has_result

    def update(self):
        # Recompute progress via the `completion` factory if one was given,
        # otherwise jump straight to 100%.
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        # Force-complete the progress event (e.g. when the completion failed).
        self._completion_has_result = True
        self.progress = 1
452
453
class Completion(_Promise, Generic[T]):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*x | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """

    def __init__(self,
                 _first_promise: Optional["Completion"] = None,
                 value: Any = _Promise.NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self) -> Optional[ProgressReference]:
        # Duck-typed check: a ProgressReference installed as `on_complete`
        # carries a `progress_id` attribute.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self) -> Optional[ProgressReference]:
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        # Search the whole chain; at most one step may carry a reference.
        references = [c._progress_reference for c in iter(
            self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls: Any,
                      message: str,
                      mgr,
                      _first_promise: Optional["Completion"] = None,
                      value: Any = _Promise.NO_RESULT,
                      on_complete: Optional[Callable] = None,
                      calc_percent: Optional[Callable[[], Any]] = None
                      ) -> Any:
        """Create a completion whose evaluation is reported as a progress event."""
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        # Return the head of the chain so callers can keep chaining.
        return c._first_promise

    def add_progress(self,
                     message: str,
                     mgr,
                     calc_percent: Optional[Callable[[], Any]] = None
                     ):
        """Chain a ProgressReference step onto this completion."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e: Exception):
        super(Completion, self).fail(e)
        # Also close out any progress event attached to this chain.
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
        # Kick off evaluation from the head of the chain; no-op when the
        # chain is already being (or has been) evaluated.
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self) -> T:
        """
        The result of the operation that we were waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return cast(T, last._value)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self) -> Optional[Exception]:
        return self._last_promise()._exception

    @property
    def serialized_exception(self) -> Optional[bytes]:
        # Pickled exception, set by the _Promise._exception setter.
        return self._last_promise()._serialized_exception

    @property
    def has_result(self) -> bool:
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return: True once the tail of the chain is FINISHED
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self) -> bool:
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        """Render the whole chain, one line per promise step."""
        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
627
628
def pretty_print(completions: Sequence[Completion]) -> str:
    """Render each completion's chain and join them with commas."""
    rendered = [c.pretty_print() for c in completions]
    return ', '.join(rendered)
631
632
def raise_if_exception(c: Completion) -> None:
    """
    Re-raise the exception stored in a failed completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    serialized = c.serialized_exception
    if serialized is None:
        return
    try:
        e = pickle.loads(serialized)
    except (KeyError, AttributeError):
        # The exception class cannot be reconstructed in this process
        # (e.g. its module is not importable here): degrade to a plain
        # Exception carrying the original type and text.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise e
644
645
class TrivialReadCompletion(Completion[T]):
    """
    This is the trivial completion simply wrapping a result.
    """

    def __init__(self, result: T):
        super().__init__()
        # NOTE(review): falsy results (0, '', [], None) are *not* finalized
        # here — presumably deliberate, but worth confirming with callers.
        if result:
            self.finalize(result)
655
656
657 def _hide_in_features(f):
658 f._hide_in_features = True
659 return f
660
661
662 class Orchestrator(object):
663 """
664 Calls in this class may do long running remote operations, with time
665 periods ranging from network latencies to package install latencies and large
666 internet downloads. For that reason, all are asynchronous, and return
667 ``Completion`` objects.
668
669 Methods should only return the completion and not directly execute
670 anything, like network calls. Otherwise the purpose of
671 those completions is defeated.
672
673 Implementations are not required to start work on an operation until
674 the caller waits on the relevant Completion objects. Callers making
675 multiple updates should not wait on Completions until they're done
676 sending operations: this enables implementations to batch up a series
677 of updates when wait() is called on a set of Completion objects.
678
679 Implementations are encouraged to keep reasonably fresh caches of
680 the status of the system: it is better to serve a stale-but-recent
681 result read of e.g. device inventory than it is to keep the caller waiting
682 while you scan hosts every time.
683 """
684
    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: True, unconditionally
        """
        return True
694
    @_hide_in_features
    def available(self) -> Tuple[bool, str]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

            >>> #doctest: +SKIP
            ... if OrchestratorClientMixin().available()[0]:  # wrong.
            ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        :raises NotImplementedError: must be overridden by the backend
        """
        raise NotImplementedError()
720
    @_hide_in_features
    def process(self, completions: List[Completion]) -> None:
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.

        :param completions: completions to advance; inspected in place
        :raises NotImplementedError: must be overridden by the backend
        """
        raise NotImplementedError()
735
736 @_hide_in_features
737 def get_feature_set(self):
738 """Describes which methods this orchestrator implements
739
740 .. note::
741 `True` doesn't mean that the desired functionality
742 is actually possible in the orchestrator. I.e. this
743 won't work as expected::
744
745 >>> #doctest: +SKIP
746 ... api = OrchestratorClientMixin()
747 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
748 ... api.get_hosts()
749
750 It's better to ask for forgiveness instead::
751
752 >>> #doctest: +SKIP
753 ... try:
754 ... OrchestratorClientMixin().get_hosts()
755 ... except (OrchestratorError, NotImplementedError):
756 ... ...
757
758 :returns: Dict of API method names to ``{'available': True or False}``
759 """
760 module = self.__class__
761 features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
762 for a in Orchestrator.__dict__
763 if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
764 }
765 return features
766
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.

        :raises NotImplementedError: must be overridden by the backend
        """
        raise NotImplementedError()
772
    def pause(self) -> None:
        # NOTE(review): semantics are backend-defined; presumably pauses
        # background orchestration work — confirm with the implementation.
        raise NotImplementedError()
775
    def resume(self) -> None:
        # NOTE(review): counterpart of pause(); backend-defined semantics.
        raise NotImplementedError()
778
    def add_host(self, host_spec: HostSpec) -> Completion[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host and its attributes (the original doc said
            ``host: hostname``, which doesn't match the signature)
        """
        raise NotImplementedError()
786
    def remove_host(self, host: str) -> Completion[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()
794
    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()
803
    def get_hosts(self) -> Completion[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: Completion of a list of HostSpec
        """
        raise NotImplementedError()
811
    def add_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Add a host label

        :param host: hostname
        :param label: label to attach
        """
        raise NotImplementedError()
817
    def remove_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Remove a host label

        :param host: hostname
        :param label: label to detach
        """
        raise NotImplementedError()
823
    def host_ok_to_stop(self, hostname: str) -> Completion:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname (original doc said ``host``, which doesn't
            match the signature)
        """
        raise NotImplementedError()
831
    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :param host_filter: restrict the report to matching hosts
        :param refresh: bypass any cached data
        :return: Completion of a list of InventoryHost
        """
        raise NotImplementedError()
839
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :param refresh: bypass any cached data
        :return: Completion of a list of ServiceDescription objects.
        """
        raise NotImplementedError()
853
    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        All filter parameters are optional; ``refresh`` bypasses cached data.

        :return: Completion of a list of DaemonDescription objects.
        """
        raise NotImplementedError()
862
863 def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
864 """
865 Applies any spec
866 """
867 fns: Dict[str, function] = {
868 'alertmanager': self.apply_alertmanager,
869 'crash': self.apply_crash,
870 'grafana': self.apply_grafana,
871 'iscsi': self.apply_iscsi,
872 'mds': self.apply_mds,
873 'mgr': self.apply_mgr,
874 'mon': self.apply_mon,
875 'nfs': self.apply_nfs,
876 'node-exporter': self.apply_node_exporter,
877 'osd': lambda dg: self.apply_drivegroups([dg]),
878 'prometheus': self.apply_prometheus,
879 'rbd-mirror': self.apply_rbd_mirror,
880 'rgw': self.apply_rgw,
881 'host': self.add_host,
882 }
883
884 def merge(ls, r):
885 if isinstance(ls, list):
886 return ls + [r]
887 return [ls, r]
888
889 spec, *specs = specs
890
891 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
892 completion = fn(spec)
893 for s in specs:
894 def next(ls):
895 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
896 return fn(s).then(lambda r: merge(ls, r))
897 completion = completion.then(next)
898 return completion
899
    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.

        :param spec: specs to preview, without applying them
        """
        raise NotImplementedError()
905
    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
        """
        Remove specific daemon(s).

        :param names: daemon names to remove
        :return: Completion of a list of strings (the original ``:return:
            None`` contradicted the annotation)
        """
        raise NotImplementedError()
913
    def remove_service(self, service_name: str) -> Completion[str]:
        """
        Remove a service (a collection of daemons).

        :return: Completion of a string (the original ``:return: None``
            contradicted the annotation)
        """
        raise NotImplementedError()
921
    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()
934
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> Completion[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
        raise NotImplementedError()
946
    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).

        :param drive_group: drive group describing the OSDs to create
        """
        raise NotImplementedError()
957
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
        """Update OSD cluster.

        :param specs: drive group specs to apply
        """
        raise NotImplementedError()
961
    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name=None
                           ) -> HandleCommandResult:
        """Set or clear the `unmanaged` flag of a service.

        NOTE(review): unlike sibling methods this returns a
        HandleCommandResult directly, not a Completion.
        """
        raise NotImplementedError()
968
    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> Completion[str]:
        """Get a preview for OSD deployments.

        :param osdspec_name: name of the OSD spec to preview
        :param osdspecs: explicit specs to preview instead of stored ones
        """
        raise NotImplementedError()
975
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
            Note that this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()
987
    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
        """
        TODO: document. Presumably aborts an in-flight removal of the
        given OSDs — confirm with the backend implementation.
        """
        raise NotImplementedError()
993
    def remove_osds_status(self) -> Completion:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()
999
    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()
1009
    def zap_device(self, host: str, path: str) -> Completion[str]:
        """Zap/Erase a device (DESTROYS DATA).

        :param host: hostname owning the device
        :param path: device path on that host
        """
        raise NotImplementedError()
1013
    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
        """Create mon daemon(s). :return: Completion of List[str]."""
        raise NotImplementedError()
1017
    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
        """Update mon cluster to match *spec*."""
        raise NotImplementedError()
1021
    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
        """Create mgr daemon(s). :return: Completion of List[str]."""
        raise NotImplementedError()
1025
    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
        """Update mgr cluster to match *spec*."""
        raise NotImplementedError()
1029
    def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create MDS daemon(s).

        :param spec: placement specification for the MDS service
        """
        raise NotImplementedError()
1033
    def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update MDS cluster to match the given spec.

        :param spec: placement specification for the MDS service
        """
        raise NotImplementedError()
1037
    def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
        """
        Create RGW daemon(s).

        :param spec: placement specification for the RGW service
        """
        raise NotImplementedError()
1041
    def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
        """
        Update RGW cluster to match the given spec.

        :param spec: placement specification for the RGW service
        """
        raise NotImplementedError()
1045
    def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create rbd-mirror daemon(s).

        :param spec: placement specification for the rbd-mirror service
        """
        raise NotImplementedError()
1049
    def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update rbd-mirror cluster to match the given spec.

        :param spec: placement specification for the rbd-mirror service
        """
        raise NotImplementedError()
1053
    def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
        """
        Create NFS daemon(s).

        :param spec: placement specification for the NFS service
        """
        raise NotImplementedError()
1057
    def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
        """
        Update NFS cluster to match the given spec.

        :param spec: placement specification for the NFS service
        """
        raise NotImplementedError()
1061
    def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
        """
        Create iscsi daemon(s).

        :param spec: placement specification for the iscsi service
        """
        raise NotImplementedError()
1065
    def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
        """
        Update iscsi cluster to match the given spec.

        :param spec: placement specification for the iscsi service
        """
        raise NotImplementedError()
1069
    def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create new prometheus daemon(s).

        :param spec: placement specification for the prometheus service
        """
        raise NotImplementedError()
1073
    def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update prometheus cluster to match the given spec.

        :param spec: placement specification for the prometheus service
        """
        raise NotImplementedError()
1077
    def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create a new Node-Exporter service.

        :param spec: placement specification for the node-exporter service
        """
        raise NotImplementedError()
1081
    def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update existing Node-Exporter daemon(s).

        :param spec: placement specification for the node-exporter service
        """
        raise NotImplementedError()
1085
    def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create a new crash service.

        :param spec: placement specification for the crash service
        """
        raise NotImplementedError()
1089
    def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update existing crash daemon(s).

        :param spec: placement specification for the crash service
        """
        raise NotImplementedError()
1093
    def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create a new Grafana service.

        :param spec: placement specification for the grafana service
        """
        raise NotImplementedError()
1097
    def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update existing Grafana daemon(s).

        :param spec: placement specification for the grafana service
        """
        raise NotImplementedError()
1101
    def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
        """
        Create a new AlertManager service.

        :param spec: placement specification for the alertmanager service
        """
        raise NotImplementedError()
1105
    def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
        """
        Update existing AlertManager daemon(s).

        :param spec: placement specification for the alertmanager service
        """
        raise NotImplementedError()
1109
    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
        """
        Check what an upgrade to the given target would entail, without starting it.

        :param image: target container image, or None
        :param version: target ceph version, or None
        """
        raise NotImplementedError()
1112
    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
        """
        Start an upgrade to the given target.

        :param image: target container image, or None
        :param version: target ceph version, or None
        """
        raise NotImplementedError()
1115
    def upgrade_pause(self) -> Completion[str]:
        """Pause the ongoing upgrade."""
        raise NotImplementedError()
1118
    def upgrade_resume(self) -> Completion[str]:
        """Resume a previously paused upgrade."""
        raise NotImplementedError()
1121
    def upgrade_stop(self) -> Completion[str]:
        """Stop the ongoing upgrade."""
        raise NotImplementedError()
1124
    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()
1133
    @_hide_in_features
    def upgrade_available(self) -> Completion:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
1142
1143
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Deserialize a JSON dict into the matching spec type.

    A dict whose ``service_type`` is ``'host'`` becomes a :class:`HostSpec`;
    anything else is handed to :class:`ServiceSpec`.

    :param spec: JSON-compatible dict representation of a spec
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
1152
1153
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self):
        self.in_progress: bool = False  # Is an upgrade underway?
        # Target of the upgrade; None when not known/not set
        self.target_image = None
        self.services_complete = []  # Which daemon types are fully updated?
        self.message: str = ""  # Freeform description
1161
1162
def handle_type_error(method):
    """
    Decorator for classmethod-style constructors (e.g. ``from_json``).

    Any ``TypeError`` raised by *method* (typically from passing unexpected
    keyword arguments to ``cls(...)``) is re-raised as an
    :exc:`OrchestratorValidationError` prefixed with the class name.
    """
    @wraps(method)
    def wrapper(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return wrapper
1172
1173
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool=False):

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        self.is_active = is_active

    def name(self):
        """Return the full daemon name: ``<daemon_type>.<daemon_id>``."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """Return True if this daemon's name starts with ``<service_name>.``."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """
        Derive the service_id this daemon belongs to.

        For OSDs with an osdspec affinity, that affinity is the service_id.
        For daemon types in ``ServiceSpec.REQUIRES_SERVICE_ID``, the id is
        parsed out of ``daemon_id`` using the (bare) hostname as a separator.
        For all other daemon types, ``daemon_id`` is returned as-is.

        :raises OrchestratorError: when the service_id cannot be derived
            (missing hostname or malformed daemon_id).
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        def _match():
            # Shared error for every malformed-name branch below.
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self):
        """Return the owning service's name: ``<type>.<service_id>`` or just ``<type>``."""
        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """
        Serialize to a JSON-compatible OrderedDict.

        Timestamps are formatted with ``DATEFMT``; keys whose value is None
        are dropped from the result.
        """
        out = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['version'] = self.version
        out['status'] = self.status
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """
        Deserialize from a dict produced by :func:`to_json`.

        Timestamp strings are parsed with ``DATEFMT``. Unknown keys cause
        an :exc:`OrchestratorValidationError` via ``@handle_type_error``.
        """
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(events=events, **c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        """Dump a DaemonDescription as a plain mapping (its JSON form)."""
        return dumper.represent_dict(data.to_json().items())


yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1361
1362
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

    def service_type(self):
        """Return the service type of the underlying spec (e.g. ``'osd'``)."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """
        Serialize to a JSON-compatible dict: the spec's JSON form plus a
        ``status`` sub-dict. Timestamps use ``DATEFMT``; None values are
        omitted from the status.
        """
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = getattr(self, k).strftime(DATEFMT)
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """
        Deserialize from a dict produced by :func:`to_json`.

        The ``status`` sub-dict supplies the constructor kwargs; timestamp
        strings are parsed with ``DATEFMT``.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        """Dump a ServiceDescription as a plain mapping (its JSON form)."""
        return dumper.represent_dict(data.to_json().items())


yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
1463
1464
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

    * filter by host when presenting UI workflow for configuring
      a particular server.
    * filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
    * filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
1486
1487
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name  # falls back to the name when no address given
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a JSON-compatible dict (devices via ``Devices.to_json``)."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """
        Deserialize from a dict produced by :func:`to_json`.

        :raises OrchestratorValidationError: on missing/unknown keys or a
            malformed payload.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Anything left over was not consumed above -> reject it.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        """Build InventoryHosts from ``(name, obj-with-.data)`` pairs."""
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the names of the given hosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): equality compares only name and devices (not addr or
        # labels) and assumes `other` is an InventoryHost -- comparing against
        # another type would raise AttributeError. Confirm this is intended.
        return self.name == other.name and self.devices == other.devices
1546
1547
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    host: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    dev: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
      See ``ceph osd metadata | jq '.[].device_ids'``

    path: the device node path (presumably e.g. ``/dev/sda`` -- confirm with callers)
    """
    __slots__ = ()
1559
1560
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # Matches the single-line serialization produced by to_json():
    #   '<created> <kind>:<subject> [<level>] "<message>"'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        # `created` may arrive as a DATEFMT string (e.g. from from_json()).
        if isinstance(created, str):
            created = datetime.datetime.strptime(created, DATEFMT)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # The event's subject: a service name or a daemon name
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return the ``<kind>:<subject>`` identifier for this event."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = self.created.strftime(DATEFMT)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """
        Parse the single-line string produced by :func:`to_json`.

        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: serialized event string
        :return: the parsed event
        :raises ValueError: if *data* does not match the expected format
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other):
        # NOTE(review): `level` is excluded from the comparison -- confirm
        # whether two events differing only in level should compare equal.
        if not isinstance(other, OrchestratorEvent):
            return False

        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
1619
1620
def _mk_orch_methods(cls):
    """
    Class decorator: install, on *cls*, a forwarding stub for every public
    method of :class:`Orchestrator`. Each stub routes the call through
    ``self._oremote`` and returns the resulting completion.
    """
    def make_forwarder(method_name):
        # A dedicated factory gives every stub its own binding of
        # `method_name`; a closure over the loop variable would bind
        # only the last name.
        def forwarder(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return forwarder

    skipped = ('is_orchestrator_module',)
    for name in Orchestrator.__dict__:
        if name.startswith('_') or name in skipped:
            continue
        setattr(cls, name, make_forwarder(name))
    return cls
1634
1635
1636 @_mk_orch_methods
1637 class OrchestratorClientMixin(Orchestrator):
1638 """
1639 A module that inherents from `OrchestratorClientMixin` can directly call
1640 all :class:`Orchestrator` methods without manually calling remote.
1641
1642 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1643 calls :func:`OrchestratorClientMixin._oremote`
1644
1645 >>> class MyModule(OrchestratorClientMixin):
1646 ... def func(self):
1647 ... completion = self.add_host('somehost') # calls `_oremote()`
1648 ... self._orchestrator_wait([completion])
1649 ... self.log.debug(completion.result)
1650
1651 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1652 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1653 "real" implementation of the orchestrator.
1654
1655
1656 >>> import mgr_module
1657 >>> #doctest: +SKIP
1658 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
1659 ... def __init__(self, ...):
1660 ... self.orch_client = OrchestratorClientMixin()
1661 ... self.orch_client.set_mgr(self.mgr))
1662 """
1663
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Set the mgr module to forward remote calls through.

        Usable in the Dashboard that uses a global ``mgr``.
        """

        # Name-mangled attribute: makes sure we're not overwriting any
        # other `mgr` properties on subclasses.
        self.__mgr = mgr
1670
    def __get_mgr(self):
        """Return the mgr set via :func:`set_mgr`, falling back to ``self``."""
        try:
            return self.__mgr
        except AttributeError:
            # set_mgr() was never called; assume we *are* a mgr module.
            return self
1676
    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :param meth: name of the Orchestrator method to invoke
        :param args: positional arguments for the remote call
        :param kwargs: keyword arguments for the remote call
        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            # `mgr` isn't the orchestrator module itself; ask it remotely.
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            # Distinguish "backend doesn't implement this" from a real failure.
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise
1705
1706 def _orchestrator_wait(self, completions: List[Completion]) -> None:
1707 """
1708 Wait for completions to complete (reads) or
1709 become persistent (writes).
1710
1711 Waits for writes to be *persistent* but not *effective*.
1712
1713 :param completions: List of Completions
1714 :raises NoOrchestrator:
1715 :raises RuntimeError: something went wrong while calling the process method.
1716 :raises ImportError: no `orchestrator` module or backend not found.
1717 """
1718 while any(not c.has_result for c in completions):
1719 self.process(completions)
1720 self.__get_mgr().log.info("Operations pending: %s",
1721 sum(1 for c in completions if not c.has_result))
1722 if any(c.needs_result for c in completions):
1723 time.sleep(1)
1724 else:
1725 break