]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator/_interface.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
CommitLineData
9f95a23c
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
e306af50
TL
7
8import copy
9import datetime
f67539c2 10import enum
e306af50 11import errno
9f95a23c
TL
12import logging
13import pickle
e306af50 14import re
e306af50 15
f6b5b4d7
TL
16from collections import namedtuple, OrderedDict
17from contextlib import contextmanager
f67539c2
TL
18from functools import wraps, reduce
19
20from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
21 Sequence, Dict, cast
22
23try:
24 from typing import Protocol # Protocol was added in Python 3.8
25except ImportError:
26 class Protocol: # type: ignore
27 pass
28
9f95a23c 29
f6b5b4d7
TL
30import yaml
31
9f95a23c
TL
32from ceph.deployment import inventory
33from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
f67539c2 34 IscsiServiceSpec, IngressSpec
9f95a23c 35from ceph.deployment.drive_group import DriveGroupSpec
f67539c2 36from ceph.deployment.hostspec import HostSpec, SpecValidationError
adb31ebb 37from ceph.utils import datetime_to_str, str_to_datetime
9f95a23c
TL
38
39from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
9f95a23c
TL
41
# Module-level logger for the orchestrator interface.
logger = logging.getLogger(__name__)

# Generic result type carried by OrchResult / returned by raise_if_exception().
T = TypeVar('T')
# Any callable; used by decorators that hand back the same callable type they receive.
FuncT = TypeVar('FuncT', bound=Callable[..., Any])
f6b5b4d7 46
9f95a23c
TL
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable message; also what ``str(exc)`` returns.
    :param errno: negative errno handed back to the CLI caller
        (``handle_exception`` passes it to ``HandleCommandResult``).
    :param event_kind_subject: optional ``(kind, subject)`` pair identifying
        what the error is about; see ``set_exception_subject``.
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # Delegate to Exception.__init__ so that ``args == (msg,)``; this also
        # keeps the exception picklable via BaseException.__reduce__.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
9f95a23c
TL
66
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend is configured.

    Always carries ``errno == -ENOENT``.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c
TL
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature, or when
    input fails validation.

    ``handle_type_error`` raises it in place of a ``TypeError`` coming from the
    decorated method (e.g. unexpected or missing arguments).
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Attach ``(kind, subject)`` to any OrchestratorError escaping the block.

    The exception is always re-raised. By default an already-set subject is
    kept, so the innermost context (the most specific subject) wins when
    several of these context managers are nested.

    :param kind: subject kind, stored as the first tuple element.
    :param subject: subject name, stored as the second tuple element.
    :param overwrite: replace a subject that is already set.
    """
    try:
        yield
    except OrchestratorError as e:
        # OrchestratorError.__init__ always defines ``event_subject`` (possibly
        # None), so a hasattr() check is always true and would make
        # ``overwrite`` meaningless; test for an unset value instead.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so that expected errors become HandleCommandResults.

    The wrapped callable is annotated with ``_prefix`` and a ``CLICommand``
    (``_cli_command``) whose ``func`` is the wrapper itself; CLICommandMeta
    collects these attributes to build its dispatch table.

    :param prefix: CLI command prefix, e.g. ``"orch ls"``.
    :param perm: permission string for the command (``'r'`` or ``'rw'``).
    :param func: the handler implementation.
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as err:
            # Expected errors: report them without a traceback.
            return HandleCommandResult(err.errno, stderr=str(err))
        except ImportError as err:
            return HandleCommandResult(-errno.ENOENT, stderr=str(err))
        except NotImplementedError:
            unsupported = f'This Orchestrator does not support `{prefix}`'
            return HandleCommandResult(-errno.ENOENT, stderr=unsupported)

    # The metadata is attached to a separate forwarding lambda rather than to
    # ``wrapper`` itself (the original code deliberately "misuses lambda to
    # copy `wrapper`").
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
    command = CLICommand(prefix, perm)
    command.store_func_metadata(func)
    command.func = wrapper_copy
    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = command  # type: ignore

    return cast(FuncT, wrapper_copy)
9f95a23c
TL
114
115
f67539c2
TL
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return an OrchResult.

    The decorated function's return value becomes ``OrchResult(value)``; any
    ``Exception`` it raises is captured as ``OrchResult(None, exception=e)``
    instead of propagating.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            value = f(*args, **kwargs)
            return OrchResult(value)
        except Exception as e:
            return OrchResult(None, exception=e)

    return cast(Callable[..., OrchResult[T]], wrapper)
130
131
class InnerCliCommandCallable(Protocol):
    """
    Type of the factory returned by ``_cli_command``: given a CLI prefix it
    returns a decorator that keeps the decorated callable's type.
    """

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        """Return a decorator registering a handler for *prefix*."""
135
136
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a decorator factory for CLI commands requiring permission *perm*.

    ``_cli_command(perm)(prefix)`` is a decorator that wraps a handler with
    ``handle_exception(prefix, perm, handler)``.
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorator(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorator
    return inner_cli_command


# Decorator factories for read-only and read-write orchestrator commands.
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
145
146
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    Every attribute of the class body carrying ``_prefix`` and ``_cli_command``
    (as attached by ``handle_exception``) is registered in a per-class dispatch
    table. ``COMMANDS`` (the dumped command descriptions) and a dispatching
    ``handle_command`` are then installed on the class. Prefixes that are not
    in the table are delegated to the ``handle_command`` the class resolved to
    before this metaclass replaced it.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                pass

        # Resolve the previous handler *before* it is overwritten below.
        # Calling ``self.handle_command`` from the new handler would re-enter
        # the new handler itself and recurse forever on any unknown prefix.
        fallback = getattr(cls, 'handle_command', None)

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            if cmd['prefix'] not in dispatch:
                if fallback is None:
                    return HandleCommandResult(-errno.EINVAL,
                                               stderr=f"Unknown command: {cmd['prefix']}")
                return fallback(self, inbuf, cmd)

            return dispatch[cmd['prefix']].call(self, cmd, inbuf)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
171
172
f67539c2
TL
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.

    The exception is kept pickled (``serialized_exception``) next to a human
    readable ``exception_str``, so ``raise_if_exception()`` can re-raise it on
    the caller's side.
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """
        Store *e* in pickled form, or clear the stored exception if *e* is None.

        ``pickle.dumps`` reports unpicklable objects not only with
        ``PicklingError`` but also with ``TypeError`` (e.g. an attribute holding
        a lock) or ``AttributeError`` (e.g. a locally defined class). In all of
        these cases the exception degrades to a plain ``Exception``, first with
        the original ``args`` and, if those are unpicklable too, with ``str(e)``.
        """
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'
        try:
            self.serialized_exception = pickle.dumps(e)
        except (pickle.PicklingError, TypeError, AttributeError):
            logger.error(f"failed to pickle {e}")
            # degenerate to a plain Exception
            if isinstance(e, Exception):
                fallback = Exception(*e.args)
            else:
                fallback = Exception(str(e))
            try:
                self.serialized_exception = pickle.dumps(fallback)
            except (pickle.PicklingError, TypeError, AttributeError):
                # The args themselves are not picklable: keep only the message.
                self.serialized_exception = pickle.dumps(Exception(str(e)))

    def result_str(self) -> str:
        """Force a string: '' for None, one line per item for lists, else str()."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
