]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | |
2 | """ | |
3 | ceph-mgr orchestrator interface | |
4 | ||
5 | Please see the ceph-mgr module developer's guide for more information. | |
6 | """ | |
e306af50 TL |
7 | |
8 | import copy | |
9 | import datetime | |
10 | import errno | |
9f95a23c TL |
11 | import logging |
12 | import pickle | |
e306af50 | 13 | import re |
9f95a23c | 14 | import time |
e306af50 TL |
15 | import uuid |
16 | ||
f6b5b4d7 TL |
17 | from collections import namedtuple, OrderedDict |
18 | from contextlib import contextmanager | |
9f95a23c | 19 | from functools import wraps |
9f95a23c | 20 | |
f6b5b4d7 TL |
21 | import yaml |
22 | ||
9f95a23c TL |
23 | from ceph.deployment import inventory |
24 | from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ | |
1911f103 | 25 | ServiceSpecValidationError, IscsiServiceSpec |
9f95a23c | 26 | from ceph.deployment.drive_group import DriveGroupSpec |
f6b5b4d7 | 27 | from ceph.deployment.hostspec import HostSpec |
9f95a23c TL |
28 | |
29 | from mgr_module import MgrModule, CLICommand, HandleCommandResult | |
30 | ||
31 | try: | |
32 | from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ | |
f6b5b4d7 | 33 | Type, Sequence, Dict, cast |
9f95a23c TL |
34 | except ImportError: |
35 | pass | |
36 | ||
37 | logger = logging.getLogger(__name__) | |
38 | ||
39 | DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' | |
40 | ||
f6b5b4d7 TL |
41 | T = TypeVar('T') |
42 | ||
9f95a23c TL |
43 | |
class OrchestratorError(Exception):
    """
    Base error type for orchestrator-specific failures.

    Raised for deployment, configuration, or user mistakes — not for
    programming errors or internal orchestrator faults.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None):
        super(Exception, self).__init__(msg)
        # (kind, subject) pair; see OrchestratorEvent.subject.
        self.event_subject = event_kind_subject
        # Negative errno reported back to the CLI handler.
        self.errno = errno
61 | ||
9f95a23c TL |
62 | |
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend has been configured.
    """

    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
        # ENOENT: there is literally no backend to talk to.
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c TL |
70 | |
71 | ||
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    Inherits the default ``-errno.EINVAL`` error code from
    :class:`OrchestratorError`.
    """
76 | ||
77 | ||
f6b5b4d7 TL |
@contextmanager
def set_exception_subject(kind, subject, overwrite=False):
    """
    Attach an event subject to any :class:`OrchestratorError` raised in the
    ``with`` block, then re-raise it.

    :param kind: subject kind (see ``OrchestratorEvent.subject``)
    :param subject: subject identifier
    :param overwrite: replace an already-set subject instead of keeping it
    """
    try:
        yield
    except OrchestratorError as e:
        # BUG FIX: the original guard was `overwrite or hasattr(e, 'event_subject')`.
        # OrchestratorError.__init__ always assigns `event_subject`, so hasattr()
        # was always True: the `overwrite` flag was dead and an outer (less
        # specific) context always clobbered the inner subject. Only fill in the
        # subject when none is set yet, unless overwriting was requested.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
86 | ||
87 | ||
9f95a23c TL |
def handle_exception(prefix, cmd_args, desc, perm, func):
    """
    Wrap a CLI handler so expected failures are turned into
    HandleCommandResult error returns, and register it as a CLICommand.

    :param prefix: command prefix (e.g. ``orch host add``)
    :param cmd_args: command argument description string
    :param desc: human-readable command description
    :param perm: permission string ('r' or 'rw')
    :param func: the handler to wrap
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, ServiceSpecValidationError) as e:
            # Expected errors: report them without a traceback.
            return HandleCommandResult(e.errno, stderr=str(e))
        except ImportError as e:
            return HandleCommandResult(-errno.ENOENT, stderr=str(e))
        except NotImplementedError:
            return HandleCommandResult(
                -errno.ENOENT,
                stderr='This Orchestrator does not support `{}`'.format(prefix))

    # Make a distinct function object so the CLI metadata attached below
    # doesn't leak onto `wrapper` itself (the original "misused partial").
    def wrapper_copy(*l_args, **l_kwargs):
        return wrapper(*l_args, **l_kwargs)

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
    wrapper_copy._cli_command.func = wrapper_copy  # type: ignore

    return wrapper_copy
