]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator/_interface.py
import quincy 17.2.0
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
CommitLineData
9f95a23c
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
e306af50
TL
7
8import copy
9import datetime
f67539c2 10import enum
e306af50 11import errno
9f95a23c
TL
12import logging
13import pickle
e306af50 14import re
e306af50 15
f6b5b4d7
TL
16from collections import namedtuple, OrderedDict
17from contextlib import contextmanager
1d09f67e 18from functools import wraps, reduce, update_wrapper
f67539c2
TL
19
20from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
522d829b 21 Sequence, Dict, cast, Mapping
f67539c2
TL
22
23try:
24 from typing import Protocol # Protocol was added in Python 3.8
25except ImportError:
26 class Protocol: # type: ignore
27 pass
28
9f95a23c 29
f6b5b4d7
TL
30import yaml
31
9f95a23c
TL
32from ceph.deployment import inventory
33from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
20effc67 34 IscsiServiceSpec, IngressSpec, SNMPGatewaySpec
9f95a23c 35from ceph.deployment.drive_group import DriveGroupSpec
f67539c2 36from ceph.deployment.hostspec import HostSpec, SpecValidationError
adb31ebb 37from ceph.utils import datetime_to_str, str_to_datetime
9f95a23c
TL
38
39from mgr_module import MgrModule, CLICommand, HandleCommandResult
40
9f95a23c
TL
41
42logger = logging.getLogger(__name__)
43
f6b5b4d7 44T = TypeVar('T')
f67539c2 45FuncT = TypeVar('FuncT', bound=Callable[..., Any])
f6b5b4d7 46
9f95a23c
TL
47
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.

    :param msg: human readable description of the error
    :param errno: error code stored on the exception, ``-errno.EINVAL`` by default
    :param event_kind_subject: optional ``(kind, subject)`` tuple stored as
        ``event_subject``. See OrchestratorEvent.subject
    """

    def __init__(self,
                 msg: str,
                 errno: int = -errno.EINVAL,
                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
        # `super(Exception, self)` would start the MRO lookup *after*
        # Exception and thereby skip Exception.__init__ entirely.
        super().__init__(msg)
        self.errno = errno
        # See OrchestratorEvent.subject
        self.event_subject = event_kind_subject
65
9f95a23c
TL
66
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.

    The error carries ``-errno.ENOENT`` as its error code.
    """

    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
        super().__init__(msg, errno=-errno.ENOENT)
9f95a23c
TL
74
75
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.

    ``handle_type_error`` also raises it when the wrapped call fails
    with a ``TypeError``.
    """
80
81
@contextmanager
def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
    """
    Context manager that tags an OrchestratorError raised inside its body
    with an event subject and then re-raises it.

    :param kind: first element of the ``(kind, subject)`` tuple stored in
        ``event_subject``
    :param subject: second element of that tuple
    :param overwrite: when True, replace a subject that is already set on
        the error; otherwise an existing subject is kept
    """
    try:
        yield
    except OrchestratorError as e:
        # OrchestratorError.__init__ always creates `event_subject`, so a
        # hasattr() test is always true and would turn `overwrite` into a
        # no-op. Check whether a subject has actually been assigned instead.
        if overwrite or getattr(e, 'event_subject', None) is None:
            e.event_subject = (kind, subject)
        raise
90
91
def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
    """
    Wrap a CLI handler so that expected errors become a HandleCommandResult
    and attach the CLICommand metadata for `prefix` / `perm` to the result.

    :param prefix: command prefix, also used in the "not supported" message
    :param perm: permission string handed to CLICommand
    :param func: the handler to wrap
    :return: a new callable carrying ``_prefix`` and ``_cli_command``
    """
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            outcome = func(*args, **kwargs)
        except (OrchestratorError, SpecValidationError) as err:
            # Do not print Traceback for expected errors.
            return HandleCommandResult(err.errno, stderr=str(err))
        except ImportError as err:
            return HandleCommandResult(-errno.ENOENT, stderr=str(err))
        except NotImplementedError:
            unsupported = 'This Orchestrator does not support `{}`'.format(prefix)
            return HandleCommandResult(-errno.ENOENT, stderr=unsupported)
        else:
            return outcome

    # misuse lambda to copy `wrapper`: the attributes below are set on a
    # fresh function object instead of on `wrapper` itself
    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731

    command = CLICommand(prefix, perm)
    command.store_func_metadata(func)
    command.func = wrapper_copy

    wrapper_copy._prefix = prefix  # type: ignore
    wrapper_copy._cli_command = command  # type: ignore

    return cast(FuncT, wrapper_copy)
9f95a23c
TL
114
115
f67539c2
TL
def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
    """
    Decorator to make Orchestrator methods return
    an OrchResult.

    The return value of `f` is wrapped in an OrchResult. Any Exception raised
    by `f` is logged and stored in the OrchResult instead of propagating,
    unless the ``UNITTEST`` environment variable is set.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
        try:
            return OrchResult(f(*args, **kwargs))
        except Exception as err:
            logger.exception(err)
            import os
            if 'UNITTEST' not in os.environ:
                return OrchResult(None, exception=err)
            raise  # This makes debugging of Tracebacks from unittests a bit easier

    return cast(Callable[..., OrchResult[T]], wrapper)
134
135
class InnerCliCommandCallable(Protocol):
    """Callable that takes a command prefix and returns a decorator for handlers."""

    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
        ...
139
140
def _cli_command(perm: str) -> InnerCliCommandCallable:
    """
    Build a decorator factory for CLI handlers with a fixed permission.

    :param perm: permission string passed on to ``handle_exception``
    :return: callable taking the command prefix and returning the decorator
    """
    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
        def decorate(func: FuncT) -> FuncT:
            return handle_exception(prefix, perm, func)
        return decorate

    return inner_cli_command
145
146
147_cli_read_command = _cli_command('r')
148_cli_write_command = _cli_command('rw')
149
150
class CLICommandMeta(type):
    """
    This is a workaround for the use of a global variable CLICommand.COMMANDS which
    prevents modules from importing any other module.

    We make use of CLICommand, except for the use of the global variable.

    For every class created with this metaclass, the attributes of the class
    body that carry ``_prefix`` and ``_cli_command`` are collected into a
    dispatch table. The class then gets a ``COMMANDS`` list and a
    ``handle_command`` method that dispatches on ``cmd['prefix']``.
    """
    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
        super(CLICommandMeta, cls).__init__(name, bases, dct)
        dispatch: Dict[str, CLICommand] = {}
        for v in dct.values():
            try:
                dispatch[v._prefix] = v._cli_command
            except AttributeError:
                # not a decorated CLI handler
                pass

        # A handler written in the class body is replaced below; keep it
        # around so unknown prefixes can still be forwarded to it.
        own_handler = dct.get('handle_command')

        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
            command = dispatch.get(cmd['prefix'])
            if command is not None:
                return command.call(self, cmd, inbuf)
            # Unknown prefix. Calling `self.handle_command` here would call
            # this very function again and recurse without end, so delegate
            # to the original handler or to the next class in the MRO.
            if own_handler is not None:
                return own_handler(self, inbuf, cmd)
            return super(cls, self).handle_command(inbuf, cmd)

        cls.COMMANDS = [cmd.dump_cmd() for cmd in dispatch.values()]
        cls.handle_command = handle_command