213
9f95a23c 214
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Return ``c.result``, or raise the exception stored in *c*.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    If the stored exception cannot be unpickled in this interpreter (its class
    is missing or its module cannot be imported here, its constructor does not
    accept the pickled ``args``, or the payload is corrupt), a plain
    ``Exception`` carrying ``c.exception_str`` is raised instead.

    :raises Exception: the stored exception, or the plain fallback above.
    """
    if c.serialized_exception is not None:
        try:
            e = pickle.loads(c.serialized_exception)
        except (KeyError, AttributeError, ImportError, TypeError, pickle.UnpicklingError):
            raise Exception(c.exception_str)
        raise e
    assert c.result is not None, 'OrchResult should either have an exception or a result'
    return c.result
9f95a23c
TL
227
228
f67539c2
TL
def _hide_in_features(f: FuncT) -> FuncT:
    """Flag *f* so that ``Orchestrator.get_feature_set()`` leaves it out."""
    setattr(f, '_hide_in_features', True)
    return f
232
233
class Orchestrator(object):
    """
    Base class of the orchestrator interface.

    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads.

    Most operations return an ``OrchResult``, which carries either a result or a
    pickled exception so that errors survive ``MgrModule.remote()`` calls between
    modules; callers unwrap it with ``raise_if_exception()``.

    Most operations defined here just raise ``NotImplementedError``; orchestrator
    modules override the ones they support, which is what ``get_feature_set()``
    reports.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        A public method of ``Orchestrator`` counts as available when the
        concrete class overrides it. Methods decorated with
        ``_hide_in_features`` are left out.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
                    for a in Orchestrator.__dict__
                    if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
                    }
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Each spec is dispatched, in order, to the ``apply_*`` method (or
        ``add_host``) matching its ``service_type``. All of these calls are made
        before any of their results is inspected; the first failed result is
        then re-raised.

        :param specs: service and/or host specs to apply
        :param no_overwrite: not used by this generic implementation
        :return: the result strings of the individual calls, in spec order
        :raises OrchestratorError: a spec has a ``service_type`` this
            dispatcher cannot apply (specs before it have already been applied)
        """
        fns: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'host': self.add_host,
            'cephadm-exporter': self.apply_cephadm_exporter,
        }

        def merge(acc: OrchResult[List[str]], item: OrchResult[str]) -> OrchResult[List[str]]:
            # Unwrapping both sides re-raises the first failed apply_* result.
            acc_res = raise_if_exception(acc)
            item_res = raise_if_exception(item)
            acc_res.append(item_res)
            return OrchResult(acc_res)

        results = []
        for spec in specs:
            fn = fns.get(spec.service_type)
            if fn is None:
                # A bare KeyError here used to surface as a cryptic "'<type>'"
                # message with a traceback.
                raise OrchestratorError(
                    f'Cannot apply spec: unsupported service type {spec.service_type!r}')
            results.append(fn(spec))
        return raise_if_exception(reduce(merge, results, OrchResult([])))

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :param names: daemon names
        :return: OrchResult wrapping a list of strings
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: service_type + '.' + service_id
        :return: OrchResult wrapping a string
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        """
        NOTE(review): presumably sets/clears the ``unmanaged`` flag of the
        matching service spec(s); confirm against implementations.
        """
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        Stop the removal of the given OSDs.

        NOTE(review): presumably cancels removals queued by ``remove_osds()``;
        confirm against implementations.
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing cephadm exporter daemon"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
9f95a23c
TL
513 """
514 Create one or more OSDs within a single Drive Group.
515
516 The principal argument here is the drive_group member
517 of OsdSpec: other fields are advisory/extensible for any
518 finer-grained OSD feature enablement (choice of backing store,
519 compression/encryption, etc).
520 """
521 raise NotImplementedError()
522
f67539c2 523 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
9f95a23c
TL
524 """ Update OSD cluster """
525 raise NotImplementedError()
526
e306af50
TL
527 def set_unmanaged_flag(self,
528 unmanaged_flag: bool,
529 service_type: str = 'osd',
f67539c2 530 service_name: Optional[str] = None
e306af50 531 ) -> HandleCommandResult:
1911f103
TL
532 raise NotImplementedError()
533
e306af50
TL
534 def preview_osdspecs(self,
535 osdspec_name: Optional[str] = 'osd',
536 osdspecs: Optional[List[DriveGroupSpec]] = None
f67539c2 537 ) -> OrchResult[str]:
1911f103
TL
538 """ Get a preview for OSD deployments """
539 raise NotImplementedError()
540
9f95a23c
TL
541 def remove_osds(self, osd_ids: List[str],
542 replace: bool = False,
f67539c2 543 force: bool = False) -> OrchResult[str]:
9f95a23c
TL
544 """
545 :param osd_ids: list of OSD IDs
546 :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
547 :param force: Forces the OSD removal process without waiting for the data to be drained first.
548 Note that this can only remove OSDs that were successfully
549 created (i.e. got an OSD ID).
550 """
551 raise NotImplementedError()
552
f67539c2 553 def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
f6b5b4d7
TL
554 """
555 TODO
556 """
557 raise NotImplementedError()
558
f67539c2 559 def remove_osds_status(self) -> OrchResult:
9f95a23c
TL
560 """
561 Returns a status of the ongoing OSD removal operations.
562 """
563 raise NotImplementedError()
564
f67539c2 565 def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
9f95a23c
TL
566 """
567 Instructs the orchestrator to enable or disable either the ident or the fault LED.
568
569 :param ident_fault: either ``"ident"`` or ``"fault"``
570 :param on: ``True`` = on.
571 :param locations: See :class:`orchestrator.DeviceLightLoc`
572 """
573 raise NotImplementedError()
574
f67539c2 575 def zap_device(self, host: str, path: str) -> OrchResult[str]:
9f95a23c
TL
576 """Zap/Erase a device (DESTROYS DATA)"""
577 raise NotImplementedError()
578
f67539c2
TL
579 def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
580 """Create daemons daemon(s) for unmanaged services"""
9f95a23c
TL
581 raise NotImplementedError()
582
f67539c2 583 def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
584 """Update mon cluster"""
585 raise NotImplementedError()
586
f67539c2 587 def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
588 """Update mgr cluster"""
589 raise NotImplementedError()
590
f67539c2 591 def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
592 """Update MDS cluster"""
593 raise NotImplementedError()
594
f67539c2 595 def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
801d1391
TL
596 """Update RGW cluster"""
597 raise NotImplementedError()
598
f67539c2
TL
599 def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
600 """Update ingress daemons"""
9f95a23c
TL
601 raise NotImplementedError()
602
f67539c2 603 def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
604 """Update rbd-mirror cluster"""
605 raise NotImplementedError()
606
f67539c2 607 def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
9f95a23c
TL
608 """Update NFS cluster"""
609 raise NotImplementedError()
610
f67539c2 611 def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
1911f103
TL
612 """Update iscsi cluster"""
613 raise NotImplementedError()
614
f67539c2 615 def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
616 """Update prometheus cluster"""
617 raise NotImplementedError()
618
f67539c2 619 def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
620 """Update existing a Node-Exporter daemon(s)"""
621 raise NotImplementedError()
622
f67539c2 623 def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
9f95a23c
TL
624 """Update existing a crash daemon(s)"""
625 raise NotImplementedError()
626
f67539c2
TL
627 def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
628 """Update existing a grafana service"""
9f95a23c
TL
629 raise NotImplementedError()
630
f67539c2
TL
631 def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
632 """Update an existing AlertManager daemon(s)"""
9f95a23c
TL
633 raise NotImplementedError()
634
f67539c2
TL
635 def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
636 """Update an existing cephadm exporter daemon"""
9f95a23c
TL
637 raise NotImplementedError()
638
f67539c2 639 def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
9f95a23c
TL
640 raise NotImplementedError()
641
f67539c2 642 def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
9f95a23c
TL
643 raise NotImplementedError()
644
f67539c2 645 def upgrade_pause(self) -> OrchResult[str]:
9f95a23c
TL
646 raise NotImplementedError()
647
f67539c2 648 def upgrade_resume(self) -> OrchResult[str]:
9f95a23c
TL
649 raise NotImplementedError()
650
f67539c2 651 def upgrade_stop(self) -> OrchResult[str]:
9f95a23c
TL
652 raise NotImplementedError()
653
f67539c2 654 def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
9f95a23c
TL
655 """
656 If an upgrade is currently underway, report on where
657 we are in the process, or if some error has occurred.
658
659 :return: UpgradeStatusSpec instance
660 """
661 raise NotImplementedError()
662
663 @_hide_in_features
f67539c2 664 def upgrade_available(self) -> OrchResult:
9f95a23c
TL
665 """
666 Report on what versions are available to upgrade to
667
668 :return: List of strings
669 """
670 raise NotImplementedError()
671
672
e306af50
TL
# Any spec accepted by Orchestrator.apply(): a service spec or a host spec.
GenericSpec = Union[ServiceSpec, HostSpec]


def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Deserialize *spec*: ``service_type == 'host'`` yields a HostSpec, anything
    else is parsed as a ServiceSpec.
    """
    if spec.get('service_type') != 'host':
        return ServiceSpec.from_json(spec)
    return HostSpec.from_json(spec)