109 | ||
110 | ||
111 | def _cli_command(perm): | |
112 | def inner_cli_command(prefix, cmd_args="", desc=""): | |
113 | return lambda func: handle_exception(prefix, cmd_args, desc, perm, func) | |
114 | return inner_cli_command | |
115 | ||
116 | ||
117 | _cli_read_command = _cli_command('r') | |
118 | _cli_write_command = _cli_command('rw') | |
119 | ||
120 | ||
class CLICommandMeta(type):
    """
    Metaclass collecting CLI handlers (methods carrying ``_prefix`` /
    ``_cli_command`` attributes) into a per-class dispatch table.

    This works around the global CLICommand.COMMANDS registry, which would
    otherwise prevent modules from importing any other module: CLICommand is
    reused, but the command table stays local to the class being created.
    """
    def __init__(cls, name, bases, dct):
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for attr in dct.values():
            try:
                dispatch[attr._prefix] = attr._cli_command
            except AttributeError:
                # Not a CLI handler; skip it.
                pass

        def handle_command(self, inbuf, cmd):
            # NOTE(review): for an unknown prefix this calls
            # self.handle_command, i.e. this very function unless a subclass
            # overrides it — confirm the intended fallback behavior.
            if cmd['prefix'] not in dispatch:
                return self.handle_command(inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [c.dump_cmd() for c in dispatch.values()]
        cls.handle_command = handle_command
145 | ||
146 | ||
147 | def _no_result(): | |
148 | return object() | |
149 | ||
150 | ||
class _Promise(object):
    """
    A completion may need multiple promises to be fulfilled. `_Promise` is one
    step.

    Typically ``Orchestrator`` implementations inherit from this class to
    build their own way of finishing a step to fulfil a future.

    They are not exposed in the orchestrator interface and can be seen as a
    helper to build orchestrator modules.
    """
    # Lifecycle states; a promise moves INITIALIZED -> RUNNING -> FINISHED.
    INITIALIZED = 1  # We have a parent completion and a next completion
    RUNNING = 2
    FINISHED = 3  # we have a final result

    # Sentinel meaning "no value produced yet".
    # NOTE(review): annotated `None`, but the value really is an `object()`
    # sentinel compared by identity — the annotation looks inaccurate.
    NO_RESULT: None = _no_result()
    # Sentinel an on_complete callback may return to signal that the result
    # will arrive later via another _finalize() call (asynchronous step).
    ASYNC_RESULT = object()

    def __init__(self,
                 _first_promise: Optional["_Promise"] = None,
                 value: Optional[Any] = NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        # Callback invoked with the previous step's value when finalizing.
        self._on_complete_ = on_complete
        # Optional human-readable name for pretty-printing.
        self._name = name
        self._next_promise: Optional[_Promise] = None

        self._state = self.INITIALIZED
        # Goes through the _exception property setter below, which also
        # maintains a pickled copy of the exception.
        self._exception: Optional[Exception] = None

        # Value of this _Promise. may be an intermediate result.
        self._value = value

        # _Promise is not a continuation monad, as `_result` is of type
        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
        self._first_promise: '_Promise' = _first_promise or self

    @property
    def _exception(self) -> Optional[Exception]:
        # Backed by `_exception_`; None until `fail()` was called.
        return getattr(self, '_exception_', None)

    @_exception.setter
    def _exception(self, e):
        self._exception_ = e
        # Keep a pickled copy so the exception can cross process/module
        # boundaries (see Completion.serialized_exception).
        try:
            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
        except pickle.PicklingError:
            logger.error(f"failed to pickle {e}")
            if isinstance(e, Exception):
                e = Exception(*e.args)
            else:
                e = Exception(str(e))
            # degenerate to a plain Exception
            self._serialized_exception_ = pickle.dumps(e)

    @property
    def _serialized_exception(self) -> Optional[bytes]:
        # Pickled form of `_exception`, or None if no failure occurred.
        return getattr(self, '_serialized_exception_', None)

    @property
    def _on_complete(self) -> Optional[Callable]:
        # https://github.com/python/mypy/issues/4125
        return self._on_complete_

    @_on_complete.setter
    def _on_complete(self, val: Optional[Callable]) -> None:
        self._on_complete_ = val

    def __repr__(self):
        name = self._name or getattr(self._on_complete, '__name__',
                                     '??') if self._on_complete else 'None'
        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
        # NOTE(review): `next` below is the *builtin* function, so the `pr=`
        # field always renders 'NA'; this probably meant `self._next_promise`.
        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
                next, '_progress_reference', 'NA'), repr(self._next_promise)
        )

    def pretty_print_1(self):
        # One line of Completion.pretty_print(): "<state> <name>(<value>),".
        if self._name:
            name = self._name
        elif self._on_complete is None:
            name = 'lambda x: x'
        elif hasattr(self._on_complete, '__name__'):
            name = getattr(self._on_complete, '__name__')
        else:
            name = self._on_complete.__class__.__name__
        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
        prefix = {
            self.INITIALIZED: ' ',
            self.RUNNING: ' >>>',
            self.FINISHED: '(done)'
        }[self._state]
        return '{} {}({}),'.format(prefix, name, val)

    def then(self: Any, on_complete: Callable) -> Any:
        """
        Call ``on_complete`` as soon as this promise is finalized.
        """
        assert self._state in (self.INITIALIZED, self.RUNNING)

        # Delegate to the end of the chain, so `then` always appends.
        if self._next_promise is not None:
            return self._next_promise.then(on_complete)

        if self._on_complete is not None:
            # This step already has a callback: chain a new promise for it.
            self._set_next_promise(self.__class__(
                _first_promise=self._first_promise,
                on_complete=on_complete
            ))
            return self._next_promise

        else:
            # This step is still free: attach the callback here and create an
            # empty tail promise to return.
            self._on_complete = on_complete
            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
            return self._next_promise

    def _set_next_promise(self, next: '_Promise') -> None:
        assert self is not next
        assert self._state in (self.INITIALIZED, self.RUNNING)

        self._next_promise = next
        assert self._next_promise is not None
        # Every promise in the appended chain shares our first promise.
        for p in iter(self._next_promise):
            p._first_promise = self._first_promise

    def _finalize(self, value=NO_RESULT):
        """
        Sets this promise to complete.

        Orchestrators may choose to use this helper function.

        :param value: new value.
        """
        if self._state not in (self.INITIALIZED, self.RUNNING):
            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))

        self._state = self.RUNNING

        if value is not self.NO_RESULT:
            self._value = value
        assert self._value is not self.NO_RESULT, repr(self)

        if self._on_complete:
            try:
                next_result = self._on_complete(self._value)
            except Exception as e:
                # Propagate the failure down the whole chain.
                self.fail(e)
                return
        else:
            next_result = self._value

        if isinstance(next_result, _Promise):
            # hack: _Promise is not a continuation monad.
            # The callback returned a sub-chain: splice it in before our
            # current tail and forward our value into it.
            next_result = next_result._first_promise  # type: ignore
            assert next_result not in self, repr(self._first_promise) + repr(next_result)
            assert self not in next_result
            next_result._append_promise(self._next_promise)
            self._set_next_promise(next_result)
            assert self._next_promise
            if self._next_promise._value is self.NO_RESULT:
                self._next_promise._value = self._value
            self.propagate_to_next()
        elif next_result is not self.ASYNC_RESULT:
            # simple map. simply forward
            if self._next_promise:
                self._next_promise._value = next_result
            else:
                # Hack: next_result is of type U, _value is of type T
                self._value = next_result  # type: ignore
            self.propagate_to_next()
        else:
            # asynchronous promise: someone else will call _finalize() later.
            pass

    def propagate_to_next(self):
        # Mark this step done and kick off the next step of the chain.
        self._state = self.FINISHED
        logger.debug('finalized {}'.format(repr(self)))
        if self._next_promise:
            self._next_promise._finalize()

    def fail(self, e: Exception) -> None:
        """
        Sets the whole completion to be failed with this exception and end the
        evaluation.
        """
        if self._state == self.FINISHED:
            raise ValueError(
                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
        assert self._state in (self.INITIALIZED, self.RUNNING)
        logger.exception('_Promise failed')
        self._exception = e
        self._value = f'_exception: {e}'
        # Cascade the failure down the remaining chain.
        if self._next_promise:
            self._next_promise.fail(e)
        self._state = self.FINISHED

    def __contains__(self, item):
        # Identity-based membership over the whole chain.
        return any(item is p for p in iter(self._first_promise))

    def __iter__(self):
        # Iterate this promise and everything chained after it.
        yield self
        elem = self._next_promise
        while elem is not None:
            yield elem
            elem = elem._next_promise

    def _append_promise(self, other):
        # Attach `other` (a chain) after the last promise of this chain.
        if other is not None:
            assert self not in other
            assert other not in self
            self._last_promise()._set_next_promise(other)

    def _last_promise(self) -> '_Promise':
        # The tail of the chain starting at `self`.
        return list(iter(self))[-1]
365 | ||
366 | ||
class ProgressReference(object):
    """
    Wires a Completion to the mgr ``progress`` module: used as an
    ``on_complete`` callback, calling the reference records that the
    operation produced a result and drives the progress bar via
    ``mgr.remote("progress", ...)``.
    """
    def __init__(self,
                 message: str,
                 mgr,
                 completion: Optional[Callable[[], 'Completion']] = None
                 ):
        """
        ProgressReference can be used within Completions::

            +---------------+      +---------------------------------+
            |               | then |                                 |
            | My Completion | +--> | on_complete=ProgressReference() |
            |               |      |                                 |
            +---------------+      +---------------------------------+

        See :func:`Completion.with_progress` for an easy way to create
        a progress reference

        :param message: text shown in the progress bar
        :param mgr: the mgr module instance (provides ``remote`` and
            ``all_progress_references``)
        :param completion: optional factory returning a Completion used by
            :meth:`update` to recompute progress
        """
        super(ProgressReference, self).__init__()
        self.progress_id = str(uuid.uuid4())
        self.message = message
        self.mgr = mgr

        #: The completion can already have a result, before the write
        #: operation is effective. progress == 1 means, the services are
        #: created / removed.
        self.completion: Optional[Callable[[], Completion]] = completion

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.0

        self._completion_has_result = False
        self.mgr.all_progress_references.append(self)

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return self.message or super(ProgressReference, self).__str__()

    def __call__(self, arg):
        # Acts as an on_complete callback: mark done and pass the value on.
        self._completion_has_result = True
        self.progress = 1.0
        return arg

    @property
    def progress(self):
        return self._progress

    @progress.setter
    def progress(self, progress):
        # Setting progress immediately mirrors the value into the mgr
        # progress module (side effect!).
        assert progress <= 1.0
        self._progress = progress
        try:
            if self.effective:
                # Done: close the progress event and drop this reference.
                self.mgr.remote("progress", "complete", self.progress_id)
                self.mgr.all_progress_references = [
                    p for p in self.mgr.all_progress_references if p is not self]
            else:
                self.mgr.remote("progress", "update", self.progress_id, self.message,
                                progress,
                                [("origin", "orchestrator")])
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    @property
    def effective(self):
        # Fully done only when the bar is at 100% *and* a result exists.
        return self.progress == 1 and self._completion_has_result

    def update(self):
        # Recompute progress by evaluating the completion factory, if any.
        def progress_run(progress):
            self.progress = progress
        if self.completion:
            c = self.completion().then(progress_run)
            self.mgr.process([c._first_promise])
        else:
            self.progress = 1

    def fail(self):
        # On failure, force-complete the progress bar so it doesn't hang.
        self._completion_has_result = True
        self.progress = 1
452 | ||
453 | ||
class Completion(_Promise, Generic[T]):
    """
    Combines multiple promises into one overall operation.

    Completions are composable by being able to
    call one completion from another completion. I.e. making them re-usable
    using Promises E.g.::

        >>> #doctest: +SKIP
        ... return Orchestrator().get_hosts().then(self._create_osd)

    where ``get_hosts`` returns a Completion of list of hosts and
    ``_create_osd`` takes a list of hosts.

    The concept behind this is to store the computation steps
    explicit and then explicitly evaluate the chain:

        >>> #doctest: +SKIP
        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
        ... p.finalize(2)
        ... assert p.result == "4"

    or graphically::

        +---------------+      +-----------------+
        |               | then |                 |
        | lambda x: x*x | +--> | lambda x: str(x)|
        |               |      |                 |
        +---------------+      +-----------------+

    """

    def __init__(self,
                 _first_promise: Optional["Completion"] = None,
                 value: Any = _Promise.NO_RESULT,
                 on_complete: Optional[Callable] = None,
                 name: Optional[str] = None,
                 ):
        super(Completion, self).__init__(_first_promise, value, on_complete, name)

    @property
    def _progress_reference(self) -> Optional[ProgressReference]:
        # Duck-typed check: a ProgressReference used as on_complete carries a
        # `progress_id` attribute.
        if hasattr(self._on_complete, 'progress_id'):
            return self._on_complete  # type: ignore
        return None

    @property
    def progress_reference(self) -> Optional[ProgressReference]:
        """
        ProgressReference. Marks this completion
        as a write completion.
        """

        # Scan the whole chain; at most one step may carry a reference.
        references = [c._progress_reference for c in iter(
            self) if c._progress_reference is not None]
        if references:
            assert len(references) == 1
            return references[0]
        return None

    @classmethod
    def with_progress(cls: Any,
                      message: str,
                      mgr,
                      _first_promise: Optional["Completion"] = None,
                      value: Any = _Promise.NO_RESULT,
                      on_complete: Optional[Callable] = None,
                      calc_percent: Optional[Callable[[], Any]] = None
                      ) -> Any:
        # Convenience constructor: build a Completion and attach a
        # ProgressReference in one go; returns the head of the chain.
        c = cls(
            _first_promise=_first_promise,
            value=value,
            on_complete=on_complete
        ).add_progress(message, mgr, calc_percent)

        return c._first_promise

    def add_progress(self,
                     message: str,
                     mgr,
                     calc_percent: Optional[Callable[[], Any]] = None
                     ):
        # Chain a ProgressReference after this step (registers itself with
        # mgr.all_progress_references as a side effect).
        return self.then(
            on_complete=ProgressReference(
                message=message,
                mgr=mgr,
                completion=calc_percent
            )
        )

    def fail(self, e: Exception):
        # Also force-complete any attached progress bar so it doesn't hang.
        super(Completion, self).fail(e)
        if self._progress_reference:
            self._progress_reference.fail()

    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
        # Kick off evaluation from the head of the chain; no-op if already
        # running or finished.
        if self._first_promise._state == self.INITIALIZED:
            self._first_promise._finalize(result)

    @property
    def result(self) -> T:
        """
        The result of the operation that we waited
        for. Only valid after calling Orchestrator.process() on this
        completion.
        """
        last = self._last_promise()
        assert last._state == _Promise.FINISHED
        return cast(T, last._value)

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)

    @property
    def exception(self) -> Optional[Exception]:
        # The exception recorded by fail(), if any (from the chain's tail).
        return self._last_promise()._exception

    @property
    def serialized_exception(self) -> Optional[bytes]:
        # Pickled form of `exception`; see raise_if_exception().
        return self._last_promise()._serialized_exception

    @property
    def has_result(self) -> bool:
        """
        Has the operation already a result?

        For Write operations, it can already have a
        result, if the orchestrator's configuration is
        persistently written. Typically this would
        indicate that an update had been written to
        a manifest, but that the update had not
        necessarily been pushed out to the cluster.

        :return:
        """
        return self._last_promise()._state == _Promise.FINISHED

    @property
    def is_errored(self) -> bool:
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def needs_result(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_errored and not self.has_result

    @property
    def is_finished(self) -> bool:
        """
        Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return self.is_errored or (self.has_result)

    def pretty_print(self):
        # One line per promise in the chain, see _Promise.pretty_print_1().
        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
627 | ||
628 | ||
def pretty_print(completions: Sequence[Completion]) -> str:
    """Render several completions as a single comma-separated string."""
    rendered = (c.pretty_print() for c in completions)
    return ', '.join(rendered)
631 | ||
632 | ||
def raise_if_exception(c: Completion) -> None:
    """
    Re-raise the exception stored in a finished Completion, if any.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    if c.serialized_exception is None:
        return
    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        # Unpicklable payload (e.g. custom __init__/__reduce__): fall back
        # to a plain Exception carrying the original's repr.
        raise Exception('{}: {}'.format(type(c.exception), c.exception))
    raise e
644 | ||
645 | ||
class TrivialReadCompletion(Completion[T]):
    """
    A Completion created already carrying its result.

    Note that a falsy result (``None``, ``[]``, ``0``, ...) leaves the
    completion unfinalized.
    """

    def __init__(self, result: T):
        super().__init__()
        if result:
            self.finalize(result)
655 | ||
656 | ||
657 | def _hide_in_features(f): | |
658 | f._hide_in_features = True | |
659 | return f | |
660 | ||
661 | ||
662 | class Orchestrator(object): | |
663 | """ | |
664 | Calls in this class may do long running remote operations, with time | |
665 | periods ranging from network latencies to package install latencies and large | |
666 | internet downloads. For that reason, all are asynchronous, and return | |
667 | ``Completion`` objects. | |
668 | ||
669 | Methods should only return the completion and not directly execute | |
670 | anything, like network calls. Otherwise the purpose of | |
671 | those completions is defeated. | |
672 | ||
673 | Implementations are not required to start work on an operation until | |
674 | the caller waits on the relevant Completion objects. Callers making | |
675 | multiple updates should not wait on Completions until they're done | |
676 | sending operations: this enables implementations to batch up a series | |
677 | of updates when wait() is called on a set of Completion objects. | |
678 | ||
679 | Implementations are encouraged to keep reasonably fresh caches of | |
680 | the status of the system: it is better to serve a stale-but-recent | |
681 | result read of e.g. device inventory than it is to keep the caller waiting | |
682 | while you scan hosts every time. | |
683 | """ | |
684 | ||
    @_hide_in_features
    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.

        :return: always True
        """
        return True
694 | ||
    @_hide_in_features
    def available(self) -> Tuple[bool, str]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        # Backends must override this; the base class cannot tell.
        raise NotImplementedError()
720 | ||
    @_hide_in_features
    def process(self, completions: List[Completion]) -> None:
        """
        Given a list of Completion instances, process any which are
        incomplete.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        This method should not block, as this would make it slow to query
        a status, while other long running operations are in progress.

        :param completions: completions to (asynchronously) make progress on
        """
        raise NotImplementedError()
735 | ||
    @_hide_in_features
    def get_feature_set(self):
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        # A method counts as "available" when the subclass overrides the base
        # implementation; attributes marked @_hide_in_features are skipped.
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features
766 | ||
    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()
772 | ||
    def pause(self) -> None:
        # Suspend background orchestrator activity; see resume().
        raise NotImplementedError()
775 | ||
    def resume(self) -> None:
        # Undo a previous pause(); see pause().
        raise NotImplementedError()
778 | ||
    def add_host(self, host_spec: HostSpec) -> Completion[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: host specification (hostname, address, labels, ...)
        """
        raise NotImplementedError()
786 | ||
    def remove_host(self, host: str) -> Completion[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()
794 | ||
    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()
803 | ||
    def get_hosts(self) -> Completion[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()
811 | ||
    def add_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Add a host label

        :param host: hostname
        :param label: label to attach to the host
        """
        raise NotImplementedError()
817 | ||
    def remove_host_label(self, host: str, label: str) -> Completion[str]:
        """
        Remove a host label

        :param host: hostname
        :param label: label to remove from the host
        """
        raise NotImplementedError()
823 | ||
f6b5b4d7 TL |
    def host_ok_to_stop(self, hostname: str) -> Completion:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()
831 | ||
    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :param host_filter: optional filter limiting which hosts are queried
        :param refresh: presumably bypasses cached data -- backend-specific,
            confirm against the implementation
        :return: list of InventoryHost
        """
        raise NotImplementedError()
839 | ||
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :param service_type: optional filter by service type (e.g. 'mds')
        :param service_name: optional filter by service name
        :param refresh: presumably bypasses cached data -- backend-specific
        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()
853 | ||
f6b5b4d7 | 854 | def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]: |
9f95a23c TL |
855 | """ |
856 | Describe a daemon (of any kind) that is already configured in | |
857 | the orchestrator. | |
858 | ||
859 | :return: list of DaemonDescription objects. | |
860 | """ | |
861 | raise NotImplementedError() | |
862 | ||
f6b5b4d7 | 863 | def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]: |
9f95a23c TL |
864 | """ |
865 | Applies any spec | |
866 | """ | |
e306af50 | 867 | fns: Dict[str, function] = { |
9f95a23c TL |
868 | 'alertmanager': self.apply_alertmanager, |
869 | 'crash': self.apply_crash, | |
870 | 'grafana': self.apply_grafana, | |
e306af50 | 871 | 'iscsi': self.apply_iscsi, |
9f95a23c TL |
872 | 'mds': self.apply_mds, |
873 | 'mgr': self.apply_mgr, | |
874 | 'mon': self.apply_mon, | |
e306af50 | 875 | 'nfs': self.apply_nfs, |
9f95a23c | 876 | 'node-exporter': self.apply_node_exporter, |
e306af50 | 877 | 'osd': lambda dg: self.apply_drivegroups([dg]), |
9f95a23c TL |
878 | 'prometheus': self.apply_prometheus, |
879 | 'rbd-mirror': self.apply_rbd_mirror, | |
e306af50 TL |
880 | 'rgw': self.apply_rgw, |
881 | 'host': self.add_host, | |
9f95a23c TL |
882 | } |
883 | ||
884 | def merge(ls, r): | |
885 | if isinstance(ls, list): | |
886 | return ls + [r] | |
887 | return [ls, r] | |
888 | ||
889 | spec, *specs = specs | |
890 | ||
e306af50 TL |
891 | fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) |
892 | completion = fn(spec) | |
9f95a23c TL |
893 | for s in specs: |
894 | def next(ls): | |
e306af50 TL |
895 | fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) |
896 | return fn(s).then(lambda r: merge(ls, r)) | |
9f95a23c TL |
897 | completion = completion.then(next) |
898 | return completion | |
899 | ||
f6b5b4d7 TL |
    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.

        Abstract interface stub: concrete orchestrator backends override this.
        """
        raise NotImplementedError()
905 | ||
906 | def remove_daemons(self, names: List[str]) -> Completion[List[str]]: | |
9f95a23c TL |
907 | """ |
908 | Remove specific daemon(s). | |
909 | ||
910 | :return: None | |
911 | """ | |
912 | raise NotImplementedError() | |
913 | ||
f6b5b4d7 | 914 | def remove_service(self, service_name: str) -> Completion[str]: |
9f95a23c TL |
915 | """ |
916 | Remove a service (a collection of daemons). | |
917 | ||
918 | :return: None | |
919 | """ | |
920 | raise NotImplementedError() | |
921 | ||
    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()
934 | ||
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: Completion
        """
        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
        raise NotImplementedError()
