]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator/_interface.py
Import ceph 15.2.8
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
CommitLineData
9f95a23c
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
e306af50
TL
7
8import copy
9import datetime
10import errno
9f95a23c
TL
11import logging
12import pickle
e306af50 13import re
9f95a23c 14import time
e306af50
TL
15import uuid
16
f6b5b4d7
TL
17from collections import namedtuple, OrderedDict
18from contextlib import contextmanager
9f95a23c 19from functools import wraps
9f95a23c 20
f6b5b4d7
TL
21import yaml
22
9f95a23c
TL
23from ceph.deployment import inventory
24from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
1911f103 25 ServiceSpecValidationError, IscsiServiceSpec
9f95a23c 26from ceph.deployment.drive_group import DriveGroupSpec
f6b5b4d7 27from ceph.deployment.hostspec import HostSpec
9f95a23c
TL
28
29from mgr_module import MgrModule, CLICommand, HandleCommandResult
30
31try:
32 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
f6b5b4d7 33 Type, Sequence, Dict, cast
9f95a23c
TL
34except ImportError:
35 pass
36
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# strftime/strptime pattern: date and time separated by 'T', with
# microseconds and without a timezone designator.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'

# Generic type variable; used in this file as the result type of Completion[T].
T = TypeVar('T')
42
9f95a23c
TL
43
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable message; becomes ``str(exc)``.
    :param errno: error number stored on the exception as ``errno``
        (defaults to ``-errno.EINVAL``).
    :param event_kind_subject: optional ``(kind, subject)`` tuple, stored
        as ``event_subject``. See OrchestratorEvent.subject
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        # Name *this* class in super(): ``super(Exception, self)`` starts the
        # MRO lookup after Exception and would silently skip any __init__
        # defined between this class and Exception.
        super(OrchestratorError, self).__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
61
9f95a23c
TL
62
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.

    Carries ``-errno.ENOENT`` as its error number.
    """

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c
TL
70
71
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Adds no behaviour of its own; it only provides a distinct type
    derived from OrchestratorError.
    """
