]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / orchestrator.py
CommitLineData
11fdf7f2
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
7import sys
8import time
9import fnmatch
81eedcae 10import uuid
494da23a 11import datetime
81eedcae
TL
12
13import six
14
494da23a 15from mgr_module import MgrModule, PersistentStoreDict
81eedcae 16from mgr_util import format_bytes
11fdf7f2
TL
17
18try:
494da23a
TL
19 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator
20
11fdf7f2
TL
21 T = TypeVar('T')
22 G = Generic[T]
23except ImportError:
24 T, G = object, object
25
11fdf7f2
TL
26
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """
36
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
        super(NoOrchestrator, self).__init__(msg)
44
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
50
class _Completion(G):
    """
    Abstract base class for the completion handles returned by
    ``Orchestrator`` methods (see ``ReadCompletion``/``WriteCompletion``).
    """

    @property
    def result(self):
        # type: () -> T
        """
        Return the result of the operation that we were waited
        for. Only valid after calling Orchestrator.wait() on this
        completion.
        """
        raise NotImplementedError()

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        """
        Holds an exception object.
        """
        try:
            # Name-mangled attribute is only created by the setter below,
            # hence the AttributeError fallback for untouched completions.
            return self.__exception
        except AttributeError:
            return None

    @exception.setter
    def exception(self, value):
        self.__exception = value

    @property
    def is_read(self):
        # type: () -> bool
        raise NotImplementedError()

    @property
    def is_complete(self):
        # type: () -> bool
        raise NotImplementedError()

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Has the completion failed. Default implementation looks for
        self.exception. Can be overwritten.
        """
        return self.exception is not None

    @property
    def should_wait(self):
        # type: () -> bool
        raise NotImplementedError()