9f95a23c 681
f6b5b4d7 682
f67539c2
TL
# Daemon types whose service type has the same name.
_SAME_NAME_DAEMON_TYPES = (
    'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror', 'cephfs-mirror',
    'nfs', 'grafana', 'alertmanager', 'prometheus', 'node-exporter', 'crash',
    'container', 'cephadm-exporter',
)
# daemon type -> service type, including the types that differ by name.
_DAEMON_TYPE_TO_SERVICE = {
    **{dtype: dtype for dtype in _SAME_NAME_DAEMON_TYPES},
    'haproxy': 'ingress',
    'keepalived': 'ingress',
    'crashcollector': 'crash',  # Specific Rook Daemon
}


def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the type of service it belongs to.

    :raises KeyError: for an unknown daemon type.
    """
    return _DAEMON_TYPE_TO_SERVICE[dtype]
706
707
# service type -> daemon types that make up the service.
_SERVICE_TO_DAEMON_TYPES: Dict[str, Tuple[str, ...]] = {
    'mon': ('mon',),
    'mgr': ('mgr',),
    'mds': ('mds',),
    'rgw': ('rgw',),
    'osd': ('osd',),
    'ingress': ('haproxy', 'keepalived'),
    'iscsi': ('iscsi',),
    'rbd-mirror': ('rbd-mirror',),
    'cephfs-mirror': ('cephfs-mirror',),
    'nfs': ('nfs',),
    'grafana': ('grafana',),
    'alertmanager': ('alertmanager',),
    'prometheus': ('prometheus',),
    'node-exporter': ('node-exporter',),
    'crash': ('crash',),
    'container': ('container',),
    'cephadm-exporter': ('cephadm-exporter',),
}


def service_to_daemon_types(stype: str) -> List[str]:
    """
    List the daemon types that make up a service of type *stype*.

    A new list is returned on every call, so callers may modify it.

    :raises KeyError: for an unknown service type.
    """
    return list(_SERVICE_TO_DAEMON_TYPES[stype])
729
730
9f95a23c
TL
class UpgradeStatusSpec:
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        # Is an upgrade underway?
        self.in_progress = False
        # Image the upgrade is moving to, if known.
        self.target_image: Optional[str] = None
        # Which daemon types are fully updated?
        self.services_complete: List[str] = []
        # How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        # Freeform description
        self.message = ""
739
740
def handle_type_error(method: FuncT) -> FuncT:
    """
    Turn a ``TypeError`` raised by *method* into an OrchestratorValidationError.

    The wrapped callable receives its first argument as ``cls`` and uses
    ``cls.__name__`` as the message prefix, so it is intended for
    classmethod-style callables.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as e:
            raise OrchestratorValidationError(f'{cls.__name__}: {e}')
    return cast(FuncT, inner)