946 | ||
f6b5b4d7 | 947 | def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]: |
9f95a23c TL |
948 | """ |
949 | Create one or more OSDs within a single Drive Group. | |
950 | ||
951 | The principal argument here is the drive_group member | |
952 | of OsdSpec: other fields are advisory/extensible for any | |
953 | finer-grained OSD feature enablement (choice of backing store, | |
954 | compression/encryption, etc). | |
955 | """ | |
956 | raise NotImplementedError() | |
957 | ||
f6b5b4d7 | 958 | def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]: |
9f95a23c TL |
959 | """ Update OSD cluster """ |
960 | raise NotImplementedError() | |
961 | ||
e306af50 TL |
    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        """
        Set or clear the 'unmanaged' flag for a service.

        Abstract interface stub: concrete orchestrator backends override this.

        :param unmanaged_flag: desired value of the flag
        :param service_type: service type the flag applies to (default 'osd')
        :param service_name: optionally restrict to a single named service
        """
        raise NotImplementedError()
968 | ||
e306af50 TL |
969 | def preview_osdspecs(self, |
970 | osdspec_name: Optional[str] = 'osd', | |
971 | osdspecs: Optional[List[DriveGroupSpec]] = None | |
f6b5b4d7 | 972 | ) -> Completion[str]: |
1911f103 TL |
973 | """ Get a preview for OSD deployments """ |
974 | raise NotImplementedError() | |
975 | ||
9f95a23c TL |
976 | def remove_osds(self, osd_ids: List[str], |
977 | replace: bool = False, | |
f6b5b4d7 | 978 | force: bool = False) -> Completion[str]: |
9f95a23c TL |
979 | """ |
980 | :param osd_ids: list of OSD IDs | |
981 | :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` | |
982 | :param force: Forces the OSD removal process without waiting for the data to be drained first. | |
983 | Note that this can only remove OSDs that were successfully | |
984 | created (i.e. got an OSD ID). | |
985 | """ | |
986 | raise NotImplementedError() | |
987 | ||
f6b5b4d7 TL |
988 | def stop_remove_osds(self, osd_ids: List[str]) -> Completion: |
989 | """ | |
990 | TODO | |
991 | """ | |
992 | raise NotImplementedError() | |
993 | ||
994 | def remove_osds_status(self) -> Completion: | |
9f95a23c TL |
995 | """ |
996 | Returns a status of the ongoing OSD removal operations. | |
997 | """ | |
998 | raise NotImplementedError() | |
999 | ||
f6b5b4d7 | 1000 | def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]: |
9f95a23c TL |
1001 | """ |
1002 | Instructs the orchestrator to enable or disable either the ident or the fault LED. | |
1003 | ||
1004 | :param ident_fault: either ``"ident"`` or ``"fault"`` | |
1005 | :param on: ``True`` = on. | |
1006 | :param locations: See :class:`orchestrator.DeviceLightLoc` | |
1007 | """ | |
1008 | raise NotImplementedError() | |
1009 | ||
f6b5b4d7 | 1010 | def zap_device(self, host: str, path: str) -> Completion[str]: |
9f95a23c TL |
1011 | """Zap/Erase a device (DESTROYS DATA)""" |
1012 | raise NotImplementedError() | |
1013 | ||
f6b5b4d7 | 1014 | def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1015 | """Create mon daemon(s)""" |
1016 | raise NotImplementedError() | |
1017 | ||
f6b5b4d7 | 1018 | def apply_mon(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1019 | """Update mon cluster""" |
1020 | raise NotImplementedError() | |
1021 | ||
f6b5b4d7 | 1022 | def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1023 | """Create mgr daemon(s)""" |
1024 | raise NotImplementedError() | |
1025 | ||
f6b5b4d7 | 1026 | def apply_mgr(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1027 | """Update mgr cluster""" |
1028 | raise NotImplementedError() | |
1029 | ||
f6b5b4d7 | 1030 | def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1031 | """Create MDS daemon(s)""" |
1032 | raise NotImplementedError() | |
1033 | ||
f6b5b4d7 | 1034 | def apply_mds(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1035 | """Update MDS cluster""" |
1036 | raise NotImplementedError() | |
1037 | ||
f6b5b4d7 | 1038 | def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]: |
801d1391 TL |
1039 | """Create RGW daemon(s)""" |
1040 | raise NotImplementedError() | |
1041 | ||
f6b5b4d7 | 1042 | def apply_rgw(self, spec: RGWSpec) -> Completion[str]: |
801d1391 TL |
1043 | """Update RGW cluster""" |
1044 | raise NotImplementedError() | |
1045 | ||
f6b5b4d7 | 1046 | def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1047 | """Create rbd-mirror daemon(s)""" |
1048 | raise NotImplementedError() | |
1049 | ||
f6b5b4d7 | 1050 | def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1051 | """Update rbd-mirror cluster""" |
1052 | raise NotImplementedError() | |
1053 | ||
f6b5b4d7 | 1054 | def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1055 | """Create NFS daemon(s)""" |
1056 | raise NotImplementedError() | |
1057 | ||
f6b5b4d7 | 1058 | def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]: |
9f95a23c TL |
1059 | """Update NFS cluster""" |
1060 | raise NotImplementedError() | |
1061 | ||
f6b5b4d7 | 1062 | def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]: |
1911f103 TL |
1063 | """Create iscsi daemon(s)""" |
1064 | raise NotImplementedError() | |
1065 | ||
f6b5b4d7 | 1066 | def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]: |
1911f103 TL |
1067 | """Update iscsi cluster""" |
1068 | raise NotImplementedError() | |
1069 | ||
f6b5b4d7 | 1070 | def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1071 | """Create new prometheus daemon""" |
1072 | raise NotImplementedError() | |
1073 | ||
f6b5b4d7 | 1074 | def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1075 | """Update prometheus cluster""" |
1076 | raise NotImplementedError() | |
1077 | ||
f6b5b4d7 | 1078 | def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1079 | """Create a new Node-Exporter service""" |
1080 | raise NotImplementedError() | |
1081 | ||
f6b5b4d7 | 1082 | def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1083 | """Update existing a Node-Exporter daemon(s)""" |
1084 | raise NotImplementedError() | |
1085 | ||
f6b5b4d7 | 1086 | def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1087 | """Create a new crash service""" |
1088 | raise NotImplementedError() | |
1089 | ||
f6b5b4d7 | 1090 | def apply_crash(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1091 | """Update existing a crash daemon(s)""" |
1092 | raise NotImplementedError() | |
1093 | ||
f6b5b4d7 | 1094 | def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1095 | """Create a new Node-Exporter service""" |
1096 | raise NotImplementedError() | |
1097 | ||
f6b5b4d7 | 1098 | def apply_grafana(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1099 | """Update existing a Node-Exporter daemon(s)""" |
1100 | raise NotImplementedError() | |
1101 | ||
f6b5b4d7 | 1102 | def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]: |
9f95a23c TL |
1103 | """Create a new AlertManager service""" |
1104 | raise NotImplementedError() | |
1105 | ||
f6b5b4d7 | 1106 | def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]: |
9f95a23c TL |
1107 | """Update an existing AlertManager daemon(s)""" |
1108 | raise NotImplementedError() | |
1109 | ||
f6b5b4d7 | 1110 | def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]: |
9f95a23c TL |
1111 | raise NotImplementedError() |
1112 | ||
f6b5b4d7 | 1113 | def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]: |
9f95a23c TL |
1114 | raise NotImplementedError() |
1115 | ||
f6b5b4d7 | 1116 | def upgrade_pause(self) -> Completion[str]: |
9f95a23c TL |
1117 | raise NotImplementedError() |
1118 | ||
f6b5b4d7 | 1119 | def upgrade_resume(self) -> Completion[str]: |
9f95a23c TL |
1120 | raise NotImplementedError() |
1121 | ||
f6b5b4d7 | 1122 | def upgrade_stop(self) -> Completion[str]: |
9f95a23c TL |
1123 | raise NotImplementedError() |
1124 | ||
f6b5b4d7 | 1125 | def upgrade_status(self) -> Completion['UpgradeStatusSpec']: |
9f95a23c TL |
1126 | """ |
1127 | If an upgrade is currently underway, report on where | |
1128 | we are in the process, or if some error has occurred. | |
1129 | ||
1130 | :return: UpgradeStatusSpec instance | |
1131 | """ | |
1132 | raise NotImplementedError() | |
1133 | ||
1134 | @_hide_in_features | |
f6b5b4d7 | 1135 | def upgrade_available(self) -> Completion: |
9f95a23c TL |
1136 | """ |
1137 | Report on what versions are available to upgrade to | |
1138 | ||
1139 | :return: List of strings | |
1140 | """ | |
1141 | raise NotImplementedError() | |
1142 | ||
1143 | ||
e306af50 TL |
1144 | GenericSpec = Union[ServiceSpec, HostSpec] |
1145 | ||
f6b5b4d7 TL |
1146 | |
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """Deserialize a JSON dict into the appropriate spec class.

    A dict whose ``service_type`` is ``'host'`` becomes a HostSpec;
    anything else is delegated to ``ServiceSpec.from_json``.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
9f95a23c | 1152 | |
f6b5b4d7 | 1153 | |
9f95a23c TL |
class UpgradeStatusSpec(object):
    """Orchestrator's report on the state of any ongoing upgrade."""

    def __init__(self):
        #: whether an upgrade is currently underway
        self.in_progress = False
        #: container image being upgraded to
        self.target_image = None
        #: daemon types that are fully updated
        self.services_complete = []
        #: freeform description of the current state
        self.message = ""