175
176
f67539c2
TL
class OrchResult(Generic[T]):
    """
    Stores a result and an exception. Mainly to circumvent the
    MgrModule.remote() method that hides all exceptions and for
    handling different sub-interpreters.

    The exception is kept in pickled form (``serialized_exception``) together
    with a printable fallback (``exception_str``).
    """

    __slots__ = 'result', 'serialized_exception', 'exception_str'

    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
        """
        :param result: the value to transport, may be None
        :param exception: the exception to transport, if any
        """
        self.result = result
        self.serialized_exception: Optional[bytes] = None
        self.exception_str: str = ''
        self.set_exception(exception)

    def set_exception(self, e: Optional[Exception]) -> None:
        """
        Store `e` in pickled form, or clear the stored exception if `e` is None.

        If `e` itself cannot be pickled, a plain ``Exception`` with the same
        args is stored instead; if that fails too, a plain ``Exception``
        carrying ``str(e)`` is stored.
        """
        if e is None:
            self.serialized_exception = None
            self.exception_str = ''
            return

        self.exception_str = f'{type(e)}: {str(e)}'

        # pickle signals unpicklable state not only with PicklingError but
        # also with TypeError (e.g. locks) or AttributeError (e.g. local
        # classes), depending on the object and the Python version.
        pickle_errors = (pickle.PicklingError, TypeError, AttributeError)
        try:
            self.serialized_exception = pickle.dumps(e)
            return
        except pickle_errors:
            logger.error(f"failed to pickle {e}")

        # degenerate to a plain Exception
        if isinstance(e, Exception):
            plain = Exception(*e.args)
        else:
            plain = Exception(str(e))
        try:
            self.serialized_exception = pickle.dumps(plain)
        except pickle_errors:
            # even the arguments are unpicklable: keep the message only
            self.serialized_exception = pickle.dumps(Exception(str(e)))

    def result_str(self) -> str:
        """Force a string."""
        if self.result is None:
            return ''
        if isinstance(self.result, list):
            return '\n'.join(str(x) for x in self.result)
        return str(self.result)
217
9f95a23c 218
def raise_if_exception(c: OrchResult[T]) -> T:
    """
    Return the result stored in `c`, or raise the exception stored in it.

    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.

    If the pickled exception cannot be loaded (KeyError or AttributeError),
    a plain Exception carrying ``c.exception_str`` is raised instead.
    """
    if c.serialized_exception is None:
        assert c.result is not None, 'OrchResult should either have an exception or a result'
        return c.result

    try:
        e = pickle.loads(c.serialized_exception)
    except (KeyError, AttributeError):
        raise Exception(c.exception_str)
    raise e
9f95a23c
TL
231
232
f67539c2
TL
def _hide_in_features(f: FuncT) -> FuncT:
    """Flag `f` with ``_hide_in_features = True`` and return it unchanged otherwise."""
    setattr(f, '_hide_in_features', True)
    return f