750
751
class DaemonDescriptionStatus(enum.IntEnum):
    """Coarse daemon state, following the -1 / 0 / 1 convention."""

    error = -1    # daemon is in an error state
    stopped = 0   # daemon is not running
    running = 1   # daemon is running
9f95a23c
TL
756
757
758class DaemonDescription(object):
759 """
760 For responding to queries about the status of a particular daemon,
761 stateful or stateless.
762
763 This is not about health or performance monitoring of daemons: it's
764 about letting the orchestrator tell Ceph whether and where a
765 daemon is scheduled in the cluster. When an orchestrator tells
766 Ceph "it's running on host123", that's not a promise that the process
767 is literally up this second, it's a description of where the orchestrator
768 has decided the daemon should run.
769 """
770
    def __init__(self,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 hostname: Optional[str] = None,
                 container_id: Optional[str] = None,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 container_image_digests: Optional[List[str]] = None,
                 version: Optional[str] = None,
                 status: Optional[DaemonDescriptionStatus] = None,
                 status_desc: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 started: Optional[datetime.datetime] = None,
                 last_configured: Optional[datetime.datetime] = None,
                 osdspec_affinity: Optional[str] = None,
                 last_deployed: Optional[datetime.datetime] = None,
                 events: Optional[List['OrchestratorEvent']] = None,
                 is_active: bool = False,
                 memory_usage: Optional[int] = None,
                 memory_request: Optional[int] = None,
                 memory_limit: Optional[int] = None,
                 service_name: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 ip: Optional[str] = None,
                 deployed_by: Optional[List[str]] = None,
                 ) -> None:
        """
        Describe a single daemon; every field is optional.

        All arguments are stored as attributes of the same name, with two
        exceptions: ``events`` (``None`` becomes an empty list) and
        ``service_name`` (stored as ``_service_name`` and exposed through
        the ``service_name()`` method).
        """

        # Host is at the same granularity as InventoryHost
        self.hostname: Optional[str] = hostname

        # Not everyone runs in containers, but enough people do to
        # justify having the container_id (runtime id) and container_image
        # (image name)
        self.container_id = container_id  # runtime id
        self.container_image_id = container_image_id  # image id locally
        self.container_image_name = container_image_name  # image friendly name
        self.container_image_digests = container_image_digests  # reg hashes

        # The type of service (osd, mon, mgr, etc.)
        self.daemon_type = daemon_type

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.daemon_id: Optional[str] = daemon_id

        # Explicit service name; when unset, service_id()/service_name()
        # derive it from daemon_type/daemon_id/hostname.
        self._service_name: Optional[str] = service_name

        # Service version that was deployed
        self.version = version

        # Service status: -1 error, 0 stopped, 1 running (DaemonDescriptionStatus)
        self.status = status

        # Service status description when status == error.
        self.status_desc = status_desc

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh

        # Lifecycle timestamps of the daemon.
        self.created: Optional[datetime.datetime] = created
        self.started: Optional[datetime.datetime] = started
        self.last_configured: Optional[datetime.datetime] = last_configured
        self.last_deployed: Optional[datetime.datetime] = last_deployed

        # Affinity to a certain OSDSpec
        self.osdspec_affinity: Optional[str] = osdspec_affinity

        # Never None, so callers can iterate/append without a check.
        self.events: List[OrchestratorEvent] = events or []

        # NOTE(review): memory units are not visible here (presumably bytes) — confirm.
        self.memory_usage: Optional[int] = memory_usage
        self.memory_request: Optional[int] = memory_request
        self.memory_limit: Optional[int] = memory_limit

        # Listening ports and bind address; see get_port_summary().
        self.ports: Optional[List[int]] = ports
        self.ip: Optional[str] = ip

        self.deployed_by = deployed_by

        self.is_active = is_active