1161 | ||
1162 | ||
def handle_type_error(method):
    """Decorator for (class)methods: translate a raised TypeError into an
    OrchestratorValidationError whose message is prefixed with the class name.
    """
    @wraps(method)
    def wrapper(cls, *args, **kwargs):
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, e))
    return wrapper
1172 | ||
1173 | ||
class DaemonDescription(object):
    """
    For responding to queries about the status of a particular daemon,
    stateful or stateless.

    This is not about health or performance monitoring of daemons: it's
    about letting the orchestrator tell Ceph whether and where a
    daemon is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the daemon should run.
    """

    def __init__(self,
                 daemon_type=None,
                 daemon_id=None,
                 hostname=None,
                 container_id=None,
                 container_image_id=None,
                 container_image_name=None,
                 version=None,
                 status=None,
                 status_desc=None,
                 last_refresh=None,
                 created=None,
                 started=None,
                 last_configured=None,
                 osdspec_affinity=None,
                 last_deployed=None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False):

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id                  # runtime id
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        self.events: List[OrchestratorEvent] = events or []

        self.is_active = is_active

    def name(self) -> str:
        """Return the canonical daemon name, '<daemon_type>.<daemon_id>'."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def matches_service(self, service_name: Optional[str]) -> bool:
        """True if this daemon's name falls under *service_name* (prefix match)."""
        if service_name:
            return self.name().startswith(service_name + '.')
        return False

    def service_id(self):
        """Infer the service_id this daemon belongs to from its daemon_id.

        OSDs use their osdspec affinity when available; daemon types that
        require a service_id parse it back out of the daemon_id (which may
        embed the hostname and a random suffix).

        :raises OrchestratorError: when the daemon_id cannot be parsed.
        """
        if self.daemon_type == 'osd' and self.osdspec_affinity:
            return self.osdspec_affinity

        def _match():
            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

            if not self.hostname:
                # TODO: can a DaemonDescription exist without a hostname?
                raise err

            # use the bare hostname, not the FQDN.
            host = self.hostname.split('.')[0]

            if host == self.daemon_id:
                # daemon_id == "host"
                return self.daemon_id

            elif host in self.daemon_id:
                # daemon_id == "service_id.host"
                # daemon_id == "service_id.host.random"
                pre, post = self.daemon_id.rsplit(host, 1)
                if not pre.endswith('.'):
                    # '.' sep missing at front of host
                    raise err
                elif post and not post.startswith('.'):
                    # '.' sep missing at end of host
                    raise err
                return pre[:-1]

            # daemon_id == "service_id.random"
            if self.daemon_type == 'rgw':
                v = self.daemon_id.split('.')
                if len(v) in [3, 4]:
                    return '.'.join(v[0:2])

            if self.daemon_type == 'iscsi':
                v = self.daemon_id.split('.')
                return '.'.join(v[0:-1])

            # daemon_id == "service_id"
            return self.daemon_id

        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return _match()

        return self.daemon_id

    def service_name(self) -> str:
        """Return the owning service's name ('<type>' or '<type>.<service_id>')."""
        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
            return f'{self.daemon_type}.{self.service_id()}'
        return self.daemon_type

    def __repr__(self):
        return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                         id=self.daemon_id)

    def to_json(self) -> OrderedDict:
        """Serialize to an OrderedDict; None-valued keys are dropped and
        datetimes are rendered with DATEFMT."""
        out = OrderedDict()
        out['daemon_type'] = self.daemon_type
        out['daemon_id'] = self.daemon_id
        out['hostname'] = self.hostname
        out['container_id'] = self.container_id
        out['container_image_id'] = self.container_image_id
        out['container_image_name'] = self.container_image_name
        out['version'] = self.version
        out['status'] = self.status
        out['status_desc'] = self.status_desc
        if self.daemon_type == 'osd':
            out['osdspec_affinity'] = self.osdspec_affinity
        out['is_active'] = self.is_active

        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if getattr(self, k):
                out[k] = getattr(self, k).strftime(DATEFMT)

        if self.events:
            out['events'] = [e.to_json() for e in self.events]

        # drop keys that are still None so the JSON stays compact
        empty = [k for k, v in out.items() if v is None]
        for e in empty:
            del out[e]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data):
        """Inverse of to_json(): parse DATEFMT timestamps and event strings."""
        c = data.copy()
        event_strs = c.pop('events', [])
        for k in ['last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured']:
            if k in c:
                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(events=events, **c)

    def __copy__(self):
        # feel free to change this: copy via JSON round-trip.
        return DaemonDescription.from_json(self.to_json())

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
        """Render a DaemonDescription as its to_json() dict in YAML dumps."""
        return dumper.represent_dict(data.to_json().items())