76
77
f6b5b4d7
TL
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Context manager that tags an OrchestratorError raised inside the
    ``with`` body with an event subject, then re-raises it.

    :param kind: first element of the ``(kind, subject)`` tuple stored in
        ``event_subject``.
    :param subject: second element of that tuple.
    :param overwrite: if ``True``, replace a subject that is already set on
        the exception; if ``False`` (default), only fill in a missing one.
    :raises OrchestratorError: the original exception, always re-raised.
    """
    try:
        yield
    except OrchestratorError as e:
        # OrchestratorError.__init__ always creates ``event_subject``, so a
        # hasattr() test is always true and would overwrite unconditionally.
        # Check whether a subject has actually been assigned instead.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
86
87
9f95a23c
TL
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap a command handler so that expected errors become a
    HandleCommandResult instead of propagating, and attach the CLI
    metadata that CLICommandMeta looks for.

    :param prefix: command prefix; also used in the "not supported" message.
    :param cmd_args: passed through to CLICommand.
    :param desc: passed through to CLICommand.
    :param perm: passed through to CLICommand.
    :param func: the handler being wrapped.
    :return: a callable carrying ``_prefix`` and ``_cli_command`` attributes.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            # The handler (or something it called) is not implemented.
            msg = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=msg)

    # A lambda (not functools.partial) is used as a thin forwarding copy of
    # `wrapper`; the CLI attributes are set on this copy, not on `wrapper`.
    # NOTE(review): the reason a separate copy is needed is not visible here.
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    # Point the CLICommand back at the copy so dispatch invokes the wrapper.
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
109
110
111def _cli_command(perm):
112 def inner_cli_command(prefix, cmd_args="", desc=""):
113 return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
114 return inner_cli_command
115
116
# Decorator factories for CLI handlers, pre-bound to a permission string
# ('r' and 'rw' — presumably read-only and read/write; confirm against CLICommand).
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
119
120
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For every class created with this metaclass, the attributes of the class
    body that carry ``_prefix`` / ``_cli_command`` are collected into a
    dispatch table, ``COMMANDS`` is set to their dumped descriptions and
    ``handle_command`` is replaced by a dispatcher over that table.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # Not a CLI command handler; ignore.
                pass

        # Remember whatever handle_command the class had (own or inherited)
        # *before* it is replaced below. Calling ``self.handle_command`` from
        # inside the replacement would resolve to the replacement itself and
        # recurse until the stack overflows.
        fallback = getattr(cls, 'handle_command', None)

        def handle_command(self, inbuf, cmd):
            prefix = cmd['prefix']
            if prefix in dispatch:
                return dispatch[prefix].call(self, cmd, inbuf)
            if fallback is not None:
                # Delegate unknown prefixes to the previous implementation.
                return fallback(self, inbuf, cmd)
            raise NotImplementedError('unknown command prefix: {}'.format(prefix))

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
145
146
147def _no_result():
148 return object()
149
150
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.

    Promises form a singly linked chain via ``_next_promise``; every element
    also points at the head of its chain via ``_first_promise``.
    """
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Sentinel meaning "no value yet"; compared by identity. The ``None``
    # annotation does not describe the runtime object (a plain ``object()``).
    NO_RESULT: None = _no_result()
    # Sentinel an ``on_complete`` callback returns to signal that the result
    # will be delivered later (the chain is not propagated).
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise: Optional["_Promise"] = None,
                 value: Optional[Any] = NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        """
        :param _first_promise: head of the chain this promise belongs to;
            defaults to the promise itself.
        :param value: initial value, ``NO_RESULT`` if there is none yet.
        :param on_complete: callback invoked with the value on finalization.
        :param name: optional display name used by the repr/pretty printers.
        """
        self._on_complete_ = on_complete
        self._name = name
        self._next_promise: Optional[_Promise] = None

        self._state = self.INITIALIZED
        self._exception: Optional[Exception] = None

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise: '_Promise' = _first_promise or self

    @property
    def _exception(self) -> Optional[Exception]:
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        """Store ``e`` and keep a pickled copy of it alongside."""
        self._exception_ = e
        if e is None:
            self._serialized_exception_ = None
            return
        # pickle.dumps() reports unpicklable objects not only with
        # PicklingError but also with AttributeError / TypeError
        # (e.g. local classes, objects holding locks).
        unpicklable = (pickle.PicklingError, AttributeError, TypeError)
        try:
            self._serialized_exception_ = pickle.dumps(e)
        except unpicklable:
            logger.error(f"failed to pickle {e}")
            # degenerate to a plain Exception
            if isinstance(e, Exception):
                plain = Exception(*e.args)
            else:
                plain = Exception(str(e))
            try:
                self._serialized_exception_ = pickle.dumps(plain)
            except unpicklable:
                # Even the args cannot be pickled: keep the message only.
                self._serialized_exception_ = pickle.dumps(Exception(str(e)))

    @property
    def _serialized_exception(self) -> Optional[bytes]:
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self) -> Optional[Callable]:
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val: Optional[Callable]) -> None:
        self._on_complete_ = val

    def __repr__(self):
        # An explicit name wins; otherwise fall back to the callback's
        # __name__, or 'None' when there is no callback at all.
        if self._name:
            name = self._name
        elif self._on_complete:
            name = getattr(self._on_complete, '__name__', '??')
        else:
            name = 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # ``pr`` shows this promise's progress reference when the (sub)class
        # provides one; the builtin ``next`` never has that attribute.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
                self, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        """Return a one-line, human readable description of this step."""
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: '      ',
            self.RUNNING: '   >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self: Any, on_complete: Callable) -> Any:
        """
        Call ``on_complete`` as soon as this promise is finalized.

        :return: the promise at the end of the chain after the call.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        if self._next_promise is not None:
            # Not the tail: delegate until the end of the chain is reached.
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: append a new step for it.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            # Reuse this step for the callback and append an empty tail.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next: '_Promise') -> None:
        """Link ``next`` after this promise and re-parent its whole chain."""
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        :raises ValueError: if this promise is already finished.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                # A failing callback fails this promise and everything after it.
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            # The callback returned another chain: splice it in between this
            # promise and the remainder of the current chain.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise
            pass

    def propagate_to_next(self):
        """Mark this promise finished and finalize the next one, if any."""
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e: Exception) -> None:
        """
        Sets the whole completion to be failed with this exception and ends
        the evaluation.

        :raises ValueError: if this promise is already finished.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        # Pass the exception explicitly: fail() is not necessarily called from
        # inside an ``except`` block, where logger.exception() would have no
        # active traceback to report.
        logger.error('_Promise failed', exc_info=e)
        self._exception = e
        self._value = f'_exception: {e}'
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        # Identity-based membership over the *whole* chain, starting at its head.
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # Iterate from this promise to the tail of the chain.
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        """Append the chain ``other`` (may be ``None``) after the tail."""
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self) -> '_Promise':
        """Return the tail of the chain reachable from this promise."""
        last = self
        while last._next_promise is not None:
            last = last._next_promise
        return last