e306af50 853
f67539c2
TL
854 def get_port_summary(self) -> str:
855 if not self.ports:
856 return ''
857 return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
858
859 def name(self) -> str:
9f95a23c
TL
860 return '%s.%s' % (self.daemon_type, self.daemon_id)
861
f6b5b4d7 862 def matches_service(self, service_name: Optional[str]) -> bool:
f67539c2
TL
863 assert self.daemon_id is not None
864 assert self.daemon_type is not None
9f95a23c 865 if service_name:
f67539c2 866 return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
9f95a23c
TL
867 return False
868
f67539c2
TL
def service_id(self) -> str:
    """
    Return the id part of this daemon's service name (the text after ``<type>.``).

    Resolution order:

    1. An explicit ``_service_name`` wins: return everything after its first
       ``.``, or ``''`` if it contains no ``.``.
    2. ``osd`` daemons: ``osdspec_affinity``, unless it is falsy or the literal
       string ``'None'``, in which case ``'unmanaged'``.
    3. Service types listed in ``ServiceSpec.REQUIRES_SERVICE_ID``: derived
       from ``daemon_id`` by the heuristics in the nested ``_match()``.
    4. Anything else: ``daemon_id`` unchanged.

    :raises AssertionError: if ``daemon_id`` or ``daemon_type`` is ``None``.
    :raises OrchestratorError: (via ``_match``) if the id cannot be derived,
        e.g. no ``hostname`` is set, or the host part found inside
        ``daemon_id`` is not delimited by ``.``.
    """
    assert self.daemon_id is not None
    assert self.daemon_type is not None

    if self._service_name:
        if '.' in self._service_name:
            return self._service_name.split('.', 1)[1]
        else:
            return ''

    if self.daemon_type == 'osd':
        # The literal string 'None' is treated the same as an unset affinity.
        if self.osdspec_affinity and self.osdspec_affinity != 'None':
            return self.osdspec_affinity
        return 'unmanaged'

    def _match() -> str:
        """Derive the service id from ``daemon_id``, using ``hostname`` as a hint."""
        assert self.daemon_id is not None
        err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

        if not self.hostname:
            # TODO: can a DaemonDescription exist without a hostname?
            raise err

        # use the bare hostname, not the FQDN.
        host = self.hostname.split('.')[0]

        if host == self.daemon_id:
            # daemon_id == "host"
            return self.daemon_id

        elif host in self.daemon_id:
            # daemon_id == "service_id.host"
            # daemon_id == "service_id.host.random"
            # Split on the LAST occurrence of the short hostname. Whatever
            # precedes it, minus the trailing '.', is the service id.
            pre, post = self.daemon_id.rsplit(host, 1)
            if not pre.endswith('.'):
                # '.' sep missing at front of host
                raise err
            elif post and not post.startswith('.'):
                # '.' sep missing at end of host
                raise err
            return pre[:-1]

        # daemon_id == "service_id.random"
        if self.daemon_type == 'rgw':
            # Only 3- or 4-component rgw ids are handled here; their first two
            # components form the service id. Other lengths fall through.
            v = self.daemon_id.split('.')
            if len(v) in [3, 4]:
                return '.'.join(v[0:2])

        if self.daemon_type == 'iscsi':
            # Drop the last component (presumably the random suffix).
            v = self.daemon_id.split('.')
            return '.'.join(v[0:-1])

        # daemon_id == "service_id"
        return self.daemon_id

    if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
        return _match()

    return self.daemon_id