101
def raise_if_exception(c):
    # type: (_Completion) -> None
    """
    Re-raise the exception stored in a completion, if there is one.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    def _localize(remote_obj):
        # Rebuild the object graph using this subinterpreter's own class
        # objects -- effectively `pickle.loads(pickle.dumps(remote_obj))`
        # without importing anything.
        remote_cls = remote_obj.__class__
        if remote_cls.__module__ == '__builtin__':
            return remote_obj
        local_cls = getattr(sys.modules[remote_cls.__module__], remote_cls.__name__)
        if local_cls is remote_cls:
            # Already backed by our own class object; no copy needed.
            return remote_obj
        clone = local_cls.__new__(local_cls)
        for name, value in remote_obj.__dict__.items():
            setattr(clone, name, _localize(value))
        return clone

    if c.exception is not None:
        raise _localize(c.exception)
125
class ReadCompletion(_Completion):
    """
    ``Orchestrator`` implementations should inherit from this
    class to implement their own handles to operations in progress, and
    return an instance of their subclass from calls into methods.
    """

    def __init__(self):
        pass

    @property
    def is_read(self):
        # Read completions always identify themselves as reads.
        return True

    @property
    def should_wait(self):
        """Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a read operation only if it is not complete.
        """
        return not self.is_complete
148
class TrivialReadCompletion(ReadCompletion):
    """
    This is the trivial completion simply wrapping a result:
    it is born complete, with no external operation to wait for.
    """
    def __init__(self, result):
        super(TrivialReadCompletion, self).__init__()
        self._result = result

    @property
    def result(self):
        return self._result

    @property
    def is_complete(self):
        # The result was supplied up front; nothing to wait for.
        return True
165
class WriteCompletion(_Completion):
    """
    Completion handle for write (mutating) operations.

    ``Orchestrator`` implementations should inherit from this
    class to implement their own handles to operations in progress, and
    return an instance of their subclass from calls into methods.
    """

    def __init__(self):
        # Identifier under which this operation is reported to the
        # mgr progress module.
        self.progress_id = str(uuid.uuid4())

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.5

    def __str__(self):
        """
        ``__str__()`` is used for determining the message for progress events.
        """
        return super(WriteCompletion, self).__str__()

    @property
    def is_persistent(self):
        # type: () -> bool
        """
        Has the operation updated the orchestrator's configuration
        persistently? Typically this would indicate that an update
        had been written to a manifest, but that the update
        had not necessarily been pushed out to the cluster.
        """
        raise NotImplementedError()

    @property
    def is_effective(self):
        """
        Has the operation taken effect on the cluster? For example,
        if we were adding a service, has it come up and appeared
        in Ceph's cluster maps?
        """
        raise NotImplementedError()

    @property
    def is_complete(self):
        # A failed write is finished, too; otherwise completion requires
        # both persistence and effectiveness.
        if self.is_errored:
            return True
        return self.is_persistent and self.is_effective

    @property
    def is_read(self):
        return False

    @property
    def should_wait(self):
        """Could the external operation be deemed as complete,
        or should we wait?
        We must wait for a write operation only if we know
        it is not persistent yet.
        """
        return not self.is_persistent
223
class Orchestrator(object):
    """
    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        ..note:: `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        raise NotImplementedError()

    def wait(self, completions):
        """
        Given a list of Completion instances, progress any which are
        incomplete. Return a true if everything is done.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        For fast operations (e.g. reading from a database), implementations
        may choose to do blocking IO in this call.

        :rtype: bool
        """
        raise NotImplementedError()

    def add_host(self, host):
        # type: (str) -> WriteCompletion
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> WriteCompletion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def get_hosts(self):
        # type: () -> ReadCompletion[List[InventoryNode]]
        """
        Report the hosts in the cluster.

        The default implementation is extra slow: it scans the full
        inventory rather than just listing host names.

        :return: list of InventoryNodes
        """
        return self.get_inventory()

    def get_inventory(self, node_filter=None, refresh=False):
        # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]]
        """
        Returns something that was created by `ceph-volume inventory`.

        :return: list of InventoryNode
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
        # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]]
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def service_action(self, action, service_type, service_name=None, service_id=None):
        # type: (str, str, str, str) -> WriteCompletion
        """
        Perform an action (start/stop/reload) on a service.

        Either service_name or service_id must be specified:

        * If using service_name, perform the action on that entire logical
          service (i.e. all daemons providing that named service).
        * If using service_id, perform the action on a single specific daemon
          instance.

        :param action: one of "start", "stop", "reload"
        :param service_type: e.g. "mds", "rgw", ...
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :param service_id: service daemon instance (usually a short hostname)
        :rtype: WriteCompletion
        """
        # Argument sanity checks shared by all implementations; subclasses
        # are expected to call super() or replicate these constraints.
        assert action in ["start", "stop", "reload"]
        assert service_name or service_id
        assert not (service_name and service_id)
        raise NotImplementedError()

    def create_osds(self, drive_group, all_hosts):
        # type: (DriveGroupSpec, List[str]) -> WriteCompletion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).

        :param drive_group: DriveGroupSpec
        :param all_hosts: TODO, this is required because the orchestrator methods are not composable
                Probably this parameter can be easily removed because each orchestrator can use
                the "get_inventory" method and the "drive_group.host_pattern" attribute
                to obtain the list of hosts where to apply the operation
        """
        raise NotImplementedError()

    def replace_osds(self, drive_group):
        # type: (DriveGroupSpec) -> WriteCompletion
        """
        Like create_osds, but the osd_id_claims must be fully
        populated.
        """
        raise NotImplementedError()

    def remove_osds(self, osd_ids):
        # type: (List[str]) -> WriteCompletion
        """
        :param osd_ids: list of OSD IDs

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def update_mgrs(self, num, hosts):
        # type: (int, List[str]) -> WriteCompletion
        """
        Update the number of cluster managers.

        :param num: requested number of managers.
        :param hosts: list of hosts (optional)
        """
        raise NotImplementedError()

    def update_mons(self, num, hosts):
        # type: (int, List[Tuple[str,str]]) -> WriteCompletion
        """
        Update the number of cluster monitors.

        :param num: requested number of monitors.
        :param hosts: list of hosts + network (optional)
        """
        raise NotImplementedError()

    def add_stateless_service(self, service_type, spec):
        # type: (str, StatelessServiceSpec) -> WriteCompletion
        """
        Installing and adding a completely new service to the cluster.

        This is not about starting services.
        """
        raise NotImplementedError()

    def update_stateless_service(self, service_type, spec):
        # type: (str, StatelessServiceSpec) -> WriteCompletion
        """
        This is about changing / redeploying existing services. Like for
        example changing the number of service instances.

        :rtype: WriteCompletion
        """
        raise NotImplementedError()

    def remove_stateless_service(self, service_type, id_):
        # type: (str, str) -> WriteCompletion
        """
        Uninstalls an existing service from the cluster.

        This is not about stopping services.
        """
        raise NotImplementedError()

    def upgrade_start(self, upgrade_spec):
        # type: (UpgradeSpec) -> WriteCompletion
        """
        Initiate a cluster upgrade to the version given in ``upgrade_spec``.
        """
        raise NotImplementedError()

    def upgrade_status(self):
        # type: () -> ReadCompletion[UpgradeStatusSpec]
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    def upgrade_available(self):
        # type: () -> ReadCompletion[List[str]]
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
11fdf7f2
TL
475
class UpgradeSpec(object):
    """
    Request to the orchestrator to initiate an upgrade to a particular
    version of Ceph.
    """
    def __init__(self):
        # Target Ceph version; None until the caller fills it in.
        self.target_version = None
482
class UpgradeStatusSpec(object):
    """Orchestrator's report on what's going on with any ongoing upgrade."""
    def __init__(self):
        # Is an upgrade underway?
        self.in_progress = False
        # Daemon types that are fully updated so far.
        self.services_complete = []
        # Freeform description of progress / errors.
        self.message = ""
490
class PlacementSpec(object):
    """
    For APIs that need to specify a node subset.
    """
    def __init__(self):
        # Optional label selecting the nodes; None means unconstrained.
        self.label = None
498
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on node123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self, nodename=None, container_id=None, service=None, service_instance=None,
                 service_type=None, version=None, rados_config_location=None,
                 service_url=None, status=None, status_desc=None):
        # Node granularity matches InventoryNode.
        self.nodename = nodename

        # Container ID, for the (common) containerized deployments.
        self.container_id = container_id

        # Logical group of daemons deployed together -- e.g. all MDS
        # daemons serving one filesystem share the same value (typically
        # the filesystem name from the FSMap). Single-instance services
        # leave this as None.
        self.service = service

        # Orchestrator-chosen daemon name: the <foo> in mds.<foo>, the ID
        # that appears in the FSMap/ServiceMap. Typically derived from a
        # hostname or pod name.
        self.service_instance = service_instance

        # The type of service (osd, mon, mgr, etc.)
        self.service_type = service_type

        # Service version that was deployed.
        self.version = version

        # Location of the service configuration when stored in a rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # URL of the service's REST-like API, if it exposes one.
        self.service_url = service_url

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

    def to_json(self):
        """Return a dict of all attributes that are not None."""
        field_names = ('nodename', 'container_id', 'service', 'service_instance',
                       'service_type', 'version', 'rados_config_location',
                       'service_url', 'status', 'status_desc')
        return {name: getattr(self, name)
                for name in field_names
                if getattr(self, name) is not None}

    @classmethod
    def from_json(cls, data):
        return cls(**data)
577
11fdf7f2
TL
578
class DeviceSelection(object):
    """
    Used within :class:`orchestrator.DriveGroupSpec` to specify the devices
    used by the Drive Group.

    Any attributes (even none) can be included in the device
    specification structure.
    """

    def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
        # type: (List[str], str, str, bool, int) -> None
        """
        ephemeral drive group device specification

        TODO: translate from the user interface (Drive Groups) to an actual list of devices.
        """
        #: List of absolute paths to the devices.
        self.paths = [] if paths is None else paths  # type: List[str]

        #: A wildcard string. e.g: "SSD*"
        self.id_model = id_model

        #: Size specification of format LOW:HIGH.
        #: Can also take the form :HIGH, LOW:
        #: or an exact value (as ceph-volume inventory reports)
        self.size = size

        #: is the drive rotating or not
        self.rotates = rotates

        #: if this is present limit the number of drives to this number.
        self.count = count
        self.validate()

    def validate(self):
        other_props = [self.id_model, self.size, self.rotates, self.count]
        if self.paths and any(prop is not None for prop in other_props):
            raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
        # NOTE(review): `paths` defaults to [] which `is not None`, so this
        # "cannot be empty" branch never fires for a no-argument
        # construction -- confirm whether that is intended.
        if not any(prop is not None for prop in [self.paths] + other_props):
            raise DriveGroupValidationError('DeviceSelection cannot be empty')

    @classmethod
    def from_json(cls, device_spec):
        return cls(**device_spec)
626
627
class DriveGroupValidationError(Exception):
    """
    Defining an exception here is a bit problematic, cause you cannot properly catch it,
    if it was raised in a different mgr module.
    """

    def __init__(self, msg):
        # Prefix every message so the failing subsystem is obvious to the user.
        super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
636
class DriveGroupSpec(object):
    """
    Describe a drive group in the same form that ceph-volume
    understands.
    """
    def __init__(self, host_pattern, data_devices=None, db_devices=None, wal_devices=None, journal_devices=None,
                 data_directories=None, osds_per_device=None, objectstore='bluestore', encrypted=False,
                 db_slots=None, wal_slots=None):
        # type: (str, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[List[str]], int, str, bool, int, int) -> None

        # concept of applying a drive group to a (set) of hosts is tightly
        # linked to the drive group itself
        #
        #: An fnmatch pattern to select hosts. Can also be a single host.
        self.host_pattern = host_pattern

        #: A :class:`orchestrator.DeviceSelection`
        self.data_devices = data_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.db_devices = db_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.wal_devices = wal_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.journal_devices = journal_devices

        #: Number of osd daemons per "DATA" device.
        #: To fully utilize nvme devices multiple osds are required.
        self.osds_per_device = osds_per_device

        #: A list of strings, containing paths which should back OSDs
        self.data_directories = data_directories

        #: ``filestore`` or ``bluestore``
        self.objectstore = objectstore

        #: ``true`` or ``false``
        self.encrypted = encrypted

        #: How many OSDs per DB device
        self.db_slots = db_slots

        #: How many OSDs per WAL device
        self.wal_slots = wal_slots

        # FIXME: needs ceph-volume support
        #: Optional: mapping of drive to OSD ID, used when the
        #: created OSDs are meant to replace previous OSDs on
        #: the same node.
        self.osd_id_claims = {}

    @classmethod
    def from_json(cls, json_drive_group):
        """
        Initialize 'Drive group' structure

        :param json_drive_group: A valid json string with a Drive Group
               specification
        """
        # Nested "*_devices" entries are themselves DeviceSelection specs.
        args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v)
                for k, v in json_drive_group.items()}
        # Fixed: this classmethod previously took `self` as its first
        # parameter and returned a hard-coded `DriveGroupSpec(**args)`;
        # using `cls` keeps subclasses deserializing to themselves and is
        # consistent with the other from_json classmethods in this module.
        return cls(**args)

    def hosts(self, all_hosts):
        """Return the subset of ``all_hosts`` matching ``host_pattern``."""
        return fnmatch.filter(all_hosts, self.host_pattern)

    def validate(self, all_hosts):
        """
        Check the spec for consistency.

        :raises DriveGroupValidationError: on a malformed pattern, device
                selection, objectstore or an unmatched host pattern.
        """
        if not isinstance(self.host_pattern, six.string_types):
            raise DriveGroupValidationError('host_pattern must be of type string')

        specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
        for s in filter(None, specs):
            s.validate()
        if self.objectstore not in ('filestore', 'bluestore'):
            raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
        if not self.hosts(all_hosts):
            raise DriveGroupValidationError(
                "host_pattern '{}' does not match any hosts".format(self.host_pattern))
717
718
class StatelessServiceSpec(object):
    # Request to orchestrator for a group of stateless services
    # such as MDS, RGW, nfs gateway, iscsi gateway
    """
    Details of stateless service creation.

    This is *not* supposed to contain all the configuration
    of the services: it's just supposed to be enough information to
    execute the binaries.
    """

    def __init__(self):
        # Where instances of this service may be placed.
        self.placement = PlacementSpec()

        # Name for this set of stateless services: typically a CephFS
        # filesystem name, an RGW zone, etc. Must be unique within one
        # ceph cluster.
        self.name = ""

        # Count of service instances.
        self.count = 1

        # Arbitrary JSON-serializable object, for passing through
        # backend-specific extras (e.g. kubernetes replicaset settings
        # for autoscaling).
        self.extended = {}
745
746
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use:

    * filter by node when presenting UI workflow for configuring
      a particular server.
    * filter by label when not all of estate is Ceph servers,
      and we want to only learn about the Ceph servers.
    * filter by label when we are interested particularly
      in e.g. OSD servers.
    """
    def __init__(self, labels=None, nodes=None):
        # type: (List[str], List[str]) -> None
        # Optional: restrict to nodes carrying these labels.
        self.labels = labels
        # Optional: restrict to these named nodes only.
        self.nodes = nodes
764
765
class InventoryDevice(object):
    """
    When fetching inventory, block devices are reported in this format.

    Note on device identifiers: the format of this is up to the orchestrator,
    but the same identifier must also work when passed into StatefulServiceSpec.
    The identifier should be something meaningful like a device WWID or
    stable device node path -- not something made up by the orchestrator.

    "Extended" is for reporting any special configuration that may have
    already been done out of band on the block device. For example, if
    the device has already been configured for encryption, report that
    here so that it can be indicated to the user. The set of
    extended properties may differ between orchestrators. An orchestrator
    is permitted to support no extended properties (only normal block
    devices)
    """
    def __init__(self, blank=False, type=None, id=None, size=None,
                 rotates=False, available=False, dev_id=None, extended=None,
                 metadata_space_free=None):
        # type: (bool, str, str, int, bool, bool, str, dict, int) -> None

        self.blank = blank

        #: 'ssd', 'hdd', 'nvme'
        self.type = type

        #: unique within a node (or globally if you like).
        self.id = id

        #: byte integer.
        self.size = size

        #: indicates if it is a spinning disk
        self.rotates = rotates

        #: can be used to create a new OSD?
        self.available = available

        #: vendor/model
        self.dev_id = dev_id

        #: arbitrary JSON-serializable object.
        # Bug fix: this previously read `extended if extended is not None
        # else extended`, a no-op conditional that left the attribute as
        # None; default to an empty dict as clearly intended.
        self.extended = extended if extended is not None else {}

        # If this drive is not empty, but is suitable for appending
        # additional journals, wals, or bluestore dbs, then report
        # how much space is available.
        self.metadata_space_free = metadata_space_free

    def to_json(self):
        """Return a JSON-serializable dict of the core device attributes."""
        return dict(type=self.type, blank=self.blank, id=self.id,
                    size=self.size, rotates=self.rotates,
                    available=self.available, dev_id=self.dev_id,
                    extended=self.extended)

    @classmethod
    def from_ceph_volume_inventory(cls, data):
        """Build an InventoryDevice from one `ceph-volume inventory` record."""
        # TODO: change InventoryDevice itself to mirror c-v inventory closely!

        dev = InventoryDevice()
        dev.id = data["path"]
        # ceph-volume reports `rotational` as the string "1" for spinners.
        dev.type = 'hdd' if data["sys_api"]["rotational"] == "1" else 'sdd/nvme'
        dev.size = data["sys_api"]["size"]
        dev.rotates = data["sys_api"]["rotational"] == "1"
        dev.available = data["available"]
        dev.dev_id = "%s/%s" % (data["sys_api"]["vendor"],
                                data["sys_api"]["model"])
        # Keep the full raw record for orchestrator-specific consumers.
        dev.extended = data
        return dev

    @classmethod
    def from_ceph_volume_inventory_list(cls, datas):
        """Convert a list of `ceph-volume inventory` records."""
        return [cls.from_ceph_volume_inventory(d) for d in datas]

    def pretty_print(self, only_header=False):
        """Print a human friendly line with the information of the device

        :param only_header: Print only the name of the device attributes

        Ex::

            Device Path           Type       Size    Rotates  Available Model
            /dev/sdc            hdd   50.00 GB       True       True ATA/QEMU

        """
        row_format = "  {0:<15} {1:>10} {2:>10} {3:>10} {4:>10} {5:<15}\n"
        if only_header:
            return row_format.format("Device Path", "Type", "Size", "Rotates",
                                     "Available", "Model")
        else:
            return row_format.format(str(self.id), self.type if self.type is not None else "",
                                     format_bytes(self.size if self.size is not None else 0, 5,
                                                  colored=False),
                                     str(self.rotates), str(self.available),
                                     self.dev_id if self.dev_id is not None else "")