365
366
class ProgressReference(object):
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        """
        super().__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        # Goes through the ``progress`` setter, i.e. already reports to mgr.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Used as an ``on_complete`` callback: record that a result arrived,
        # mark the progress as complete and pass the value through unchanged.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        assert progress <= 1.0
        self._progress = progress
        try:
            if not self.effective:
                # Still in flight: publish the current progress value.
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
                return
            # Done: close the progress event and drop this reference.
            self.mgr.remote("progress", "complete", self.progress_id)
            remaining = [
                p for p in self.mgr.all_progress_references if p is not self]
            self.mgr.all_progress_references = remaining
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        return self.progress == 1 and self._completion_has_result

    def update(self):
        if not self.completion:
            self.progress = 1
            return

        def progress_run(progress):
            self.progress = progress

        c = self.completion().then(progress_run)
        self.mgr.process([c._first_promise])

    def fail(self):
        self._completion_has_result = True
        self.progress = 1
452
453
class Completion(_Promise, Generic[T]):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*2 | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """

    def __init__(self,
                 _first_promise: Optional["Completion"] = None,
                 value: Any = _Promise.NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self) -> Optional[ProgressReference]:
        # A step "is" a progress reference when its callback carries a
        # ``progress_id`` attribute (as ProgressReference instances do).
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self) -> Optional[ProgressReference]:
        """
        ProgressReference. Marks this completion
        as a write completion.

        Searches from this step to the end of the chain; at most one
        progress reference is expected there.
        """

        references = [c._progress_reference for c in iter(
            self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls: Any,
                      message: str,
                      mgr,
                      _first_promise: Optional["Completion"] = None,
                      value: Any = _Promise.NO_RESULT,
                      on_complete: Optional[Callable] = None,
                      calc_percent: Optional[Callable[[], Any]] = None
                      ) -> Any:
        """
        Create a completion followed by a ProgressReference step.

        :return: the first promise of the resulting chain.
        """

        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message: str,
                     mgr,
                     calc_percent: Optional[Callable[[], Any]] = None
                     ):
        """Append a ProgressReference step; returns what ``then()`` returns."""
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e: Exception):
        """Fail the chain from here on and close this step's progress event."""
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
        """
        Start evaluating the chain from its first promise.

        Does nothing unless the first promise is still ``INITIALIZED``.
        """
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self) -> T:
        """
        The result of the operation that we were waited
        for.  Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return cast(T, last._value)

    def result_str(self) -> str:
        """Force a string."""
        # Read the property once: every access walks the promise chain.
        result = self.result
        if result is None:
            return ''
        if isinstance(result, list):
            return '\n'.join(str(x) for x in result)
        return str(result)

    @property
    def exception(self) -> Optional[Exception]:
        return self._last_promise()._exception

    @property
    def serialized_exception(self) -> Optional[bytes]:
        return self._last_promise()._serialized_exception

    @property
    def has_result(self) -> bool:
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return: ``True`` once the last promise of the chain is finished.
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self) -> bool:
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self) -> bool:
        """
        ``True`` if the completion either failed or has a result,
        i.e. the logical negation of ``needs_result``.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        """Render every step of the whole chain, one per line."""

        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
627
628
def pretty_print(completions: Sequence[Completion]) -> str:
    """Join the ``pretty_print()`` output of each completion with ``', '``."""
    rendered = [completion.pretty_print() for completion in completions]
    return ', '.join(rendered)
631
632
def raise_if_exception(c: 'Completion') -> None:
    """
    Re-raise the exception stored in a completion, if there is one.

    The exception is rebuilt from ``c.serialized_exception``. If it cannot
    be unpickled, a plain ``Exception`` describing ``c.exception`` is raised
    instead.

    :param c: completion to inspect; nothing happens when it carries no
        serialized exception.
    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is None:
        return
    try:
        # NOTE(review): pickle.loads() must only ever see data produced by
        # this process (the _Promise exception setter); never untrusted input.
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError, ImportError, TypeError, EOFError,
            pickle.UnpicklingError):
        # Unpickling can also fail with ImportError (class not importable
        # here), TypeError (exception __init__ signature mismatch) or
        # UnpicklingError/EOFError (damaged data); degrade uniformly.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise e
644
645
class TrivialReadCompletion(Completion[T]):
    """
    This is the trivial completion simply wrapping a result.
    """

    def __init__(self, result: T):
        super().__init__()
        # NOTE(review): a falsy result (None, [], '', 0) is not finalized
        # here, so such a completion stays unfinished — confirm intended.
        if not result:
            return
        self.finalize(result)
655
656
657def _hide_in_features(f):
658 f._hide_in_features = True
659 return f
660
661
662class Orchestrator(object):
663 """
664 Calls in this class may do long running remote operations, with time
665 periods ranging from network latencies to package install latencies and large
666 internet downloads. For that reason, all are asynchronous, and return
667 ``Completion`` objects.
668
669 Methods should only return the completion and not directly execute
670 anything, like network calls. Otherwise the purpose of
671 those completions is defeated.
672
673 Implementations are not required to start work on an operation until
674 the caller waits on the relevant Completion objects. Callers making
675 multiple updates should not wait on Completions until they're done
676 sending operations: this enables implementations to batch up a series
677 of updates when wait() is called on a set of Completion objects.
678
679 Implementations are encouraged to keep reasonably fresh caches of
680 the status of the system: it is better to serve a stale-but-recent
681 result read of e.g. device inventory than it is to keep the caller waiting
682 while you scan hosts every time.
683 """
684
    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: always ``True``
        """
        return True