1911f103 929
f67539c2
TL
def service_name(self) -> str:
    """
    Return the name of the service this daemon belongs to.

    An explicitly set ``_service_name`` is returned as-is. Otherwise the name
    is the service type, suffixed with ``.<service_id>`` when that type
    requires a service id.
    """
    if self._service_name:
        return self._service_name
    assert self.daemon_type is not None
    svc_type = daemon_type_to_service(self.daemon_type)
    if svc_type not in ServiceSpec.REQUIRES_SERVICE_ID:
        return svc_type
    return f'{svc_type}.{self.service_id()}'
9f95a23c 937
def __repr__(self) -> str:
    """Short debug form, e.g. ``<DaemonDescription>(mgr.x)``."""
    return f"<DaemonDescription>({self.daemon_type}.{self.daemon_id})"
941
f67539c2
TL
def to_json(self) -> dict:
    """
    Serialize this daemon description into an ordered, JSON-friendly dict.

    ``status`` is emitted as its ``.value``, and set timestamps as strings via
    ``datetime_to_str``. ``osdspec_affinity`` is only included for ``osd``
    daemons, and ``events`` only when there are any. Keys whose value is
    ``None`` are omitted.
    """
    status_value = self.status.value if self.status is not None else None
    fields: List[Tuple[str, Any]] = [
        ('daemon_type', self.daemon_type),
        ('daemon_id', self.daemon_id),
        ('hostname', self.hostname),
        ('container_id', self.container_id),
        ('container_image_id', self.container_image_id),
        ('container_image_name', self.container_image_name),
        ('container_image_digests', self.container_image_digests),
        ('memory_usage', self.memory_usage),
        ('memory_request', self.memory_request),
        ('memory_limit', self.memory_limit),
        ('version', self.version),
        ('status', status_value),
        ('status_desc', self.status_desc),
    ]
    if self.daemon_type == 'osd':
        fields.append(('osdspec_affinity', self.osdspec_affinity))
    fields.extend([
        ('is_active', self.is_active),
        ('ports', self.ports),
        ('ip', self.ip),
    ])

    # Timestamps are only emitted when set.
    for stamp_key in ('last_refresh', 'created', 'started', 'last_deployed',
                      'last_configured'):
        stamp = getattr(self, stamp_key)
        if stamp:
            fields.append((stamp_key, datetime_to_str(stamp)))

    if self.events:
        fields.append(('events', [event.to_json() for event in self.events]))

    out: Dict[str, Any] = OrderedDict(
        (key, value) for key, value in fields if value is not None)
    return out
