"""
ceph-mgr orchestrator interface

Please see the ceph-mgr module developer's guide for more information.
"""

import copy
import datetime
import errno
import logging
import pickle
import re
import time
import uuid

from collections import namedtuple
from functools import wraps

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
    ServiceSpecValidationError, IscsiServiceSpec
from ceph.deployment.drive_group import DriveGroupSpec

from mgr_module import MgrModule, CLICommand, HandleCommandResult

try:
    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
        Type, Sequence, Dict, cast
except ImportError:
    pass

logger = logging.getLogger(__name__)

DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'


class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """


class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super(NoOrchestrator, self).__init__(msg)


class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """


def handle_exception(prefix, cmd_args, desc, perm, func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # misuse partial to copy `wrapper`
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy


def _cli_command(perm):
    def inner_cli_command(prefix, cmd_args="", desc=""):
        return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
    return inner_cli_command


_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
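
# A minimal usage sketch for the decorators above (not part of the interface):
# the command prefix, argument spec and handler below are hypothetical, but the
# shape matches how orchestrator CLI modules typically wrap their handlers.
#
#     @_cli_read_command(
#         'orch example ls',
#         'name=host,type=CephString,req=false',
#         'List hosts known to the (example) orchestrator')
#     def _example_ls(self, host=None):
#         completion = self.get_hosts()
#         self._orchestrator_wait([completion])
#         raise_if_exception(completion)
#         return HandleCommandResult(stdout=completion.result_str())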


class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch = {}  # type: Dict[str, CLICommand]
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        def handle_command(self, inbuf, cmd):
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command

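# Illustrative sketch only: a mgr module class combining ``CLICommandMeta``
# with the decorators above (the class and command names here are hypothetical).
#
#     class MyOrchestratorCli(OrchestratorClientMixin, MgrModule,
#                             metaclass=CLICommandMeta):
#         @_cli_read_command('orch example status', desc='Example status')
#         def _status(self):
#             ...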

def _no_result():
    return object()


class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    NO_RESULT = _no_result()  # type: None
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise=None,  # type: Optional["_Promise"]
                 value=NO_RESULT,  # type: Optional[Any]
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise = None  # type: Optional[_Promise]

        self._state = self.INITIALIZED
        self._exception = None  # type: Optional[Exception]

        # Value of this _Promise. May be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise = _first_promise or self  # type: '_Promise'

    @property
    def _exception(self):
        # type: () -> Optional[Exception]
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        try:
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self):
        # type: () -> Optional[bytes]
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self):
        # type: () -> Optional[Callable]
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val):
        # type: (Optional[Callable]) -> None
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name,
            getattr(self._next_promise, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self, on_complete):
        # type: (Any, Callable) -> Any
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise
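
    # Minimal chaining sketch (assuming the promise is finalized synchronously
    # by the caller; the values are illustrative):
    #
    #     p = _Promise(on_complete=lambda x: x + 1)
    #     q = p.then(lambda x: x * 2)
    #     p._finalize(1)          # runs the chain: (1 + 1) * 2
    #     assert q._value == 4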

    def _set_next_promise(self, next):
        # type: (_Promise) -> None
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise
            pass

    def propagate_to_next(self):
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e):
        # type: (Exception) -> None
        """
        Sets the whole completion to be failed with this exception and ends
        the evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self):
        # type: () -> _Promise
        return list(iter(self))[-1]


class ProgressReference(object):
    def __init__(self,
                 message,  # type: str
                 mgr,
                 completion=None  # type: Optional[Callable[[], Completion]]
                 ):
        """
        ProgressReference can be used within Completions::

             +---------------+      +---------------------------------+
             |               | then |                                 |
             | My Completion | +--> | on_complete=ProgressReference() |
             |               |      |                                 |
             +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion = completion  # type: Optional[Callable[[], Completion]]

        #: if an orchestrator module can provide more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        return self.progress == 1 and self._completion_has_result

    def update(self):
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        self._completion_has_result = True
        self.progress = 1


class Completion(_Promise):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicitly and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*2 | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """
    def __init__(self,
                 _first_promise=None,  # type: Optional["Completion"]
                 value=_Promise.NO_RESULT,  # type: Any
                 on_complete=None,  # type: Optional[Callable]
                 name=None,  # type: Optional[str]
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self):
        # type: () -> Optional[ProgressReference]
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self):
        # type: () -> Optional[ProgressReference]
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls,  # type: Any
                      message,  # type: str
                      mgr,
                      _first_promise=None,  # type: Optional["Completion"]
                      value=_Promise.NO_RESULT,  # type: Any
                      on_complete=None,  # type: Optional[Callable]
                      calc_percent=None  # type: Optional[Callable[[], Any]]
                      ):
        # type: (...) -> Any

        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message,  # type: str
                     mgr,
                     calc_percent=None  # type: Optional[Callable[[], Any]]
                     ):
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )
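
    # Sketch of how a module might attach a progress reference to a write
    # operation (the message, the ``mgr`` object and ``_do_create`` are
    # placeholders, not part of this interface):
    #
    #     c = Completion.with_progress('Creating daemons', mgr=self.mgr,
    #                                  on_complete=lambda _: self._do_create())
    #     c.finalize(None)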

    def fail(self, e):
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result=_Promise.NO_RESULT):
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self):
        """
        The result of the operation that we were waiting
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return last._value

    def result_str(self):
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        return self._last_promise()._exception

    @property
    def serialized_exception(self):
        # type: () -> Optional[bytes]
        return self._last_promise()._serialized_exception

    @property
    def has_result(self):
        # type: () -> bool
        """
        Does the operation already have a result?

        For write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Has the completion failed? The default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self):
        # type: () -> bool
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):

        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)


def pretty_print(completions):
    # type: (Sequence[Completion]) -> str
    return ', '.join(c.pretty_print() for c in completions)


def raise_if_exception(c):
    # type: (Completion) -> None
    """
    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError):
            raise Exception('{}: {}'.format(type(c.exception), c.exception))
        raise e

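# Typical consumer-side pattern for a completion (sketch only; ``self`` is
# assumed to be an ``OrchestratorClientMixin``):
#
#     completion = self.get_hosts()
#     self._orchestrator_wait([completion])
#     raise_if_exception(completion)
#     hosts = completion.result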

class TrivialReadCompletion(Completion):
    """
    This is the trivial completion simply wrapping a result.
    """
    def __init__(self, result):
        super(TrivialReadCompletion, self).__init__()
        if result:
            self.finalize(result)


def _hide_in_features(f):
    f._hide_in_features = True
    return f


class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        raise NotImplementedError()

    @_hide_in_features
    def process(self, completions):
        # type: (List[Completion]) -> None
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self):
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

        It's better to ask for forgiveness instead::

            >>> #doctest: +SKIP
            ... try:
            ...     OrchestratorClientMixin().get_hosts()
            ... except (OrchestratorError, NotImplementedError):
            ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self):
        # type: () -> None
        """
        Cancels ongoing completions. Unsticks the mgr.
        """
        raise NotImplementedError()

    def pause(self):
        # type: () -> None
        raise NotImplementedError()

    def resume(self):
        # type: () -> None
        raise NotImplementedError()

    def add_host(self, host_spec):
        # type: (HostSpec) -> Completion
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host specification (hostname, address, labels)
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> Completion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host, addr):
        # type: (str, str) -> Completion
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self):
        # type: () -> Completion
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host, label):
        # type: (str, str) -> Completion
        """
        Remove a host label
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter=None, refresh=False):
        # type: (Optional[InventoryFilter], bool) -> Completion
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_name=None, refresh=False):
        # type: (Optional[str], Optional[str], bool) -> Completion
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    def apply(self, specs: List["GenericSpec"]) -> Completion:
        """
        Applies any spec
        """
        fns: Dict[str, Callable] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'host': self.add_host,
        }

        def merge(ls, r):
            if isinstance(ls, list):
                return ls + [r]
            return [ls, r]

        spec, *specs = specs

        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
        completion = fn(spec)
        for s in specs:
            def next(ls, s=s):
                # bind the current spec explicitly; a plain closure would only
                # see the last value of `s` by the time the chain runs
                fn = cast(Callable[["GenericSpec"], Completion], fns[s.service_type])
                return fn(s).then(lambda r: merge(ls, r))
            completion = completion.then(next)
        return completion
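
    # Illustrative call (the spec contents are made up); for multiple specs the
    # result of the returned completion is typically a list with one entry per
    # applied spec:
    #
    #     specs = [ServiceSpec('mgr'), ServiceSpec('mon')]
    #     completion = orchestrator.apply(specs)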

    def remove_daemons(self, names):
        # type: (List[str]) -> Completion
        """
        Remove specific daemon(s).

        :return: None
        """
        raise NotImplementedError()

    def remove_service(self, service_name):
        # type: (str) -> Completion
        """
        Remove a service (a collection of daemons).

        :return: None
        """
        raise NotImplementedError()

    def service_action(self, action, service_name):
        # type: (str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def daemon_action(self, action, daemon_type, daemon_id):
        # type: (str, str, str) -> Completion
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_type: daemon type, e.g. "mds", "rgw", ...
        :param daemon_id: ID of the daemon
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> Completion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name=None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> Completion:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> Completion:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def remove_osds_status(self):
        # type: () -> Completion
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault, on, locations):
        # type: (str, bool, List[DeviceLightLoc]) -> Completion
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host, path):
        # type: (str, str) -> Completion
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mon daemon(s)"""
        raise NotImplementedError()

    def apply_mon(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mon cluster"""
        raise NotImplementedError()

    def add_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create mgr daemon(s)"""
        raise NotImplementedError()

    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update mgr cluster"""
        raise NotImplementedError()

    def add_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create MDS daemon(s)"""
        raise NotImplementedError()

    def apply_mds(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update MDS cluster"""
        raise NotImplementedError()

    def add_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Create RGW daemon(s)"""
        raise NotImplementedError()

    def apply_rgw(self, spec):
        # type: (RGWSpec) -> Completion
        """Update RGW cluster"""
        raise NotImplementedError()

    def add_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create rbd-mirror daemon(s)"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Create NFS daemon(s)"""
        raise NotImplementedError()

    def apply_nfs(self, spec):
        # type: (NFSServiceSpec) -> Completion
        """Update NFS cluster"""
        raise NotImplementedError()

    def add_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> Completion
        """Create iscsi daemon(s)"""
        raise NotImplementedError()

    def apply_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> Completion
        """Update iscsi cluster"""
        raise NotImplementedError()

    def add_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create new prometheus daemon"""
        raise NotImplementedError()

    def apply_prometheus(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update prometheus cluster"""
        raise NotImplementedError()

    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Node-Exporter service"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def add_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new crash service"""
        raise NotImplementedError()

    def apply_crash(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def add_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new Grafana service"""
        raise NotImplementedError()

    def apply_grafana(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing Grafana daemon(s)"""
        raise NotImplementedError()

    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Create a new AlertManager service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec):
        # type: (ServiceSpec) -> Completion
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def upgrade_check(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_start(self, image, version):
        # type: (Optional[str], Optional[str]) -> Completion
        raise NotImplementedError()

    def upgrade_pause(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_resume(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_stop(self):
        # type: () -> Completion
        raise NotImplementedError()

    def upgrade_status(self):
        # type: () -> Completion
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self):
        # type: () -> Completion
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()


class HostSpec(object):
    """
    Information about hosts. Like e.g. ``kubectl get nodes``
    """
    def __init__(self,
                 hostname,  # type: str
                 addr=None,  # type: Optional[str]
                 labels=None,  # type: Optional[List[str]]
                 status=None,  # type: Optional[str]
                 ):
        self.service_type = 'host'

        #: the bare hostname on the host. Not the FQDN.
        self.hostname = hostname  # type: str

        #: DNS name or IP address to reach it
        self.addr = addr or hostname  # type: str

        #: label(s), if any
        self.labels = labels or []  # type: List[str]

        #: human readable status
        self.status = status or ''  # type: str

    def to_json(self):
        return {
            'hostname': self.hostname,
            'addr': self.addr,
            'labels': self.labels,
            'status': self.status,
        }

    @classmethod
    def from_json(cls, host_spec):
        _cls = cls(host_spec['hostname'],
                   host_spec['addr'] if 'addr' in host_spec else None,
                   host_spec['labels'] if 'labels' in host_spec else None)
        return _cls
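
    # Round-trip sketch (values are illustrative; ``__eq__`` below ignores
    # ``status``):
    #
    #     hs = HostSpec('node1', addr='10.0.0.1', labels=['mon'])
    #     assert HostSpec.from_json(hs.to_json()) == hs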

    def __repr__(self):
        args = [self.hostname]  # type: List[Any]
        if self.addr is not None:
            args.append(self.addr)
        if self.labels:
            args.append(self.labels)
        if self.status:
            args.append(self.status)

        return "<HostSpec>({})".format(', '.join(map(repr, args)))

    def __eq__(self, other):
        # Let's omit `status` for the moment, as it is still the very same host.
        return self.hostname == other.hostname and \
               self.addr == other.addr and \
               self.labels == other.labels


GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec):
    # type: (dict) -> GenericSpec
    if 'service_type' in spec and spec['service_type'] == 'host':
        return HostSpec.from_json(spec)
    else:
        return ServiceSpec.from_json(spec)
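
# Dispatch sketch (dict contents are illustrative):
#
#     json_to_generic_spec({'service_type': 'host', 'hostname': 'node1'})  # -> HostSpec
#     json_to_generic_spec({'service_type': 'mgr'})                        # -> ServiceSpec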


class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self):
        self.in_progress = False  # Is an upgrade underway?
        self.target_image = None
        self.services_complete = []  # Which daemon types are fully updated?
        self.message = ""  # Freeform description


def handle_type_error(method):
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            error_msg = '{}: {}'.format(cls.__name__, e)
            raise OrchestratorValidationError(error_msg)
    return inner


class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None):
        # Host is at the same granularity as InventoryHost
        self.hostname = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]

        self.created = created  # type: Optional[datetime.datetime]
        self.started = started  # type: Optional[datetime.datetime]
        self.last_configured = last_configured  # type: Optional[datetime.datetime]
        self.last_deployed = last_deployed  # type: Optional[datetime.datetime]

        # Affinity to a certain OSDSpec
        self.osdspec_affinity = osdspec_affinity  # type: Optional[str]

    def name(self):
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name):
        # type: (Optional[str]) -> bool
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        def _match():
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']:
            return _match()

        return self.daemon_id

    def service_name(self):
        if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type
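
    # Examples of the naming convention handled above (values illustrative):
    #
    #     DaemonDescription('mds', 'myfs.host1.abcdef', 'host1').service_id()
    #         == 'myfs'
    #     DaemonDescription('mds', 'myfs.host1.abcdef', 'host1').service_name()
    #         == 'mds.myfs'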

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        out = {
            'hostname': self.hostname,
            'container_id': self.container_id,
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'daemon_id': self.daemon_id,
            'daemon_type': self.daemon_type,
            'version': self.version,
            'status': self.status,
            'status_desc': self.status_desc,
        }
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)
        return {k: v for (k, v) in out.items() if v is not None}

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        c = data.copy()
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        return cls(**c)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())


class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
        self.created = created  # type: Optional[datetime.datetime]

        self.spec: ServiceSpec = spec

    def service_type(self):
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self):
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = getattr(self, k).strftime(DATEFMT)
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        c = data.copy()
        status = c.pop('status', {})
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        return cls(spec=spec, **c_status)


class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting UI workflow for configuring
      a particular server.

      filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.

      filter by label when we are interested particularly
      in e.g. OSD servers.
    """
    def __init__(self, labels=None, hosts=None):
        # type: (Optional[List[str]], Optional[List[str]]) -> None

        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts

class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside an
    InventoryHost.
    """
    def __init__(self, name, devices=None, labels=None, addr=None):
        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts):
        # type: (List[InventoryHost]) -> List[str]
        return [host.name for host in hosts]

    def __eq__(self, other):
        return self.name == other.name and self.devices == other.devices


class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
        See ``ceph osd metadata | jq '.[].device_ids'``
    """
    __slots__ = ()

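# Sketch of a blink request built from these locations (host and device values
# are made up; ``self`` is assumed to be an ``OrchestratorClientMixin``):
#
#     locs = [DeviceLightLoc(host='node1',
#                            dev='ABC1234DEF567-1R1234_ABC8DE0Q',
#                            path='/dev/sdb')]
#     completion = self.blink_device_light('ident', True, locs)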

def _mk_orch_methods(cls):
    # Needs to be defined outside of the for loop.
    # Otherwise meth is always bound to the last key
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls

@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        The reason is that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr):
        # type: (MgrModule) -> None
        """
        Usable in the Dashboard, which uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self):
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            o = mgr.remote('orchestrator', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        try:
            return mgr.remote(o, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            f_set = self.get_feature_set()
            if meth not in f_set or not f_set[meth]['available']:
                raise NotImplementedError(f'{o} does not implement {meth}') from e
            raise

    def _orchestrator_wait(self, completions):
        # type: (List[Completion]) -> None
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises RuntimeError: something went wrong while calling the process method.
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        while any(not c.has_result for c in completions):
            self.process(completions)
            self.__get_mgr().log.info("Operations pending: %s",
                                      sum(1 for c in completions if not c.has_result))
            if any(c.needs_result for c in completions):
                time.sleep(1)
            else:
                break