694
    @_hide_in_features
    def available(self) -> Tuple[bool, str]:
        """
        Report whether we can talk to the orchestrator.  This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive.  It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
720
    @_hide_in_features
    def process(self, completions: List[Completion]) -> None:
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.

        :param completions: the Completion instances to process.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
735
736 @_hide_in_features
737 def get_feature_set(self):
738 """Describes which methods this orchestrator implements
739
740 .. note::
741 `True` doesn't mean that the desired functionality
742 is actually possible in the orchestrator. I.e. this
743 won't work as expected::
744
1911f103
TL
745 >>> #doctest: +SKIP
746 ... api = OrchestratorClientMixin()
9f95a23c
TL
747 ... if api.get_feature_set()['get_hosts']['available']: # wrong.
748 ... api.get_hosts()
749
750 It's better to ask for forgiveness instead::
751
1911f103
TL
752 >>> #doctest: +SKIP
753 ... try:
9f95a23c
TL
754 ... OrchestratorClientMixin().get_hosts()
755 ... except (OrchestratorError, NotImplementedError):
756 ... ...
757
758 :returns: Dict of API method names to ``{'available': True or False}``
759 """
760 module = self.__class__
761 features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
762 for a in Orchestrator.__dict__
763 if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
764 }
765 return features
766
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.

        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
772
    def pause(self) -> None:
        # Not implemented in the base interface.
        # NOTE(review): presumably pauses the orchestrator's background
        # activity — confirm against a concrete implementation.
        raise NotImplementedError()
775
    def resume(self) -> None:
        # Not implemented in the base interface.
        # NOTE(review): presumably the counterpart of pause() — confirm
        # against a concrete implementation.
        raise NotImplementedError()
778
    def add_host(self, host_spec: HostSpec) -> Completion[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: HostSpec describing the host to add
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
786
    def remove_host(self, host: str) -> Completion[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
794
    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
803
    def get_hosts(self) -> Completion[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
811
    def add_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Add a host label

        :param host: hostname
        :param label: label to add
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
817
    def remove_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Remove a host label

        :param host: hostname
        :param label: label to remove
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
823
f6b5b4d7
TL
    def host_ok_to_stop(self, hostname: str) -> Completion:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
831
    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :param host_filter: optional InventoryFilter
            (NOTE(review): presumably restricts the hosts reported — confirm).
        :param refresh: NOTE(review): presumably bypasses cached data — confirm.
        :return: list of InventoryHost
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
839
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator.  For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
853
    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
862
f6b5b4d7 863 def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
9f95a23c
TL
864 """
865 Applies any spec
866 """
e306af50 867 fns: Dict[str, function] = {
9f95a23c
TL
868 'alertmanager': self.apply_alertmanager,
869 'crash': self.apply_crash,
870 'grafana': self.apply_grafana,
e306af50 871 'iscsi': self.apply_iscsi,
9f95a23c
TL
872 'mds': self.apply_mds,
873 'mgr': self.apply_mgr,
874 'mon': self.apply_mon,
e306af50 875 'nfs': self.apply_nfs,
9f95a23c 876 'node-exporter': self.apply_node_exporter,
e306af50 877 'osd': lambda dg: self.apply_drivegroups([dg]),
9f95a23c
TL
878 'prometheus': self.apply_prometheus,
879 'rbd-mirror': self.apply_rbd_mirror,
e306af50
TL
880 'rgw': self.apply_rgw,
881 'host': self.add_host,
9f95a23c
TL
882 }
883
884 def merge(ls, r):
885 if isinstance(ls, list):
886 return ls + [r]
887 return [ls, r]
888
889 spec, *specs = specs
890
e306af50
TL
891 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
892 completion = fn(spec)
9f95a23c
TL
893 for s in specs:
894 def next(ls):
e306af50
TL
895 fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
896 return fn(s).then(lambda r: merge(ls, r))
9f95a23c
TL
897 completion = completion.then(next)
898 return completion
899
f6b5b4d7
TL
    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.

        :param spec: list of specs to preview
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
905
    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
        """
        Remove specific daemon(s).

        :param names: names of the daemons to remove
        :return: Completion (annotated as ``Completion[List[str]]``)
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
913
    def remove_service(self, service_name: str) -> Completion[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: name of the service to remove
        :return: Completion (annotated as ``Completion[str]``)
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
921
    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
        """
        Perform an action (start/stop/restart/redeploy/reconfig) on a service
        (i.e., all daemons providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: Completion
        :raises NotImplementedError: always, in this base implementation.
        """
        # NOTE: ``action`` is not validated here; that is left to implementations.
        raise NotImplementedError()
934
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
        """
        Perform an action (start/stop/restart/redeploy/reconfig) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: Completion
        :raises NotImplementedError: always, in this base implementation.
        """
        # NOTE: ``action`` is not validated here; that is left to implementations.
        raise NotImplementedError()
946
    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).

        :param drive_group: DriveGroupSpec to create OSDs for
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
957
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
        """
        Update OSD cluster

        :param specs: list of DriveGroupSpec
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
961
e306af50
TL
    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name=None
                           ) -> HandleCommandResult:
        # Not implemented in the base interface.
        # NOTE(review): presumably sets the "unmanaged" flag on the specs
        # selected by service_type / service_name — confirm against an
        # implementation. Unlike its neighbours, this method is annotated
        # to return a HandleCommandResult rather than a Completion.
        raise NotImplementedError()
968
e306af50
TL
def preview_osdspecs(self,
                     osdspec_name: Optional[str] = 'osd',
                     osdspecs: Optional[List[DriveGroupSpec]] = None
                     ) -> Completion[str]:
    """
    Get a preview for OSD deployments.

    :param osdspec_name: name of the OSD spec to preview, ``'osd'`` by default
    :param osdspecs: optional list of drive group specs to preview
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
975
9f95a23c
TL
def remove_osds(self, osd_ids: List[str],
                replace: bool = False,
                force: bool = False) -> Completion[str]:
    """
    Remove OSDs.

    Note that this can only remove OSDs that were successfully
    created (i.e. got an OSD ID).

    :param osd_ids: list of OSD IDs
    :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
    :param force: Forces the OSD removal process without waiting for the data to be drained first.
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
987
f6b5b4d7
TL
def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
    """
    TODO: document.

    NOTE(review): presumably stops an ongoing removal of the given
    OSDs -- confirm against the implementing backend.

    :param osd_ids: list of OSD IDs
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
993
def remove_osds_status(self) -> Completion:
    """
    Report the status of the OSD removal operations currently in flight.

    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
999
def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
    """
    Ask the orchestrator to switch either the ident or the fault LED on or off.

    :param ident_fault: either ``"ident"`` or ``"fault"``
    :param on: ``True`` = on.
    :param locations: See :class:`orchestrator.DeviceLightLoc`
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1009
def zap_device(self, host: str, path: str) -> Completion[str]:
    """
    Zap/Erase a device (DESTROYS DATA).

    :param host: host the device belongs to
    :param path: path of the device on that host
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1013
def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create mon daemon(s).

    :param spec: service specification for the mon daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1017
def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update mon cluster.

    :param spec: service specification for the mon service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1021
def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create mgr daemon(s).

    :param spec: service specification for the mgr daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1025
def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update mgr cluster.

    :param spec: service specification for the mgr service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1029
def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create MDS daemon(s).

    :param spec: service specification for the MDS daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1033
def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update MDS cluster.

    :param spec: service specification for the MDS service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1037
def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
    """
    Create RGW daemon(s).

    :param spec: RGW specification for the daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1041