1362 | ||
1363 | ||
# Register so yaml.dump() renders DaemonDescription via its to_json() dict.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1365 | ||
1366 | ||
9f95a23c TL |
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id=None,
                 container_image_name=None,
                 rados_config_location=None,
                 service_url=None,
                 last_refresh=None,
                 created=None,
                 size=0,
                 running=0,
                 events: Optional[List['OrchestratorEvent']] = None):
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id      # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created

        # The spec this status information describes
        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

    def service_type(self) -> str:
        """Return the service_type of the underlying spec."""
        return self.spec.service_type

    def __repr__(self):
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def to_json(self) -> OrderedDict:
        """Serialize: the spec's JSON plus a 'status' sub-dict (None values
        dropped, datetimes rendered with DATEFMT) and optional 'events'."""
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
        }
        # overwrite the raw datetimes with their DATEFMT string form
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = getattr(self, k).strftime(DATEFMT)
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict):
        """Inverse of to_json(): split off 'status' and 'events', rebuild the
        spec, and parse DATEFMT timestamps."""
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):
        """Render a ServiceDescription as its to_json() dict in YAML dumps."""
        return dumper.represent_dict(data.to_json().items())
1464 | ||
1465 | ||
# Register so yaml.dump() renders ServiceDescription via its to_json() dict.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c TL |
1467 | |
1468 | ||
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use: filter by host when presenting UI workflow for configuring
                 a particular server.
                 filter by label when not all of estate is Ceph servers,
                 and we want to only learn about the Ceph servers.
                 filter by label when we are interested particularly
                 in e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: restrict to hosts carrying these labels
        self.labels = labels
        #: Optional: restrict to these named hosts only
        self.hosts = hosts