236
237
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Methods should only return the completion and not directly execute
    anything, like network calls. Otherwise the purpose of
    those completions is defeated.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    @_hide_in_features
    def is_orchestrator_module(self) -> bool:
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    @_hide_in_features
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        .. note::
            `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: boolean representing whether the module is available/usable
        :return: string describing any error
        :return: dict containing any module specific information
        """
        raise NotImplementedError()

    @_hide_in_features
    def get_feature_set(self) -> Dict[str, dict]:
        """Describes which methods this orchestrator implements

        .. note::
            `True` doesn't mean that the desired functionality
            is actually possible in the orchestrator. I.e. this
            won't work as expected::

                >>> #doctest: +SKIP
                ... api = OrchestratorClientMixin()
                ... if api.get_feature_set()['get_hosts']['available']:  # wrong.
                ...     api.get_hosts()

            It's better to ask for forgiveness instead::

                >>> #doctest: +SKIP
                ... try:
                ...     OrchestratorClientMixin().get_hosts()
                ... except (OrchestratorError, NotImplementedError):
                ...     ...

        :returns: Dict of API method names to ``{'available': True or False}``
        """
        module = self.__class__
        features: Dict[str, dict] = {}
        for attr_name in Orchestrator.__dict__:
            if attr_name.startswith('_'):
                continue
            if getattr(getattr(Orchestrator, attr_name), '_hide_in_features', False):
                continue
            # a method counts as available when the implementation's
            # attribute differs from the one defined on this base class
            overridden = getattr(Orchestrator, attr_name, None) != getattr(module, attr_name)
            features[attr_name] = {'available': overridden}
        return features

    def cancel_completions(self) -> None:
        """
        Cancels ongoing completions. Unstuck the mgr.
        """
        raise NotImplementedError()

    def pause(self) -> None:
        raise NotImplementedError()

    def resume(self) -> None:
        raise NotImplementedError()

    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
        """
        Add a host to the orchestrator inventory.

        :param host_spec: specification of the host to add
        """
        raise NotImplementedError()

    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def drain_host(self, hostname: str) -> OrchResult[str]:
        """
        drain all daemons from a host

        :param hostname: hostname
        """
        raise NotImplementedError()

    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
        """
        Update a host's address

        :param host: hostname
        :param addr: address (dns name or IP)
        """
        raise NotImplementedError()

    def get_hosts(self) -> OrchResult[List[HostSpec]]:
        """
        Report the hosts in the cluster.

        :return: list of HostSpec
        """
        raise NotImplementedError()

    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
        """
        Return hosts metadata(gather_facts).
        """
        raise NotImplementedError()

    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Add a host label
        """
        raise NotImplementedError()

    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
        """
        Remove a host label
        """
        raise NotImplementedError()

    def host_ok_to_stop(self, hostname: str) -> OrchResult:
        """
        Check if the specified host can be safely stopped without reducing availability

        :param hostname: hostname
        """
        raise NotImplementedError()

    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
        """
        Place a host in maintenance, stopping daemons and disabling its systemd target
        """
        raise NotImplementedError()

    def exit_host_maintenance(self, hostname: str) -> OrchResult:
        """
        Return a host from maintenance, restarting the cluster's systemd target
        """
        raise NotImplementedError()

    def get_inventory(self,
                      host_filter: Optional['InventoryFilter'] = None,
                      refresh: bool = False) -> OrchResult[List['InventoryHost']]:
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryHost
        """
        raise NotImplementedError()

    def describe_service(self,
                         service_type: Optional[str] = None,
                         service_name: Optional[str] = None,
                         refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
        """
        Describe a daemon (of any kind) that is already configured in
        the orchestrator.

        :return: list of DaemonDescription objects.
        """
        raise NotImplementedError()

    @handle_orch_error
    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
        """
        Applies any spec

        Every spec is handed to the ``apply_*`` method (or ``add_host``)
        registered for its ``service_type``. All specs are dispatched first;
        afterwards the individual results are collected in order, raising the
        first stored exception encountered.
        """
        handlers: Dict[str, Callable[..., OrchResult[str]]] = {
            'alertmanager': self.apply_alertmanager,
            'crash': self.apply_crash,
            'grafana': self.apply_grafana,
            'iscsi': self.apply_iscsi,
            'mds': self.apply_mds,
            'mgr': self.apply_mgr,
            'mon': self.apply_mon,
            'nfs': self.apply_nfs,
            'node-exporter': self.apply_node_exporter,
            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
            'prometheus': self.apply_prometheus,
            'rbd-mirror': self.apply_rbd_mirror,
            'rgw': self.apply_rgw,
            'ingress': self.apply_ingress,
            'snmp-gateway': self.apply_snmp_gateway,
            'host': self.add_host,
        }

        # dispatch everything before looking at any individual outcome
        completions = [handlers[spec.service_type](spec) for spec in specs]

        messages: List[str] = []
        for completion in completions:
            messages.append(raise_if_exception(completion))
        return messages

    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
        """
        Plan (Dry-run, Preview) a List of Specs.
        """
        raise NotImplementedError()

    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
        """
        Remove specific daemon(s).

        :param names: names of the daemons to remove
        """
        raise NotImplementedError()

    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
        """
        Remove a service (a collection of daemons).

        :param service_name: name of the service to remove
        """
        raise NotImplementedError()

    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
        """
        Perform an action (start/stop/reload) on a service (i.e., all daemons
        providing the logical service).

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param service_name: service_type + '.' + service_id
                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
        """
        Perform an action (start/stop/reload) on a daemon.

        :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
        :param daemon_name: name of daemon
        :param image: Container image when redeploying that daemon
        :rtype: OrchResult
        """
        raise NotImplementedError()

    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).
        """
        raise NotImplementedError()

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
        """ Update OSD cluster """
        raise NotImplementedError()

    def set_unmanaged_flag(self,
                           unmanaged_flag: bool,
                           service_type: str = 'osd',
                           service_name: Optional[str] = None
                           ) -> HandleCommandResult:
        raise NotImplementedError()

    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = 'osd',
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ) -> OrchResult[str]:
        """ Get a preview for OSD deployments """
        raise NotImplementedError()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> OrchResult[str]:
        """
        :param osd_ids: list of OSD IDs
        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
        :param force: Forces the OSD removal process without waiting for the data to be drained first.
        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)


        .. note:: this can only remove OSDs that were successfully
            created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
        """
        TODO
        """
        raise NotImplementedError()

    def remove_osds_status(self) -> OrchResult:
        """
        Returns a status of the ongoing OSD removal operations.
        """
        raise NotImplementedError()

    def blink_device_light(self,
                           ident_fault: str,
                           on: bool,
                           locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
        """
        Instructs the orchestrator to enable or disable either the ident or the fault LED.

        :param ident_fault: either ``"ident"`` or ``"fault"``
        :param on: ``True`` = on.
        :param locations: See :class:`orchestrator.DeviceLightLoc`
        """
        raise NotImplementedError()

    def zap_device(self, host: str, path: str) -> OrchResult[str]:
        """Zap/Erase a device (DESTROYS DATA)"""
        raise NotImplementedError()

    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
        """Create daemon(s) for unmanaged services"""
        raise NotImplementedError()

    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mon cluster"""
        raise NotImplementedError()

    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update mgr cluster"""
        raise NotImplementedError()

    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update MDS cluster"""
        raise NotImplementedError()

    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
        """Update RGW cluster"""
        raise NotImplementedError()

    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
        """Update ingress daemons"""
        raise NotImplementedError()

    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update rbd-mirror cluster"""
        raise NotImplementedError()

    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
        """Update NFS cluster"""
        raise NotImplementedError()

    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
        """Update iscsi cluster"""
        raise NotImplementedError()

    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update prometheus cluster"""
        raise NotImplementedError()

    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing Node-Exporter daemon(s)"""
        raise NotImplementedError()

    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing crash daemon(s)"""
        raise NotImplementedError()

    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update an existing grafana service"""
        raise NotImplementedError()

    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
        """Update existing AlertManager daemon(s)"""
        raise NotImplementedError()

    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
        """Update an existing snmp gateway service"""
        raise NotImplementedError()

    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
        raise NotImplementedError()

    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_pause(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_resume(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_stop(self) -> OrchResult[str]:
        raise NotImplementedError()

    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    @_hide_in_features
    def upgrade_available(self) -> OrchResult:
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
696
697
e306af50
TL
698GenericSpec = Union[ServiceSpec, HostSpec]
699
f6b5b4d7
TL
700
def json_to_generic_spec(spec: dict) -> GenericSpec:
    """
    Build a spec object from its JSON (dict) form.

    A dict whose ``service_type`` is ``'host'`` becomes a HostSpec,
    everything else a ServiceSpec.
    """
    if spec.get('service_type') == 'host':
        return HostSpec.from_json(spec)
    return ServiceSpec.from_json(spec)
9f95a23c 706
f6b5b4d7 707
f67539c2
TL
def daemon_type_to_service(dtype: str) -> str:
    """
    Map a daemon type to the service type it belongs to.

    :param dtype: daemon type, e.g. ``'haproxy'``
    :return: service type, e.g. ``'ingress'``
    :raises KeyError: if `dtype` is not a known daemon type
    """
    # daemon types whose service type carries the same name
    same_named = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'agent', 'snmp-gateway',
    )
    lookup = {name: name for name in same_named}
    lookup.update({
        'haproxy': 'ingress',
        'keepalived': 'ingress',
        'crashcollector': 'crash',  # Specific Rook Daemon
    })
    return lookup[dtype]
732
733
def service_to_daemon_types(stype: str) -> List[str]:
    """
    Map a service type to the list of daemon types that make it up.

    :param stype: service type, e.g. ``'ingress'``
    :return: a new list of daemon types, e.g. ``['haproxy', 'keepalived']``
    :raises KeyError: if `stype` is not a known service type
    """
    # services made of exactly one daemon type with the same name
    single_daemon = (
        'mon', 'mgr', 'mds', 'rgw', 'osd', 'iscsi', 'rbd-mirror',
        'cephfs-mirror', 'nfs', 'grafana', 'alertmanager', 'prometheus',
        'node-exporter', 'crash', 'container', 'agent', 'snmp-gateway',
    )
    lookup = {name: [name] for name in single_daemon}
    lookup['ingress'] = ['haproxy', 'keepalived']
    return lookup[stype]
756
757
522d829b
TL
758KNOWN_DAEMON_TYPES: List[str] = list(
759 sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
760
761
9f95a23c
TL
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""

    def __init__(self) -> None:
        #: Is an upgrade underway?
        self.in_progress = False
        self.target_image: Optional[str] = None
        #: Which daemon types are fully updated?
        self.services_complete: List[str] = []
        #: How many of the daemons have we upgraded
        self.progress: Optional[str] = None
        #: Freeform description
        self.message = ""
770
771
def handle_type_error(method: FuncT) -> FuncT:
    """
    Decorator converting a TypeError raised by `method` into an
    OrchestratorValidationError whose message is prefixed with the
    ``__name__`` of the first argument.
    """
    @wraps(method)
    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return method(cls, *args, **kwargs)
        except TypeError as err:
            raise OrchestratorValidationError('{}: {}'.format(cls.__name__, err))

    return cast(FuncT, inner)
781
782
class DaemonDescriptionStatus(enum.IntEnum):
    """Status values of a daemon as used by DaemonDescription."""

    unknown = -2
    error = -1
    stopped = 0
    running = 1
    starting = 2  #: Daemon is deployed, but not yet running

    @staticmethod
    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
        """
        Return the lower-case name for `status`.

        ``None`` is treated as ``unknown``; a value matching no member
        yields ``'<unknown>'``.
        """
        effective = DaemonDescriptionStatus.unknown if status is None else status
        # every member's name is exactly its textual representation
        names = {member: member.name for member in DaemonDescriptionStatus}
        return names.get(effective, '<unknown>')
9f95a23c
TL
801
802
803class DaemonDescription(object):
804 """
805 For responding to queries about the status of a particular daemon,
806 stateful or stateless.
807
808 This is not about health or performance monitoring of daemons: it's
809 about letting the orchestrator tell Ceph whether and where a
810 daemon is scheduled in the cluster. When an orchestrator tells
811 Ceph "it's running on host123", that's not a promise that the process
812 is literally up this second, it's a description of where the orchestrator
813 has decided the daemon should run.
814 """
815
816 def __init__(self,
f67539c2
TL
817 daemon_type: Optional[str] = None,
818 daemon_id: Optional[str] = None,
819 hostname: Optional[str] = None,
820 container_id: Optional[str] = None,
821 container_image_id: Optional[str] = None,
822 container_image_name: Optional[str] = None,
823 container_image_digests: Optional[List[str]] = None,
824 version: Optional[str] = None,
825 status: Optional[DaemonDescriptionStatus] = None,
826 status_desc: Optional[str] = None,
827 last_refresh: Optional[datetime.datetime] = None,
828 created: Optional[datetime.datetime] = None,
829 started: Optional[datetime.datetime] = None,
830 last_configured: Optional[datetime.datetime] = None,
831 osdspec_affinity: Optional[str] = None,
832 last_deployed: Optional[datetime.datetime] = None,
f6b5b4d7 833 events: Optional[List['OrchestratorEvent']] = None,
f67539c2
TL
834 is_active: bool = False,
835 memory_usage: Optional[int] = None,
836 memory_request: Optional[int] = None,
837 memory_limit: Optional[int] = None,
838 service_name: Optional[str] = None,
839 ports: Optional[List[int]] = None,
840 ip: Optional[str] = None,
841 deployed_by: Optional[List[str]] = None,
b3b6e05e
TL
842 rank: Optional[int] = None,
843 rank_generation: Optional[int] = None,
20effc67 844 extra_container_args: Optional[List[str]] = None,
f67539c2 845 ) -> None:
f6b5b4d7 846
20effc67 847 #: Host is at the same granularity as InventoryHost
f67539c2 848 self.hostname: Optional[str] = hostname
9f95a23c
TL
849
850 # Not everyone runs in containers, but enough people do to
851 # justify having the container_id (runtime id) and container_image
852 # (image name)
853 self.container_id = container_id # runtime id
f67539c2 854 self.container_image_id = container_image_id # image id locally
9f95a23c 855 self.container_image_name = container_image_name # image friendly name
f67539c2 856 self.container_image_digests = container_image_digests # reg hashes
9f95a23c 857
20effc67 858 #: The type of service (osd, mon, mgr, etc.)
9f95a23c
TL
859 self.daemon_type = daemon_type
860
20effc67
TL
861 #: The orchestrator will have picked some names for daemons,
862 #: typically either based on hostnames or on pod names.
863 #: This is the <foo> in mds.<foo>, the ID that will appear
864 #: in the FSMap/ServiceMap.
f67539c2 865 self.daemon_id: Optional[str] = daemon_id
a4b75251 866 self.daemon_name = self.name()
f67539c2 867
20effc67 868 #: Some daemon types have a numeric rank assigned
b3b6e05e
TL
869 self.rank: Optional[int] = rank
870 self.rank_generation: Optional[int] = rank_generation
871
f67539c2 872 self._service_name: Optional[str] = service_name
9f95a23c 873
20effc67 874 #: Service version that was deployed
9f95a23c
TL
875 self.version = version
876
20effc67
TL
877 # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
878 self._status = status
9f95a23c 879
20effc67 880 #: Service status description when status == error.
9f95a23c
TL
881 self.status_desc = status_desc
882
20effc67 883 #: datetime when this info was last refreshed
f6b5b4d7 884 self.last_refresh: Optional[datetime.datetime] = last_refresh
9f95a23c 885
f6b5b4d7
TL
886 self.created: Optional[datetime.datetime] = created
887 self.started: Optional[datetime.datetime] = started
888 self.last_configured: Optional[datetime.datetime] = last_configured
889 self.last_deployed: Optional[datetime.datetime] = last_deployed
9f95a23c 890
20effc67 891 #: Affinity to a certain OSDSpec
f6b5b4d7
TL
892 self.osdspec_affinity: Optional[str] = osdspec_affinity
893
894 self.events: List[OrchestratorEvent] = events or []
f91f0fd5 895
f67539c2
TL
896 self.memory_usage: Optional[int] = memory_usage
897 self.memory_request: Optional[int] = memory_request
898 self.memory_limit: Optional[int] = memory_limit
899
900 self.ports: Optional[List[int]] = ports
901 self.ip: Optional[str] = ip
902
903 self.deployed_by = deployed_by
904
f6b5b4d7 905 self.is_active = is_active
e306af50 906
20effc67
TL
907 self.extra_container_args = extra_container_args
908
@property
def status(self) -> Optional[DaemonDescriptionStatus]:
    """Return the stored daemon status, which may be ``None``."""
    return self._status

@status.setter
def status(self, new: DaemonDescriptionStatus) -> None:
    """
    Store ``new`` as the daemon status.

    ``status_desc`` is overwritten with the string form of ``new`` so that
    the two attributes do not drift apart.
    """
    self._status = new
    # Any previously stored description is replaced here.
    self.status_desc = DaemonDescriptionStatus.to_str(new)
917
f67539c2
TL
def get_port_summary(self) -> str:
    """
    Render the daemon's endpoint as ``<ip>:<port>[,<port>...]``.

    A missing ``ip`` is shown as ``*``.  When no ports are known the
    summary is the empty string.
    """
    if self.ports:
        address = self.ip or '*'
        port_list = ','.join(str(port) for port in self.ports)
        return f"{address}:{port_list}"
    return ''
922
def name(self) -> str:
    """Return the daemon name in ``<daemon_type>.<daemon_id>`` form."""
    return f'{self.daemon_type!s}.{self.daemon_id!s}'
925
def matches_service(self, service_name: Optional[str]) -> bool:
    """
    Tell whether this daemon's name falls under ``service_name``.

    The daemon is rendered as ``<service type>.<daemon_id>`` and matches
    when that string starts with ``service_name`` followed by a dot.

    :param service_name: service to test against; ``None`` or an empty
        string never matches.
    """
    assert self.daemon_id is not None
    assert self.daemon_type is not None
    if not service_name:
        return False
    service_type = daemon_type_to_service(self.daemon_type)
    qualified = service_type + '.' + self.daemon_id
    return qualified.startswith(service_name + '.')
932
f67539c2
TL
def service_id(self) -> str:
    """
    Work out the id part of the service this daemon belongs to.

    Resolution order:

    1. If a service name was stored, return whatever follows its first
       ``.`` (or ``''`` when it contains no dot).
    2. For ``osd`` daemons return ``osdspec_affinity``, unless it is unset
       or the literal string ``'None'``, in which case return ``''``.
    3. For daemon types whose service type is listed in
       ``ServiceSpec.REQUIRES_SERVICE_ID``, derive the id from
       ``daemon_id`` and ``hostname`` (see ``_match`` below).
    4. Otherwise return ``daemon_id`` unchanged.

    :raises OrchestratorError: in case 3, when ``hostname`` is empty or the
        host name is embedded in ``daemon_id`` without ``.`` separators.
    """
    assert self.daemon_id is not None
    assert self.daemon_type is not None

    if self._service_name:
        if '.' in self._service_name:
            return self._service_name.split('.', 1)[1]
        else:
            return ''

    if self.daemon_type == 'osd':
        # 'None' (the string) is treated the same as no affinity at all.
        if self.osdspec_affinity and self.osdspec_affinity != 'None':
            return self.osdspec_affinity
        return ''

    def _match() -> str:
        # Derive the service id by stripping the host part from daemon_id.
        assert self.daemon_id is not None
        # Built up front so every failure path below raises the same error.
        err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")

        if not self.hostname:
            # TODO: can a DaemonDescription exist without a hostname?
            raise err

        # use the bare hostname, not the FQDN.
        host = self.hostname.split('.')[0]

        if host == self.daemon_id:
            # daemon_id == "host"
            return self.daemon_id

        elif host in self.daemon_id:
            # daemon_id == "service_id.host"
            # daemon_id == "service_id.host.random"
            # Substring test: split at the last occurrence of the host name.
            pre, post = self.daemon_id.rsplit(host, 1)
            if not pre.endswith('.'):
                # '.' sep missing at front of host
                raise err
            elif post and not post.startswith('.'):
                # '.' sep missing at end of host
                raise err
            # Drop the trailing '.' that separated service id and host.
            return pre[:-1]

        # daemon_id == "service_id.random"
        if self.daemon_type == 'rgw':
            v = self.daemon_id.split('.')
            # With 3 or 4 dot-separated parts, the first two form the id.
            if len(v) in [3, 4]:
                return '.'.join(v[0:2])

        if self.daemon_type == 'iscsi':
            # Everything except the last dot-separated part is the id.
            v = self.daemon_id.split('.')
            return '.'.join(v[0:-1])

        # daemon_id == "service_id"
        return self.daemon_id

    if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
        return _match()

    return self.daemon_id
1911f103 993
f67539c2
TL
def service_name(self) -> str:
    """
    Return the name of the service this daemon belongs to.

    A stored service name wins.  Otherwise the name is the service type
    for this daemon type, suffixed with ``.<service_id>`` when that
    service type is listed in ``ServiceSpec.REQUIRES_SERVICE_ID``.
    """
    if self._service_name:
        return self._service_name
    assert self.daemon_type is not None
    service_type = daemon_type_to_service(self.daemon_type)
    if service_type not in ServiceSpec.REQUIRES_SERVICE_ID:
        return service_type
    return f'{service_type}.{self.service_id()}'
9f95a23c 1001
def __repr__(self) -> str:
    """Return a short debug form: ``<DaemonDescription>(<type>.<id>)``."""
    return f"<DaemonDescription>({self.daemon_type}.{self.daemon_id})"
1005
20effc67
TL
def __str__(self) -> str:
    """Return a one-line summary: daemon name, status description, host."""
    return '{} in status {} on {}'.format(self.name(), self.status_desc, self.hostname)
1008
f67539c2
TL
def to_json(self) -> dict:
    """
    Serialize this daemon description into an ordered, JSON-friendly dict.

    Timestamps are converted with ``datetime_to_str`` and events with their
    own ``to_json()``.  ``osdspec_affinity`` is only emitted for ``osd``
    daemons.  Any key whose value is ``None`` is left out of the result.
    """
    current_status = self.status
    result: Dict[str, Any] = OrderedDict([
        ('daemon_type', self.daemon_type),
        ('daemon_id', self.daemon_id),
        ('service_name', self._service_name),
        ('daemon_name', self.name()),
        ('hostname', self.hostname),
        ('container_id', self.container_id),
        ('container_image_id', self.container_image_id),
        ('container_image_name', self.container_image_name),
        ('container_image_digests', self.container_image_digests),
        ('memory_usage', self.memory_usage),
        ('memory_request', self.memory_request),
        ('memory_limit', self.memory_limit),
        ('version', self.version),
        ('status', None if current_status is None else current_status.value),
        ('status_desc', self.status_desc),
    ])
    if self.daemon_type == 'osd':
        result['osdspec_affinity'] = self.osdspec_affinity
    result.update([
        ('is_active', self.is_active),
        ('ports', self.ports),
        ('ip', self.ip),
        ('rank', self.rank),
        ('rank_generation', self.rank_generation),
    ])

    # Timestamps are only written when they are set.
    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        when = getattr(self, stamp)
        if when:
            result[stamp] = datetime_to_str(when)

    if self.events:
        result['events'] = [event.to_json() for event in self.events]

    # Collect the unset keys first: the dict must not shrink while iterated.
    for unset in [key for key, value in result.items() if value is None]:
        del result[unset]
    return result
9f95a23c 1046
b3b6e05e
TL
def to_dict(self) -> dict:
    """
    Convert this daemon description into an ordered dict.

    Unlike ``to_json()`` this form carries no ``service_name``, ``rank`` or
    ``rank_generation`` and converts events with their ``to_dict()``.
    Timestamps go through ``datetime_to_str``; ``osdspec_affinity`` is only
    emitted for ``osd`` daemons; keys whose value is ``None`` are left out.
    """
    current_status = self.status
    result: Dict[str, Any] = OrderedDict([
        ('daemon_type', self.daemon_type),
        ('daemon_id', self.daemon_id),
        ('daemon_name', self.name()),
        ('hostname', self.hostname),
        ('container_id', self.container_id),
        ('container_image_id', self.container_image_id),
        ('container_image_name', self.container_image_name),
        ('container_image_digests', self.container_image_digests),
        ('memory_usage', self.memory_usage),
        ('memory_request', self.memory_request),
        ('memory_limit', self.memory_limit),
        ('version', self.version),
        ('status', None if current_status is None else current_status.value),
        ('status_desc', self.status_desc),
    ])
    if self.daemon_type == 'osd':
        result['osdspec_affinity'] = self.osdspec_affinity
    result.update([
        ('is_active', self.is_active),
        ('ports', self.ports),
        ('ip', self.ip),
    ])

    # Timestamps are only written when they are set.
    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        when = getattr(self, stamp)
        if when:
            result[stamp] = datetime_to_str(when)

    if self.events:
        result['events'] = [event.to_dict() for event in self.events]

    # Collect the unset keys first: the dict must not shrink while iterated.
    for unset in [key for key, value in result.items() if value is None]:
        del result[unset]
    return result
1081
9f95a23c
TL
@classmethod
@handle_type_error
def from_json(cls, data: dict) -> 'DaemonDescription':
    """
    Build a ``DaemonDescription`` from a dict such as ``to_json()`` emits.

    ``data`` itself is not modified; a shallow copy is worked on.  Timestamp
    strings are parsed with ``str_to_datetime``, events with
    ``OrchestratorEvent.from_json`` and a present ``status`` is turned into
    a ``DaemonDescriptionStatus``.  All remaining keys are handed to the
    constructor as keyword arguments.
    """
    fields = data.copy()
    raw_events = fields.pop('events', [])
    for stamp in ('last_refresh', 'created', 'started', 'last_deployed',
                  'last_configured'):
        if stamp in fields:
            fields[stamp] = str_to_datetime(fields[stamp])
    events = [OrchestratorEvent.from_json(raw) for raw in raw_events]
    raw_status = fields.pop('status', None)
    # daemon_name is not a constructor argument, so it is discarded.
    fields.pop('daemon_name', None)
    if 'service_name' in fields and fields['service_name'].startswith('osd.'):
        # if the service_name is a osd.NNN (numeric osd id) then
        # ignore it -- it is not a valid service_name and
        # (presumably) came from an older version of cephadm.
        try:
            int(fields['service_name'][4:])
        except ValueError:
            pass
        else:
            del fields['service_name']
    status = None if raw_status is None else DaemonDescriptionStatus(raw_status)
    return cls(events=events, status=status, **fields)
9f95a23c 1106
def __copy__(self) -> 'DaemonDescription':
    """
    Copy this description by round-tripping it through its JSON form.

    Only what ``to_json()`` emits survives the copy; attributes it does not
    serialize (e.g. ``deployed_by``, ``extra_container_args``) fall back to
    the constructor defaults.
    """
    # feel free to change this:
    serialized = self.to_json()
    return DaemonDescription.from_json(serialized)
1110
@staticmethod
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
    """
    YAML representer hook: dump ``data`` as the mapping of its JSON form.

    :param dumper: the YAML dumper doing the serialization
    :param data: the daemon description to represent
    :return: whatever ``dumper.represent_dict`` returns
    """
    items = data.to_json().items()
    return dumper.represent_dict(cast(Mapping, items))
f6b5b4d7
TL
1114
1115
# Register the hook so that yaml can dump DaemonDescription objects.
# NOTE(review): this registers on add_representer's default dumper class,
# while the hook is annotated for SafeDumper -- confirm which one is intended.
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
1117
1118
9f95a23c
TL
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on host123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self,
                 spec: ServiceSpec,
                 container_image_id: Optional[str] = None,
                 container_image_name: Optional[str] = None,
                 service_url: Optional[str] = None,
                 last_refresh: Optional[datetime.datetime] = None,
                 created: Optional[datetime.datetime] = None,
                 deleted: Optional[datetime.datetime] = None,
                 size: int = 0,
                 running: int = 0,
                 events: Optional[List['OrchestratorEvent']] = None,
                 virtual_ip: Optional[str] = None,
                 ports: Optional[List[int]] = None) -> None:
        """
        :param spec: specification of the described service
        :param container_image_id: image hash
        :param container_image_name: image friendly name
        :param service_url: URL of the service's REST-like API, if any
        :param last_refresh: when this info was last refreshed
        :param created: creation timestamp
        :param deleted: deletion timestamp
        :param size: number of daemons
        :param running: number of daemons up
        :param events: events attached to this service
        :param virtual_ip: virtual IP used in the port summary
        :param ports: ports of the service; a fresh empty list when omitted
        """
        # Not everyone runs in containers, but enough people do to
        # justify having the container_image_id (image hash) and container_image
        # (image name)
        self.container_image_id = container_image_id  # image hash
        self.container_image_name = container_image_name  # image friendly name

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Number of daemons
        self.size = size

        # Number of daemons up
        self.running = running

        # datetime when this info was last refreshed
        self.last_refresh: Optional[datetime.datetime] = last_refresh
        self.created: Optional[datetime.datetime] = created
        self.deleted: Optional[datetime.datetime] = deleted

        self.spec: ServiceSpec = spec

        self.events: List[OrchestratorEvent] = events or []

        self.virtual_ip = virtual_ip
        # A ``[]`` default in the signature would be one list object shared
        # by every instance; create a new list per instance instead.
        self.ports: List[int] = ports if ports is not None else []

    def service_type(self) -> str:
        """Return the service type taken from the spec."""
        return self.spec.service_type

    def __repr__(self) -> str:
        return f"<ServiceDescription of {self.spec.one_line_str()}>"

    def get_port_summary(self) -> str:
        """
        Render the service endpoint as ``<ip>:<port>[,<port>...]``.

        Anything after a ``/`` in ``virtual_ip`` is cut off; a missing
        virtual IP is shown as ``?``.  Without ports the result is ``''``.
        """
        if not self.ports:
            return ''
        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"

    def _status_fields(self) -> Dict[str, Any]:
        """Build the ``status`` section shared by to_json() and to_dict()."""
        status: Dict[str, Any] = {
            'container_image_id': self.container_image_id,
            'container_image_name': self.container_image_name,
            'service_url': self.service_url,
            'size': self.size,
            'running': self.running,
            'last_refresh': self.last_refresh,
            'created': self.created,
            'virtual_ip': self.virtual_ip,
            # an empty port list is treated as "not set"
            'ports': self.ports if self.ports else None,
        }
        for k in ['last_refresh', 'created']:
            if getattr(self, k):
                status[k] = datetime_to_str(getattr(self, k))
        # Unset entries are not emitted at all.
        return {k: v for (k, v) in status.items() if v is not None}

    def to_json(self) -> OrderedDict:
        """
        Return the spec's JSON form extended by a ``status`` section and,
        if there are any, the events in their ``to_json()`` form.
        """
        out = self.spec.to_json()
        out['status'] = self._status_fields()
        if self.events:
            out['events'] = [e.to_json() for e in self.events]
        return out

    def to_dict(self) -> OrderedDict:
        """
        Same as :meth:`to_json`, except that events are converted with
        their ``to_dict()``.
        """
        out = self.spec.to_json()
        out['status'] = self._status_fields()
        if self.events:
            out['events'] = [e.to_dict() for e in self.events]
        return out

    @classmethod
    @handle_type_error
    def from_json(cls, data: dict) -> 'ServiceDescription':
        """
        Build a ``ServiceDescription`` from a dict such as ``to_json()`` emits.

        ``status`` and ``events`` are split off; everything else is parsed
        as the service spec.  ``data`` itself is not modified.
        """
        c = data.copy()
        status = c.pop('status', {})
        event_strs = c.pop('events', [])
        spec = ServiceSpec.from_json(c)

        c_status = status.copy()
        for k in ['last_refresh', 'created']:
            if k in c_status:
                c_status[k] = str_to_datetime(c_status[k])
        events = [OrchestratorEvent.from_json(e) for e in event_strs]
        # The remaining status entries map onto constructor keywords.
        return cls(spec=spec, events=events, **c_status)

    @staticmethod
    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
        """YAML representer hook: dump ``data`` as the mapping of its JSON form."""
        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
f6b5b4d7
TL
1246
1247
# Register the hook so that yaml can dump ServiceDescription objects.
# NOTE(review): this registers on add_representer's default dumper class,
# while the hook is annotated for SafeDumper -- confirm which one is intended.
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
9f95a23c
TL
1249
1250
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

      filter by host when presenting a UI workflow for configuring
      a particular server.
      filter by label when not all of the estate is Ceph servers,
      and we want to only learn about the Ceph servers.
      filter by label when we are interested particularly
      in e.g. OSD servers.
    """

    def __init__(self,
                 labels: Optional[List[str]] = None,
                 hosts: Optional[List[str]] = None) -> None:
        """
        :param labels: only consider hosts matching these labels
        :param hosts: only consider these named hosts
        """
        #: Optional: get info about certain named hosts only
        self.hosts = hosts

        #: Optional: get info about hosts matching labels
        self.labels = labels
1273
1274
class InventoryHost(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryHost.
    """

    def __init__(self, name: str, devices: Optional[inventory.Devices] = None,
                 labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
        """
        :param name: unique within cluster, for example a hostname
        :param devices: devices of this host; an empty ``Devices`` if omitted
        :param labels: labels of this host; an empty list if omitted
        :param addr: address of the host; falls back to ``name``
        """
        if devices is None:
            devices = inventory.Devices([])
        if labels is None:
            labels = []
        assert isinstance(devices, inventory.Devices)

        self.name = name  # unique within cluster.  For example a hostname.
        self.addr = addr or name
        self.devices = devices
        self.labels = labels

    def to_json(self) -> dict:
        """Return name, addr, devices (in their JSON form) and labels as a dict."""
        return {
            'name': self.name,
            'addr': self.addr,
            'devices': self.devices.to_json(),
            'labels': self.labels,
        }

    @classmethod
    def from_json(cls, data: dict) -> 'InventoryHost':
        """
        Build an ``InventoryHost`` from a dict such as ``to_json()`` emits.

        ``name`` and ``devices`` are required, ``addr`` and ``labels`` are
        optional.  ``data`` itself is not modified.

        :raises OrchestratorValidationError: on a missing required key, on
            unknown extra keys, or when a ``TypeError`` occurs while reading.
        """
        try:
            # Work on a deep copy: keys are popped off while parsing.
            _data = copy.deepcopy(data)
            name = _data.pop('name')
            addr = _data.pop('addr', None) or name
            devices = inventory.Devices.from_json(_data.pop('devices'))
            labels = _data.pop('labels', list())
            if _data:
                # Anything still left over is a key we do not know about.
                error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
                raise OrchestratorValidationError(error_msg)
            return cls(name, devices, labels, addr)
        except KeyError as e:
            error_msg = '{} is required for {}'.format(e, cls.__name__)
            raise OrchestratorValidationError(error_msg) from e
        except TypeError as e:
            raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) from e

    @classmethod
    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
        """
        Build one ``InventoryHost`` per entry of ``hosts``.

        NOTE(review): each entry is indexed as ``item[0]`` (the name) and
        ``item[1].data`` (the devices JSON), which does not look like the
        annotated ``dict`` -- confirm the expected shape against callers.
        """
        devs = inventory.Devices.from_json
        return [cls(item[0], devs(item[1].data)) for item in hosts]

    def __repr__(self) -> str:
        return "<InventoryHost>({name})".format(name=self.name)

    @staticmethod
    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
        """Return the names of all ``hosts``, in order."""
        return [host.name for host in hosts]

    def __eq__(self, other: Any) -> bool:
        """
        Two hosts are equal when both name and devices are equal; ``addr``
        and ``labels`` are not compared.
        """
        if not isinstance(other, InventoryHost):
            # Let Python fall back to its default handling rather than
            # failing with AttributeError on e.g. ``host == None``.
            return NotImplemented
        return self.name == other.name and self.devices == other.devices
1333
1334
class DeviceLightLoc(namedtuple('DeviceLightLoc', 'host dev path')):
    """
    Describes a specific device on a specific host. Used for enabling or disabling LEDs
    on devices.

    host: hostname as in :func:`orchestrator.Orchestrator.get_hosts`

    dev: device_id, e.g. ``ABC1234DEF567-1R1234_ABC8DE0Q``.
       See ``ceph osd metadata | jq '.[].device_ids'``

    path: presumably the path of the device on that host -- not documented
       here, verify against callers.
    """
    # No per-instance __dict__: instances stay as lean as a plain tuple.
    __slots__ = ()
1346
1347
f6b5b4d7
TL
class OrchestratorEvent:
    """
    Similar to K8s Events.

    Some form of "important" log message attached to something.
    """
    __slots__ = ('created', 'kind', 'subject', 'level', 'message')

    INFO = 'INFO'
    ERROR = 'ERROR'
    # Parses the one-line form written by to_json():
    #   <created> <kind>:<subject> [<level>] "<message>"
    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)

    def __init__(self, created: Union[str, datetime.datetime], kind: str,
                 subject: str, level: str, message: str) -> None:
        """
        :param created: time of the event; a string is parsed into a datetime
        :param kind: either ``service`` or ``daemon``
        :param subject: service name, or daemon name or something
        :param level: either ``INFO`` or ``ERROR``
        :param message: the event text
        """
        if isinstance(created, str):
            created = str_to_datetime(created)
        self.created: datetime.datetime = created

        assert kind in ('service', 'daemon')
        self.kind: str = kind

        # service name, or daemon name or something
        self.subject: str = subject

        # Events are not meant for debugging. debugs should end in the log.
        assert level in ('INFO', 'ERROR')
        self.level = level

        self.message: str = message

    def kind_subject(self) -> str:
        """Return ``<kind>:<subject>``."""
        return f'{self.kind}:{self.subject}'

    def to_json(self) -> str:
        """Return the event as a single string (the format ``regex_v1`` parses)."""
        # Make a long list of events readable.
        stamp = datetime_to_str(self.created)
        return f'{stamp} {self.kind_subject()} [{self.level}] "{self.message}"'

    def to_dict(self) -> dict:
        """
        Return the event as a dict.

        Note that the ``subject`` entry holds the combined
        ``<kind>:<subject>`` string.
        """
        # Convert events data to dict.
        return {
            'created': datetime_to_str(self.created),
            'subject': self.kind_subject(),
            'level': self.level,
            'message': self.message
        }

    @classmethod
    @handle_type_error
    def from_json(cls, data: str) -> "OrchestratorEvent":
        """
        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'

        :param data: event string as produced by ``to_json()``
        :return: the parsed event
        :raises ValueError: if ``data`` does not have the expected format
        """
        parsed = cls.regex_v1.match(data)
        if not parsed:
            raise ValueError(f'Unable to match: "{data}"')
        # Groups come in constructor order: created, kind, subject, level, message.
        return cls(*parsed.groups())

    def __eq__(self, other: Any) -> bool:
        """
        Compare creation time, kind, subject and message; ``level`` is not
        part of the comparison.  Anything that is not an
        ``OrchestratorEvent`` is unequal.
        """
        if isinstance(other, OrchestratorEvent):
            return (self.created == other.created
                    and self.kind == other.kind
                    and self.subject == other.subject
                    and self.message == other.message)
        return False

    def __repr__(self) -> str:
        return f'OrchestratorEvent.from_json({self.to_json()!r})'
1419
f6b5b4d7 1420
def _mk_orch_methods(cls: Any) -> Any:
    """
    Class decorator: give ``cls`` a remote-calling stub for every public
    attribute found in ``Orchestrator.__dict__``.

    Each stub forwards its arguments to ``self._oremote()`` under the name
    of the method it replaces and is dressed up with ``update_wrapper`` to
    look like the original.  Names starting with ``_`` and
    ``is_orchestrator_module`` are left alone.

    :param cls: the class to equip with stubs
    :return: the same class object
    """
    # The stub factory must live outside of the loop: a closure created in
    # the loop body would see the loop variable's final value only.
    def shim(method_name: str) -> Callable:
        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
            return self._oremote(method_name, args, kwargs)
        return inner

    for name, method in Orchestrator.__dict__.items():
        if name.startswith('_') or name == 'is_orchestrator_module':
            continue
        setattr(cls, name, update_wrapper(shim(name), method))
    return cls
1435
1436
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self.log.debug(completion.result)

    .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
        Reason is, that OrchestratorClientMixin magically redirects all methods to the
        "real" implementation of the orchestrator.


    >>> import mgr_module
    >>> #doctest: +SKIP
    ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
    ...     def __init__(self, ...):
    ...         self.orch_client = OrchestratorClientMixin()
    ...         self.orch_client.set_mgr(self.mgr))
    """

    def set_mgr(self, mgr: MgrModule) -> None:
        """
        Usable in the Dashboard that uses a global ``mgr``

        :param mgr: the module through which remote calls will be made
        """
        # The double underscore mangles the attribute name, making sure
        # we're not overwriting any other `mgr` properties.
        self.__mgr = mgr

    def __get_mgr(self) -> Any:
        """Return the module set via ``set_mgr()``, or ``self`` if none was set."""
        try:
            mgr = self.__mgr
        except AttributeError:
            mgr = self
        return mgr

    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :param meth: name of the orchestrator method to call
        :param args: positional arguments for that method
        :param kwargs: keyword arguments for that method
        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator` module or backend not found.
        :raises NotImplementedError: the call failed and the backend's feature
            set does not list ``meth`` as available.
        """
        mgr = self.__get_mgr()

        # Ask the module directly if it can select the backend itself;
        # otherwise go through the `orchestrator` module.
        try:
            backend = mgr._select_orchestrator()
        except AttributeError:
            backend = mgr.remote('orchestrator', '_select_orchestrator')

        if backend is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, backend, meth, args, kwargs))
        try:
            return mgr.remote(backend, meth, *args, **kwargs)
        except Exception as e:
            if meth == 'get_feature_set':
                raise  # self.get_feature_set() calls self._oremote()
            features = self.get_feature_set()
            if meth in features and features[meth]['available']:
                # The backend claims to support the method: the failure is real.
                raise
            raise NotImplementedError(f'{backend} does not implement {meth}') from e