def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
    """
    Update RGW cluster.

    :param spec: RGW specification for the service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1045
def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create rbd-mirror daemon(s).

    :param spec: service specification for the rbd-mirror daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1049
def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update rbd-mirror cluster.

    :param spec: service specification for the rbd-mirror service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1053
def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
    """
    Create NFS daemon(s).

    :param spec: NFS service specification for the daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1057
def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
    """
    Update NFS cluster.

    :param spec: NFS service specification for the service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1061
def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
    """
    Create iscsi daemon(s).

    :param spec: iscsi service specification for the daemon(s)
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1065
def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
    """
    Update iscsi cluster.

    :param spec: iscsi service specification for the service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1069
def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create a new prometheus daemon.

    :param spec: service specification for the prometheus daemon
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1073
def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update prometheus cluster.

    :param spec: service specification for the prometheus service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1077
def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create a new Node-Exporter service.

    :param spec: service specification for the Node-Exporter service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1081
def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update existing Node-Exporter daemon(s).

    :param spec: service specification for the Node-Exporter service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1085
def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create a new crash service.

    :param spec: service specification for the crash service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1089
def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update existing crash daemon(s).

    :param spec: service specification for the crash service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1093
def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create a new Grafana service.

    :param spec: service specification for the Grafana service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1097
def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update existing Grafana daemon(s).

    :param spec: service specification for the Grafana service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1101
def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
    """
    Create a new AlertManager service.

    :param spec: service specification for the AlertManager service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1105