862
863
class InventoryNode(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryNode.
    """
    def __init__(self, name, devices):
        # type: (str, List[InventoryDevice]) -> None
        assert isinstance(devices, list)
        # Unique within the cluster; for example a hostname.
        self.name = name
        self.devices = devices

    def to_json(self):
        return {
            'name': self.name,
            'devices': [device.to_json() for device in self.devices],
        }

    @classmethod
    def from_nested_items(cls, hosts):
        # Each item is (host_name, completion-like object whose .data is a
        # list of ceph-volume inventory records).
        to_devices = InventoryDevice.from_ceph_volume_inventory_list
        return [cls(host_name, to_devices(result.data)) for host_name, result in hosts]
882
11fdf7f2
TL
883
def _mk_orch_methods(cls):
    """
    Class decorator: attach a remote-forwarding stub to ``cls`` for every
    public ``Orchestrator`` method.
    """
    def make_shim(method_name):
        # A dedicated factory is required so `method_name` is captured
        # per-method; binding the closure directly inside the loop would
        # leave every stub forwarding to the last method.
        def forwarder(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            self._update_completion_progress(completion, 0)
            return completion
        return forwarder

    for name in Orchestrator.__dict__:
        if not name.startswith('_') and name not in ['is_orchestrator_module']:
            setattr(cls, name, make_shim(name))
    return cls
898
899
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    """

    def set_mgr(self, mgr):
        # type: (MgrModule) -> None
        """
        Useable in the Dashbord that uses a global ``mgr``
        """

        # Name-mangled (to _OrchestratorClientMixin__mgr) to make sure
        # we're not overwriting any other `mgr` properties.
        self.__mgr = mgr

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator_cli` module or backend not found.
        """
        try:
            # Prefer the mgr injected via set_mgr(); fall back to
            # self when this mixin is itself a MgrModule.
            mgr = self.__mgr
        except AttributeError:
            mgr = self
        try:
            o = mgr._select_orchestrator()
        except AttributeError:
            # Not the orchestrator_cli module itself: ask it remotely.
            o = mgr.remote('orchestrator_cli', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        return mgr.remote(o, meth, *args, **kwargs)

    def _update_completion_progress(self, completion, force_progress=None):
        # type: (WriteCompletion, Optional[float]) -> None
        # Report this completion to the mgr progress module; best-effort.
        try:
            progress = force_progress if force_progress is not None else completion.progress
            if completion.is_complete:
                self.remote("progress", "complete", completion.progress_id)
            else:
                self.remote("progress", "update", completion.progress_id, str(completion), progress,
                            ["orchestrator"])
        except AttributeError:
            # No WriteCompletion. Ignore.
            pass
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    def _orchestrator_wait(self, completions):
        # type: (List[_Completion]) -> None
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises ImportError: no `orchestrator_cli` module or backend not found.
        """
        for c in completions:
            self._update_completion_progress(c)
        # Poll via wait(); back off only while something still asks to wait.
        while not self.wait(completions):
            if any(c.should_wait for c in completions):
                time.sleep(5)
            else:
                break
        for c in completions:
            self._update_completion_progress(c)
494da23a
TL
986
987
class OutdatableData(object):
    """
    A payload together with the UTC time it was last refreshed, used for
    cache-expiry decisions.
    """

    DATEFMT = '%Y-%m-%d %H:%M:%S.%f'

    def __init__(self, data=None, last_refresh=None):
        # type: (Optional[dict], Optional[datetime.datetime]) -> None
        self._data = data
        if data is not None and last_refresh is None:
            # Fresh data without an explicit timestamp: stamp it now.
            self.last_refresh = datetime.datetime.utcnow()
        else:
            self.last_refresh = last_refresh

    def json(self):
        """Return a JSON-serializable dict; the timestamp becomes a string."""
        if self.last_refresh is None:
            timestr = None
        else:
            timestr = self.last_refresh.strftime(self.DATEFMT)

        return {
            "data": self._data,
            "last_refresh": timestr,
        }

    @property
    def data(self):
        return self._data

    # Deliberately no data setter: assigning here would not be saved in
    # the underlying store automatically.

    @classmethod
    def time_from_string(cls, timestr):
        """Parse a timestamp produced by json(); None passes through."""
        if timestr is None:
            return None
        # drop the 'Z' timezone indication, it's always UTC
        return datetime.datetime.strptime(timestr.rstrip('Z'), cls.DATEFMT)

    @classmethod
    def from_json(cls, data):
        return cls(data['data'], cls.time_from_string(data['last_refresh']))

    def outdated(self, timeout_min=None):
        """True if never refreshed, or refreshed more than timeout_min ago."""
        minutes = 10 if timeout_min is None else timeout_min
        if self.last_refresh is None:
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(minutes=minutes)
        return self.last_refresh < cutoff

    def __repr__(self):
        return 'OutdatableData(data={}, last_refresh={})'.format(self._data, self.last_refresh)
1041
1042
class OutdatableDictMixin(object):
    """
    Toolbox for implementing a cache. As every orchestrator has
    different needs, we cannot implement any logic here.

    Meant to be mixed into a dict-like base that stores OutdatableData
    values in their JSON form (see OutdatablePersistentDict and
    OutdatableDict below).
    """

    def __getitem__(self, item):
        # type: (str) -> OutdatableData
        return OutdatableData.from_json(super(OutdatableDictMixin, self).__getitem__(item))

    def __setitem__(self, key, value):
        # type: (str, OutdatableData) -> None
        val = None if value is None else value.json()
        super(OutdatableDictMixin, self).__setitem__(key, val)

    def items(self):
        # type: () -> Iterator[Tuple[str, OutdatableData]]
        for item in super(OutdatableDictMixin, self).items():
            k, v = item
            yield k, OutdatableData.from_json(v)

    def items_filtered(self, keys=None):
        if keys:
            return [(host, self[host]) for host in keys]
        else:
            return list(self.items())

    def any_outdated(self, timeout=None):
        """True if the cache is empty or any entry is older than `timeout`."""
        # Bug fix: `self.items()` is a generator. The previous code
        # exhausted it with `list(items)` for the emptiness check and then
        # iterated it again in `any(...)`, which always saw an empty
        # sequence -- so non-empty caches were never reported as outdated.
        # Materialize the generator exactly once.
        items = list(self.items())
        if not items:
            return True
        return any([i[1].outdated(timeout) for i in items])

    def remove_outdated(self):
        """Delete every entry whose data is outdated."""
        # Collect keys first: deleting while iterating the generator would
        # mutate the underlying dict mid-iteration.
        outdated = [item[0] for item in self.items() if item[1].outdated()]
        for o in outdated:
            del self[o]
1080
class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict):
    """OutdatableDictMixin backed by the mgr's persistent key-value store."""
    pass
1083
class OutdatableDict(OutdatableDictMixin, dict):
    """OutdatableDictMixin backed by a plain in-memory dict."""
    pass