9f95a23c
TL
975
@classmethod
@handle_type_error
def from_json(cls, data: dict) -> 'DaemonDescription':
    """
    Rebuild a DaemonDescription from a dict shaped like :meth:`to_json` output.

    Timestamp strings are parsed back into datetimes and serialized events
    into OrchestratorEvent objects. An integer ``status`` becomes a
    DaemonDescriptionStatus. Every remaining key is passed to the constructor
    as a keyword argument.
    """
    kwargs = data.copy()
    raw_events = kwargs.pop('events', [])
    for stamp_key in ('last_refresh', 'created', 'started', 'last_deployed',
                      'last_configured'):
        if stamp_key in kwargs:
            kwargs[stamp_key] = str_to_datetime(kwargs[stamp_key])
    events = [OrchestratorEvent.from_json(raw) for raw in raw_events]
    raw_status = kwargs.pop('status', None)
    status = None if raw_status is None else DaemonDescriptionStatus(raw_status)
    return cls(events=events, status=status, **kwargs)
9f95a23c 989
def __copy__(self) -> 'DaemonDescription':
    """
    Copy by round-tripping through the JSON representation.

    This keeps copy semantics in step with (de)serialization; feel free to
    replace it with something cheaper.
    """
    serialized = self.to_json()
    return DaemonDescription.from_json(serialized)
993
@staticmethod
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
    """Represent *data* as a YAML mapping built from its ``to_json()`` items."""
    serialized = data.to_json()
    return dumper.represent_dict(serialized.items())