def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
    """
    Update existing AlertManager daemon(s).

    :param spec: service specification for the AlertManager service
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1109
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
    """
    Check a prospective upgrade target.

    NOTE(review): what exactly is checked is up to the backend; this
    interface only fixes the signature.

    :param image: container image to check, or None
    :param version: version to check, or None
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1112
def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
    """
    Start an upgrade.

    :param image: container image to upgrade to, or None
    :param version: version to upgrade to, or None
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1115
def upgrade_pause(self) -> Completion[str]:
    """
    Pause an upgrade.

    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1118
def upgrade_resume(self) -> Completion[str]:
    """
    Resume an upgrade.

    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1121
def upgrade_stop(self) -> Completion[str]:
    """
    Stop an upgrade.

    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1124
def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
    """
    Report on the upgrade that is currently underway, if any: how far
    along the process is, or whether some error has occurred.

    :return: UpgradeStatusSpec instance
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1133
@_hide_in_features
def upgrade_available(self) -> Completion:
    """
    Report which versions can be upgraded to.

    :return: List of strings
    :raises NotImplementedError: always; no implementation in this interface
    """
    raise NotImplementedError()
1142
1143
e306af50
TL
# Type alias for a spec that is either a service spec or a host spec;
# this is the return type of ``json_to_generic_spec``.
GenericSpec = Union[ServiceSpec, HostSpec]
1145
f6b5b4d7
TL
1146
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Build a spec object from its JSON (dict) form.

    A dict whose ``service_type`` is ``'host'`` is parsed as a
    :class:`HostSpec`; everything else is parsed as a :class:`ServiceSpec`.

    :param spec: JSON representation of the spec
    :return: the result of the matching class's ``from_json``
    """
    spec_cls = HostSpec if spec.get('service_type') == 'host' else ServiceSpec
    return spec_cls.from_json(spec)
9f95a23c 1152
f6b5b4d7 1153
9f95a23c
TL
class UpgradeStatusSpec(object):
    """
    Orchestrator's report on what's going on with any ongoing upgrade.

    A fresh instance describes "no upgrade": nothing in progress, no
    target image, no completed services and an empty message.
    """

    def __init__(self):
        # Is an upgrade underway?
        self.in_progress = False
        self.target_image = None
        # Which daemon types are fully updated?
        self.services_complete = []
        # Freeform description
        self.message = ""
1161
1162
def handle_type_error(method):
    """
    Decorator that turns a ``TypeError`` raised by ``method`` into an
    ``OrchestratorValidationError``.

    The first positional argument of the wrapped callable must have a
    ``__name__`` (in this file it is applied to classmethods, so that is
    the class); the name is used as the prefix of the error message.

    :param method: the callable to wrap
    :return: the wrapping function, carrying ``method``'s metadata
    """
    @wraps(method)
    def inner(cls, *args, **kwargs):
        try:
            result = method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
        return result
    return inner