1490 | ||
1491 | ||
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name  # falls back to the name when no address given
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a plain dict (devices via Devices.to_json())."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data):
        """Inverse of to_json().

        :raises OrchestratorValidationError: on missing/unknown keys or a
            malformed payload.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg)
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))

    @classmethod
    def from_nested_items(cls, hosts):
        # assumes each item is a (name, obj-with-.data) pair — TODO confirm against callers
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self):
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return just the host names of the given InventoryHosts."""
        return [host.name for host in hosts]

    def __eq__(self, other):
        # NOTE(review): equality compares name and devices only; addr and
        # labels are deliberately(?) excluded — confirm intentional.
        return self.name == other.name and self.devices == other.devices
1550 | ||
1551 | ||
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    ``host``: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    ``dev``: device id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
        See ``ceph osd metadata | jq '.[].device_ids'``

    ``path``: device path on that host.
    """
    __slots__ = ()
1563 | ||
1564 | ||
f6b5b4d7 TL |
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    # Parses the serialized form produced by to_json():
    #   '<created> <kind>:<subject> [<level>] "<message>"'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
        # Accept either a DATEFMT string or an actual datetime.
        if isinstance(created, str):
            created = datetime.datetime.strptime(created, DATEFMT)
        self.created: datetime.datetime = created

        assert kind in "service daemon".split()
        self.kind: str = kind

        # service name, or daemon name, or something similar
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in "INFO ERROR".split()
        self.level = level

        self.message: str = message

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def kind_subject(self) -> str:
        """Return the '<kind>:<subject>' identifier for this event."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # Make a long list of events readable.
        created = self.created.strftime(DATEFMT)
        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: serialized event string as produced by to_json()
        :return: the parsed OrchestratorEvent
        :raises ValueError: when the string does not match regex_v1
        """
        match = cls.regex_v1.match(data)
        if match:
            return cls(*match.groups())
        raise ValueError(f'Unable to match: "{data}"')

    def __eq__(self, other):
        if not isinstance(other, OrchestratorEvent):
            return False

        # NOTE(review): 'level' is not part of the equality check — confirm intentional.
        return self.created == other.created and self.kind == other.kind \
            and self.subject == other.subject and self.message == other.message
1623 | ||
1624 | ||
9f95a23c TL |
def _mk_orch_methods(cls):
    """Class decorator: attach a forwarding stub to *cls* for every public
    ``Orchestrator`` method, each delegating to ``self._oremote``."""
    # The closure must come from a helper function; a function defined
    # directly inside the for-loop would late-bind and always forward to
    # the last method name.
    def shim(method_name):
        def inner(self, *args, **kwargs):
            return self._oremote(method_name, args, kwargs)
        return inner

    for name in Orchestrator.__dict__:
        if name.startswith('_') or name == 'is_orchestrator_module':
            continue
        setattr(cls, name, shim(name))
    return cls
1638 | ||
1639 | ||
1640 | @_mk_orch_methods | |
1641 | class OrchestratorClientMixin(Orchestrator): | |
1642 | """ | |
1643 | A module that inherits from `OrchestratorClientMixin` can directly call | |
1644 | all :class:`Orchestrator` methods without manually calling remote. | |
1645 | ||
1646 | Every interface method from ``Orchestrator`` is converted into a stub method that internally | |
1647 | calls :func:`OrchestratorClientMixin._oremote` | |
1648 | ||
1649 | >>> class MyModule(OrchestratorClientMixin): | |
1650 | ... def func(self): | |
1651 | ... completion = self.add_host('somehost') # calls `_oremote()` | |
1652 | ... self._orchestrator_wait([completion]) | |
1653 | ... self.log.debug(completion.result) | |
1654 | ||
1655 | .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`. | |
1656 | Reason is, that OrchestratorClientMixin magically redirects all methods to the | |
1657 | "real" implementation of the orchestrator. | |
1658 | ||
1659 | ||
1660 | >>> import mgr_module | |
1911f103 TL |
1661 | >>> #doctest: +SKIP |
1662 | ... class MyImplementation(mgr_module.MgrModule, Orchestrator): | |
9f95a23c TL |
1663 | ... def __init__(self, ...): |
1664 | ... self.orch_client = OrchestratorClientMixin() | |
1665 | ... self.orch_client.set_mgr(self.mgr) | |
1666 | """ | |
1667 | ||
    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Inject the ``MgrModule`` that ``_oremote()`` should use for remote calls.

        Usable in the Dashboard that uses a global ``mgr``.
        """

        # Double-underscore name-mangling keeps this private to the mixin.
        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
1674 | ||
1675 | def __get_mgr(self): | |
1676 | try: | |
1677 | return self.__mgr | |
1678 | except AttributeError: | |
1679 | return self | |
1680 | ||
1681 | def _oremote(self, meth, args, kwargs): | |
1682 | """ | |
1683 | Helper for invoking `remote` on whichever orchestrator is enabled | |
1684 | ||
1685 | :raises RuntimeError: If the remote method failed. | |
1686 | :raises OrchestratorError: orchestrator failed to perform | |
1687 | :raises ImportError: no `orchestrator` module or backend not found. | |
1688 | """ | |
1689 | mgr = self.__get_mgr() | |
1690 | ||
1691 | try: | |
1692 | o = mgr._select_orchestrator() | |
1693 | except AttributeError: | |
1694 | o = mgr.remote('orchestrator', '_select_orchestrator') | |
1695 | ||
1696 | if o is None: | |
1697 | raise NoOrchestrator() | |
1698 | ||
1699 | mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs)) | |
1700 | try: | |
1701 | return mgr.remote(o, meth, *args, **kwargs) | |
1702 | except Exception as e: | |
1703 | if meth == 'get_feature_set': | |
1704 | raise # self.get_feature_set() calls self._oremote() | |
1705 | f_set = self.get_feature_set() | |
1706 | if meth not in f_set or not f_set[meth]['available']: | |
1707 | raise NotImplementedError(f'{o} does not implement {meth}') from e | |
1708 | raise | |
1709 | ||
f6b5b4d7 | 1710 | def _orchestrator_wait(self, completions: List[Completion]) -> None: |
9f95a23c TL |
1711 | """ |
1712 | Wait for completions to complete (reads) or | |
1713 | become persistent (writes). | |
1714 | ||
1715 | Waits for writes to be *persistent* but not *effective*. | |
1716 | ||
1717 | :param completions: List of Completions | |
1718 | :raises NoOrchestrator: | |
1719 | :raises RuntimeError: something went wrong while calling the process method. | |
1720 | :raises ImportError: no `orchestrator` module or backend not found. | |
1721 | """ | |
1722 | while any(not c.has_result for c in completions): | |
1723 | self.process(completions) | |
1724 | self.__get_mgr().log.info("Operations pending: %s", | |
1725 | sum(1 for c in completions if not c.has_result)) | |
1726 | if any(c.needs_result for c in completions): | |
1727 | time.sleep(1) | |
1728 | else: | |
1729 | break |