997
998
# Register DaemonDescription.yaml_representer so PyYAML can dump DaemonDescription
# objects as mappings.
# NOTE(review): no Dumper= argument is passed, so this registers on PyYAML's default
# Dumper; confirm whether yaml.safe_dump callers also need it.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1000
1001
9f95a23c
TL
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 rados_config_location: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # A fresh list per instance. The previous `ports: List[int] = []`
        # default was evaluated once and shared by every instance created
        # without explicit ports.
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type from the underlying spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """
        Summarise the service endpoint as ``"<virtual ip>:<port>,<port>..."``.

        Any ``/<prefix>`` suffix is stripped from ``virtual_ip``, and ``?``
        stands in when no virtual IP is set. Returns ``''`` without ports.
        """
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def to_json(self) -> OrderedDict:
        """
        Serialize as the spec's JSON plus a nested ``status`` dict.

        ``None`` values are dropped from ``status``, and set timestamps are
        stringified. ``events`` is added only when there are any.
        """
        out = self.spec.to_json()
        status = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        status = {k: v for (k, v) in status.items() if v is not None}
        out['status'] = status
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """
        Inverse of :meth:`to_json`.

        ``status`` entries become constructor keyword arguments, with
        timestamps parsed back into datetimes. ``events`` are parsed into
        OrchestratorEvent objects. The remaining keys form the ServiceSpec.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """Represent *data* as a YAML mapping built from its ``to_json()`` items."""
        return dumper.represent_dict(data.to_json().items())
1113
1114
# Register ServiceDescription.yaml_representer so PyYAML can dump ServiceDescription
# objects as mappings.
# NOTE(review): no Dumper= argument is passed, so this registers on PyYAML's default
# Dumper; confirm whether yaml.safe_dump callers also need it.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c
TL
1116
1117
class InventoryFilter:
    """
    Narrows an inventory query so the whole estate need not be scanned.

    Typical uses:

    * filter by host when a UI workflow configures one particular server;
    * filter by label when not every machine in the estate is a Ceph server
      and only the Ceph servers are of interest;
    * filter by label to focus on a role, e.g. OSD servers.
    """

    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
        #: Optional: restrict results to hosts matching these labels
        self.labels = labels
        #: Optional: restrict results to these named hosts
        self.hosts = hosts
1139
1140
class InventoryHost(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional['inventory.Devices'] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within the cluster, e.g. a hostname.
        :param devices: devices on the host; defaults to an empty ``Devices``.
        :param labels: host labels; defaults to ``[]``.
        :param addr: host address; defaults to *name*.
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster. For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Serialize to a dict with ``name``, ``addr``, ``devices`` and ``labels``."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Inverse of :meth:`to_json`.

        ``name`` and ``devices`` are required. ``addr`` falls back to ``name``
        and ``labels`` to ``[]``.

        :raises OrchestratorValidationError: on missing required keys, unknown
            keys, or malformed values.
        """
        try:
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """
        Build hosts from ``(name, <object with .data>)`` pairs.

        NOTE(review): the shape of *hosts* is inferred from the indexing below;
        confirm with callers.
        """
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the ``name`` of every host in *hosts*, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        # Comparing against a non-InventoryHost used to raise AttributeError
        # on `other.name`. Defer to Python's default handling instead, which
        # yields False for `==` and True for `!=`.
        if not isinstance(other, InventoryHost):
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1199
1200
1201class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
1202 """
1203 Describes a specific device on a specific host. Used for enabling or disabling LEDs
1204 on devices.
1205
1206 hostname as in :func:`orchestrator.Orchestrator.get_hosts`
1207
1208 device_id: e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
1209 See ``ceph osd metadata | jq '.[].device_ids'``
1210 """
1211 __slots__ = ()
1212
1213
f6b5b4d7
TL
class OrchestratorEvent:
    """
    An "important" log message attached to a service or a daemon, similar in
    spirit to Kubernetes Events.
    """
    INFO = 'INFO'
    ERROR = 'ERROR'
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        # Timestamps may arrive serialized; normalise to a datetime.
        self.created: datetime.datetime = (
            str_to_datetime(created) if isinstance(created, str) else created)

        # Only service- and daemon-scoped events exist.
        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # Name of the service or daemon the event refers to.
        self.subject: str = subject

        # Events are not a debugging channel -- debug output belongs in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``"<kind>:<subject>"``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        # One line per event keeps long event lists readable.
        stamp = datetime_to_str(self.created)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: a single event line as produced by :meth:`to_json`
        :return: the parsed event
        :raises ValueError: if *data* does not match the expected format
        """
        parsed = cls.regex_v1.match(data)
        if not parsed:
            raise ValueError(f'Unable to match: "{data}"')
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OrchestratorEvent):
            return False
        # `level` is not part of the comparison.
        return all(getattr(self, attr) == getattr(other, attr)
                   for attr in ('created', 'kind', 'subject', 'message'))
1273
1274
def _mk_orch_methods(cls: Any) -> Any:
    """
    Class decorator: give *cls* a forwarding stub for every public
    ``Orchestrator`` method (except ``is_orchestrator_module``).

    Each stub calls ``self._oremote(<method name>, args, kwargs)`` and
    returns its result.
    """
    def shim(method_name: str) -> Callable:
        # A factory is required: a closure created directly inside the loop
        # below would late-bind, and every stub would forward the last name.
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return inner

    forwarded = [attr for attr in Orchestrator.__dict__
                 if not attr.startswith('_') and attr != 'is_orchestrator_module']
    for attr in forwarded:
        setattr(cls, attr, shim(attr))
    return cls
1288
1289
1290@_mk_orch_methods
1291class OrchestratorClientMixin(Orchestrator):
1292 """
1293 A module that inherents from `OrchestratorClientMixin` can directly call
1294 all :class:`Orchestrator` methods without manually calling remote.
1295
1296 Every interface method from ``Orchestrator`` is converted into a stub method that internally
1297 calls :func:`OrchestratorClientMixin._oremote`
1298
1299 >>> class MyModule(OrchestratorClientMixin):
1300 ... def func(self):
1301 ... completion = self.add_host('somehost') # calls `_oremote()`
9f95a23c
TL
1302 ... self.log.debug(completion.result)
1303
1304 .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
1305 Reason is, that OrchestratorClientMixin magically redirects all methods to the
1306 "real" implementation of the orchestrator.
1307
1308
1309 >>> import mgr_module
1911f103
TL
1310 >>> #doctest: +SKIP
1311 ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
9f95a23c
TL
1312 ... def __init__(self, ...):
1313 ... self.orch_client = OrchestratorClientMixin()
1314 ... self.orch_client.set_mgr(self.mgr))
1315 """
1316
def set_mgr(self, mgr: MgrModule) -> None:
    """
    Route all orchestrator calls through *mgr*.

    Useful for consumers such as the Dashboard, which use a global ``mgr``.
    """
    # A name-mangled attribute, so no existing `mgr` property of the host
    # class gets overwritten.
    self.__mgr = mgr
1323
def __get_mgr(self) -> Any:
    """Return the mgr set via ``set_mgr``, falling back to ``self``."""
    try:
        found = self.__mgr
    except AttributeError:
        found = self
    return found
1329
def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
    """
    Invoke *meth* on whichever orchestrator backend is currently enabled.

    :raises NoOrchestrator: no orchestrator backend is selected.
    :raises NotImplementedError: the call failed and the backend's feature set
        does not list *meth* as available.
    :raises RuntimeError: If the remote method failed.
    :raises OrchestratorError: orchestrator failed to perform
    :raises ImportError: no `orchestrator` module or backend not found.
    """
    mgr = self.__get_mgr()

    try:
        backend = mgr._select_orchestrator()
    except AttributeError:
        # mgr is not the orchestrator module itself; ask that module remotely.
        backend = mgr.remote('orchestrator', '_select_orchestrator')

    if backend is None:
        raise NoOrchestrator()

    mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, backend, meth, args, kwargs))
    try:
        return mgr.remote(backend, meth, *args, **kwargs)
    except Exception as exc:
        # get_feature_set() itself goes through _oremote(); re-raise rather
        # than recurse.
        if meth == 'get_feature_set':
            raise
        features = self.get_feature_set()
        supported = meth in features and features[meth]['available']
        if not supported:
            raise NotImplementedError(f'{backend} does not implement {meth}') from exc
        raise