1172
1173
class DaemonDescription(object):
    """
    Answer to a query about the status of one particular daemon,
    stateful or stateless.

    This is not health or performance monitoring. It lets the
    orchestrator tell Ceph whether and where a daemon is scheduled in
    the cluster. "It's running on host123" is therefore not a promise
    that the process is up right now; it describes where the
    orchestrator has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False):

        # Same granularity as InventoryHost.
        self.hostname: str = hostname

        # Containers are common enough to justify carrying the runtime id,
        # the image hash and the friendly image name.
        self.container_id = container_id  # runtime id
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # Name picked by the orchestrator, typically derived from the
        # hostname or a pod name: the <foo> in mds.<foo>, i.e. the ID
        # that shows up in the FSMap/ServiceMap.
        self.daemon_id: str = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        self.is_active = is_active

    def name(self):
        """Return ``<daemon_type>.<daemon_id>``."""
        return f'{self.daemon_type}.{self.daemon_id}'

    def matches_service(self, service_name: Optional[str]) -> bool:
        """
        Tell whether this daemon's name starts with ``service_name + '.'``.

        An empty or missing ``service_name`` never matches.
        """
        return bool(service_name) and self.name().startswith(service_name + '.')

    def service_id(self):
        """
        Derive the service id this daemon belongs to.

        * OSDs with an ``osdspec_affinity`` report that affinity.
        * Daemon types not listed in ``ServiceSpec.REQUIRES_SERVICE_ID``
          report their ``daemon_id``.
        * Otherwise the id is computed from ``daemon_id`` by stripping
          the bare hostname (and anything after it); rgw and iscsi have
          extra rules for ids that do not contain the hostname.

        :raises OrchestratorError: if there is no hostname, or the
            hostname appears in ``daemon_id`` without '.' separators.
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        if self.daemon_type not in ServiceSpec.REQUIRES_SERVICE_ID:
            return self.daemon_id

        failure = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

        if not self.hostname:
            # TODO: can a DaemonDescription exist without a hostname?
            raise failure

        # use the bare hostname, not the FQDN.
        short_host = self.hostname.split('.')[0]

        if short_host == self.daemon_id:
            # daemon_id == "host"
            return self.daemon_id

        if short_host in self.daemon_id:
            # daemon_id == "service_id.host"
            # daemon_id == "service_id.host.random"
            before, after = self.daemon_id.rsplit(short_host, 1)
            if not before.endswith('.'):
                # '.' sep missing at front of host
                raise failure
            if after and not after.startswith('.'):
                # '.' sep missing at end of host
                raise failure
            return before[:-1]

        # daemon_id == "service_id.random"
        if self.daemon_type == 'rgw':
            parts = self.daemon_id.split('.')
            if len(parts) in [3, 4]:
                return '.'.join(parts[0:2])

        if self.daemon_type == 'iscsi':
            parts = self.daemon_id.split('.')
            return '.'.join(parts[0:-1])

        # daemon_id == "service_id"
        return self.daemon_id

    def service_name(self):
        """
        Return the name of the owning service: ``<type>.<service_id>``
        for types in ``ServiceSpec.REQUIRES_SERVICE_ID``, else the type.
        """
        if self.daemon_type not in ServiceSpec.REQUIRES_SERVICE_ID:
            return self.daemon_type
        return f'{self.daemon_type}.{self.service_id()}'

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self):
        """
        Serialize to an ``OrderedDict``.

        Timestamps are rendered with ``DATEFMT``; ``osdspec_affinity`` is
        only emitted for OSDs; entries whose value is ``None`` are dropped.
        """
        pairs = [
            ('daemon_type', self.daemon_type),
            ('daemon_id', self.daemon_id),
            ('hostname', self.hostname),
            ('container_id', self.container_id),
            ('container_image_id', self.container_image_id),
            ('container_image_name', self.container_image_name),
            ('version', self.version),
            ('status', self.status),
            ('status_desc', self.status_desc),
        ]
        if self.daemon_type == 'osd':
            pairs.append(('osdspec_affinity', self.osdspec_affinity))
        pairs.append(('is_active', self.is_active))

        for key in ['last_refresh', 'created', 'started', 'last_deployed',
                    'last_configured']:
            stamp = getattr(self, key)
            if stamp:
                pairs.append((key, stamp.strftime(DATEFMT)))

        if self.events:
            pairs.append(('events', [e.to_json() for e in self.events]))

        return OrderedDict((key, value) for key, value in pairs if value is not None)

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """
        Build an instance from the dict produced by :meth:`to_json`.

        ``data`` itself is not modified; a shallow copy is used.
        """
        kwargs = data.copy()
        raw_events = kwargs.pop('events', [])
        for key in ['last_refresh', 'created', 'started', 'last_deployed',
                    'last_configured']:
            if key in kwargs:
                kwargs[key] = datetime.datetime.strptime(kwargs[key], DATEFMT)
        parsed_events = [OrchestratorEvent.from_json(e) for e in raw_events]
        return cls(events=parsed_events, **kwargs)

    def __copy__(self):
        # feel free to change this:
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        """Represent ``data`` in YAML as the mapping given by :meth:`to_json`."""
        return dumper.represent_dict(data.to_json().items())


yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1365
1366
9f95a23c
TL
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and
        # container_image_name (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

    def service_type(self):
        """Return the ``service_type`` of the underlying spec."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """
        Serialize to the spec's JSON form, extended by a ``status`` mapping
        and, when there are any, an ``events`` list.

        Timestamps in ``status`` are rendered with ``DATEFMT``; ``status``
        entries whose value is ``None`` are dropped.
        """
        def _stamp(value):
            # Falsy values (i.e. None) pass through and get filtered below.
            return value.strftime(DATEFMT) if value else value

        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': _stamp(self.last_refresh),
            'created': _stamp(self.created),
        }
        out['status'] = {k: v for (k, v) in status.items() if v is not None}
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """
        Build an instance from the dict produced by :meth:`to_json`.

        ``status`` and ``events`` are split off; everything else is handed
        to ``ServiceSpec.from_json``. ``data`` itself is not modified.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        """Represent ``data`` in YAML as the mapping given by :meth:`to_json`."""
        return dumper.represent_dict(data.to_json().items())


yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c
TL
1467
1468
class InventoryFilter(object):
    """
    Filter to use when fetching inventory, so that the whole estate does
    not have to be scanned unnecessarily.

    Typical use:

    * filter by host when presenting a UI workflow for configuring a
      particular server.
    * filter by label when not all of the estate is Ceph servers and only
      the Ceph servers are of interest.
    * filter by label when particularly interested in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        """
        :param labels: optional labels; get info about hosts matching them
        :param hosts: optional host names; get info about these hosts only
        """
        #: Optional: get info about hosts matching labels
        self.labels = labels

        #: Optional: get info about certain named hosts only
        self.hosts = hosts
1490
1491
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within cluster. For example a hostname.
        :param devices: devices of this host; an empty ``Devices`` if omitted
        :param labels: labels of this host; an empty list if omitted
        :param addr: address of the host; falls back to ``name``
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self):
        """Serialize to a plain dict with name, addr, devices and labels."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """
        Build an instance from the dict produced by :meth:`to_json`.

        ``name`` and ``devices`` are required; ``addr`` defaults to the name
        and ``labels`` to an empty list. ``data`` is not modified.

        :raises OrchestratorValidationError: on a missing required key, on
            unknown extra keys, or when a ``TypeError`` occurs while reading.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            # Keep the original exception as the explicit cause.
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts):
        """
        Build a list of instances from ``hosts``, an iterable of items where
        ``item[0]`` is the host name and ``item[1].data`` is the JSON form of
        the host's devices.
        """
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the names of ``hosts``, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # Comparing with a foreign object (e.g. None) used to raise
        # AttributeError; defer to Python's default handling instead,
        # which makes ``==`` evaluate to False.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        # Only name and devices take part in equality.
        return self.name == other.name and self.devices == other.devices
1550
1551
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Identifies one specific device on one specific host. Used for
    switching the LEDs of devices on or off.

    host: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    dev: device_id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``

    path: NOTE(review): presumably the device path on the host -- confirm.
    """
    # No per-instance __dict__; instances stay plain tuples.
    __slots__ = ()
1563
1564
f6b5b4d7
TL
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    INFO = 'INFO'
    ERROR = 'ERROR'
    # Serialized form: <created> <kind>:<subject> [<level>] "<message>"
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        """
        :param created: creation time; a string is parsed with ``DATEFMT``
        :param kind: either ``"service"`` or ``"daemon"``
        :param subject: service name or daemon name
        :param level: either ``"INFO"`` or ``"ERROR"``
        :param message: the event text
        """
        if isinstance(created, str):
            created = datetime.datetime.strptime(created, DATEFMT)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``<kind>:<subject>``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """Render the event as a single string, the format ``regex_v1`` parses."""
        # Make a long list of events readable.
        stamp = self.created.strftime(DATEFMT)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: string as produced by :meth:`to_json`
        :return: the parsed event
        :raises ValueError: if ``data`` does not match ``regex_v1``
        """
        parsed = cls.regex_v1.match(data)
        if not parsed:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other):
        if not isinstance(other, OrchestratorEvent):
            return False

        # ``level`` does not take part in the comparison.
        mine = (self.created, self.kind, self.subject, self.message)
        theirs = (other.created, other.kind, other.subject, other.message)
        return mine == theirs
1623
1624
9f95a23c
TL
def _mk_orch_methods(cls):
    """
    Class decorator: for every attribute of ``Orchestrator`` whose name
    does not start with '_' (except ``is_orchestrator_module``), set a
    method of the same name on ``cls`` that forwards the call to
    ``self._oremote(name, args, kwargs)``.

    :param cls: the class to populate
    :return: ``cls`` itself
    """
    # The factory must live outside the loop. A closure created inside
    # the loop would see the loop variable's final value.
    def shim(method_name):
        def inner(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return inner

    for meth in Orchestrator.__dict__:
        if meth.startswith('_') or meth == 'is_orchestrator_module':
            continue
        setattr(cls, meth, shim(meth))
    return cls
1638
1639
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        The reason is that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr)
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def __get_mgr(self):
        """Return the mgr given to :meth:`set_mgr`, or ``self`` if none was set."""
        try:
            return self.__mgr
        except AttributeError:
            return self

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        mgr = self.__get_mgr()

        # Ask the mgr directly if it can select the backend itself,
        # otherwise go through the 'orchestrator' module.
        try:
            backend = mgr._select_orchestrator()
        except AttributeError:
            backend = mgr.remote('orchestrator', '_select_orchestrator')

        if backend is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, backend, meth, args, kwargs))
        try:
            return mgr.remote(backend, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            features = self.get_feature_set()
            if meth in features and features[meth]['available']:
                # The backend does provide the method: the failure is genuine.
                raise
            raise NotImplementedError(f'{backend} does not implement {meth}') from e

    def _orchestrator_wait(self, completions: List[Completion]) -> None:
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises RuntimeError: something went wrong while calling the process method.
        :raises ImportError: no `orchestrator` module or backend not found.
        """
        while not all(c.has_result for c in completions):
            self.process(completions)
            pending = sum(1 for c in completions if not c.has_result)
            self.__get_mgr().log.info("Operations pending: %s", pending)
            if not any(c.needs_result for c in completions):
                break
            time.sleep(1)