]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator.py
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / pybind / mgr / orchestrator.py
CommitLineData
11fdf7f2
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
7import sys
8import time
9import fnmatch
81eedcae
TL
10import uuid
11
12import six
13
14from mgr_module import MgrModule
15from mgr_util import format_bytes
11fdf7f2
TL
16
17try:
18 from typing import TypeVar, Generic, List, Optional, Union, Tuple
19 T = TypeVar('T')
20 G = Generic[T]
21except ImportError:
22 T, G = object, object
23
11fdf7f2
TL
24
class OrchestratorError(Exception):
    """
    Base class for orchestrator-specific failures.

    Raised for deployment, configuration or user errors.

    Not intended for programming errors or orchestrator-internal errors.
    """
33
34
class NoOrchestrator(OrchestratorError):
    """
    Raised when no orchestrator backend has been configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
        super(NoOrchestrator, self).__init__(msg)
41
42
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when a specific feature is not supported by an orchestrator.
    """
47
48
class _Completion(G):
    """
    Common interface shared by read and write completion handles.
    """

    @property
    def result(self):
        # type: () -> T
        """
        The outcome of the awaited operation.

        Only valid once Orchestrator.wait() has been called on this
        completion.
        """
        raise NotImplementedError()

    @property
    def exception(self):
        # type: () -> Optional[Exception]
        """The captured exception, or None when nothing was recorded."""
        try:
            return self.__exception
        except AttributeError:
            return None

    @exception.setter
    def exception(self, value):
        self.__exception = value

    @property
    def is_read(self):
        # type: () -> bool
        """True for read completions, False for writes."""
        raise NotImplementedError()

    @property
    def is_complete(self):
        # type: () -> bool
        """Whether the underlying operation has finished."""
        raise NotImplementedError()

    @property
    def is_errored(self):
        # type: () -> bool
        """
        Whether the completion failed. The default implementation checks
        for a stored exception; subclasses may override it.
        """
        return self.exception is not None

    @property
    def should_wait(self):
        # type: () -> bool
        """Whether the caller still needs to wait on this completion."""
        raise NotImplementedError()
98
99
def raise_if_exception(c):
    # type: (_Completion) -> None
    """
    Re-raise the exception stored on a completion, if any.

    The exception may have been created in a different mgr
    sub-interpreter, so it is first rebuilt from the equivalent classes
    belonging to *this* interpreter before being raised.

    :param c: completion whose ``exception`` attribute is inspected.
    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    def copy_to_this_subinterpreter(r_obj):
        # This is something like `return pickle.loads(pickle.dumps(r_obj))`
        # without importing anything.
        r_cls = r_obj.__class__
        # Builtin types are shared between sub-interpreters. Check both
        # the Python 2 ('__builtin__') and Python 3 ('builtins') module
        # names; the original only matched the Python 2 spelling.
        if r_cls.__module__ in ('__builtin__', 'builtins'):
            return r_obj
        my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__)
        if id(my_cls) == id(r_cls):
            # Same class object: no copy needed.
            return r_obj
        # Rebuild the object attribute-by-attribute using this
        # interpreter's class, recursing into nested attributes.
        my_obj = my_cls.__new__(my_cls)
        for k, v in r_obj.__dict__.items():
            setattr(my_obj, k, copy_to_this_subinterpreter(v))
        return my_obj

    if c.exception is not None:
        raise copy_to_this_subinterpreter(c.exception)
122
123
class ReadCompletion(_Completion):
    """
    Handle for an in-progress read operation.

    ``Orchestrator`` implementations subclass this to implement their own
    handles to operations in progress, and return instances of that
    subclass from calls into their methods.
    """

    def __init__(self):
        pass

    @property
    def is_read(self):
        return True

    @property
    def should_wait(self):
        """Reads only need waiting on while they are still incomplete."""
        return not self.is_complete
145
146
class WriteCompletion(_Completion):
    """
    Handle for an in-progress write (mutating) operation.

    ``Orchestrator`` implementations subclass this to implement their own
    handles to operations in progress, and return instances of that
    subclass from calls into their methods.
    """

    def __init__(self):
        # Identifier under which progress events for this operation
        # are reported to the `progress` module.
        self.progress_id = str(uuid.uuid4())

        #: if a orchestrator module can provide a more detailed
        #: progress information, it needs to also call ``progress.update()``.
        self.progress = 0.5

    def __str__(self):
        """
        Progress events use ``__str__()`` as their message text.
        """
        return super(WriteCompletion, self).__str__()

    @property
    def is_persistent(self):
        # type: () -> bool
        """
        Has the operation updated the orchestrator's configuration
        persistently? Typically this would indicate that an update
        had been written to a manifest, but that the update
        had not necessarily been pushed out to the cluster.
        """
        raise NotImplementedError()

    @property
    def is_effective(self):
        """
        Has the operation taken effect on the cluster? For example,
        if we were adding a service, has it come up and appeared
        in Ceph's cluster maps?
        """
        raise NotImplementedError()

    @property
    def is_complete(self):
        # A failed write is complete; otherwise it must be both
        # persistent and effective.
        if self.is_errored:
            return True
        return self.is_persistent and self.is_effective

    @property
    def is_read(self):
        return False

    @property
    def should_wait(self):
        """Writes only need waiting on until they are persistent."""
        return not self.is_persistent
203
204
class Orchestrator(object):
    """
    Abstract interface implemented by orchestrator backend modules.

    Calls in this class may do long running remote operations, with time
    periods ranging from network latencies to package install latencies and large
    internet downloads. For that reason, all are asynchronous, and return
    ``Completion`` objects.

    Implementations are not required to start work on an operation until
    the caller waits on the relevant Completion objects. Callers making
    multiple updates should not wait on Completions until they're done
    sending operations: this enables implementations to batch up a series
    of updates when wait() is called on a set of Completion objects.

    Implementations are encouraged to keep reasonably fresh caches of
    the status of the system: it is better to serve a stale-but-recent
    result read of e.g. device inventory than it is to keep the caller waiting
    while you scan hosts every time.
    """

    def is_orchestrator_module(self):
        """
        Enable other modules to interrogate this module to discover
        whether it's usable as an orchestrator module.

        Subclasses do not need to override this.
        """
        return True

    def available(self):
        # type: () -> Tuple[bool, str]
        """
        Report whether we can talk to the orchestrator. This is the
        place to give the user a meaningful message if the orchestrator
        isn't running or can't be contacted.

        This method may be called frequently (e.g. every page load
        to conditionally display a warning banner), so make sure it's
        not too expensive. It's okay to give a slightly stale status
        (e.g. based on a periodic background ping of the orchestrator)
        if that's necessary to make this method fast.

        ..note:: `True` doesn't mean that the desired functionality
            is actually available in the orchestrator. I.e. this
            won't work as expected::

                >>> if OrchestratorClientMixin().available()[0]:  # wrong.
                ...     OrchestratorClientMixin().get_hosts()

        :return: two-tuple of boolean, string
        """
        raise NotImplementedError()

    def wait(self, completions):
        """
        Given a list of Completion instances, progress any which are
        incomplete. Return True if everything is done.

        Callers should inspect the detail of each completion to identify
        partial completion/progress information, and present that information
        to the user.

        For fast operations (e.g. reading from a database), implementations
        may choose to do blocking IO in this call.

        :param completions: list of Completion instances to progress.
        :rtype: bool
        """
        raise NotImplementedError()

    def add_host(self, host):
        # type: (str) -> WriteCompletion
        """
        Add a host to the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def remove_host(self, host):
        # type: (str) -> WriteCompletion
        """
        Remove a host from the orchestrator inventory.

        :param host: hostname
        """
        raise NotImplementedError()

    def get_hosts(self):
        # type: () -> ReadCompletion[List[InventoryNode]]
        """
        Report the hosts in the cluster.

        The default implementation is extra slow: it delegates to a full
        :meth:`get_inventory` scan.

        :return: list of InventoryNodes
        """
        return self.get_inventory()

    def get_inventory(self, node_filter=None, refresh=False):
        # type: (Optional[InventoryFilter], bool) -> ReadCompletion[List[InventoryNode]]
        """
        Returns something that was created by `ceph-volume inventory`.

        :param node_filter: optional InventoryFilter to restrict the scan.
        :param refresh: when True, bypass any cached inventory.
        :return: list of InventoryNode
        """
        raise NotImplementedError()

    def describe_service(self, service_type=None, service_id=None, node_name=None):
        # type: (Optional[str], Optional[str], Optional[str]) -> ReadCompletion[List[ServiceDescription]]
        """
        Describe a service (of any kind) that is already configured in
        the orchestrator. For example, when viewing an OSD in the dashboard
        we might like to also display information about the orchestrator's
        view of the service (like the kubernetes pod ID).

        When viewing a CephFS filesystem in the dashboard, we would use this
        to display the pods being currently run for MDS daemons.

        :return: list of ServiceDescription objects.
        """
        raise NotImplementedError()

    def service_action(self, action, service_type, service_name=None, service_id=None):
        # type: (str, str, str, str) -> WriteCompletion
        """
        Perform an action (start/stop/reload) on a service.

        Either service_name or service_id must be specified:

        * If using service_name, perform the action on that entire logical
          service (i.e. all daemons providing that named service).
        * If using service_id, perform the action on a single specific daemon
          instance.

        :param action: one of "start", "stop", "reload"
        :param service_type: e.g. "mds", "rgw", ...
        :param service_name: name of logical service ("cephfs", "us-east", ...)
        :param service_id: service daemon instance (usually a short hostname)
        :rtype: WriteCompletion
        """
        # Precondition checks shared by all implementations; concrete
        # subclasses perform the actual action.
        assert action in ["start", "stop", "reload"]
        assert service_name or service_id
        assert not (service_name and service_id)
        raise NotImplementedError()

    def create_osds(self, drive_group, all_hosts):
        # type: (DriveGroupSpec, List[str]) -> WriteCompletion
        """
        Create one or more OSDs within a single Drive Group.

        The principal argument here is the drive_group member
        of OsdSpec: other fields are advisory/extensible for any
        finer-grained OSD feature enablement (choice of backing store,
        compression/encryption, etc).

        :param drive_group: DriveGroupSpec
        :param all_hosts: TODO, this is required because the orchestrator methods are not composable
                Probably this parameter can be easily removed because each orchestrator can use
                the "get_inventory" method and the "drive_group.host_pattern" attribute
                to obtain the list of hosts where to apply the operation
        """
        raise NotImplementedError()

    def replace_osds(self, drive_group):
        # type: (DriveGroupSpec) -> WriteCompletion
        """
        Like create_osds, but the osd_id_claims must be fully
        populated.
        """
        raise NotImplementedError()

    def remove_osds(self, osd_ids):
        # type: (List[str]) -> WriteCompletion
        """
        :param osd_ids: list of OSD IDs

        Note that this can only remove OSDs that were successfully
        created (i.e. got an OSD ID).
        """
        raise NotImplementedError()

    def update_mgrs(self, num, hosts):
        # type: (int, List[str]) -> WriteCompletion
        """
        Update the number of cluster managers.

        :param num: requested number of managers.
        :param hosts: list of hosts (optional)
        """
        raise NotImplementedError()

    def update_mons(self, num, hosts):
        # type: (int, List[Tuple[str,str]]) -> WriteCompletion
        """
        Update the number of cluster monitors.

        :param num: requested number of monitors.
        :param hosts: list of hosts + network (optional)
        """
        raise NotImplementedError()

    def add_stateless_service(self, service_type, spec):
        # type: (str, StatelessServiceSpec) -> WriteCompletion
        """
        Installing and adding a completely new service to the cluster.

        This is not about starting services.
        """
        raise NotImplementedError()

    def update_stateless_service(self, service_type, spec):
        # type: (str, StatelessServiceSpec) -> WriteCompletion
        """
        This is about changing / redeploying existing services. Like for
        example changing the number of service instances.

        :rtype: WriteCompletion
        """
        raise NotImplementedError()

    def remove_stateless_service(self, service_type, id_):
        # type: (str, str) -> WriteCompletion
        """
        Uninstalls an existing service from the cluster.

        This is not about stopping services.
        """
        raise NotImplementedError()

    def upgrade_start(self, upgrade_spec):
        # type: (UpgradeSpec) -> WriteCompletion
        """
        Initiate a cluster upgrade to the version given in ``upgrade_spec``.
        """
        raise NotImplementedError()

    def upgrade_status(self):
        # type: () -> ReadCompletion[UpgradeStatusSpec]
        """
        If an upgrade is currently underway, report on where
        we are in the process, or if some error has occurred.

        :return: UpgradeStatusSpec instance
        """
        raise NotImplementedError()

    def upgrade_available(self):
        # type: () -> ReadCompletion[List[str]]
        """
        Report on what versions are available to upgrade to

        :return: List of strings
        """
        raise NotImplementedError()
455
11fdf7f2
TL
456
class UpgradeSpec(object):
    """
    Request to an orchestrator to initiate an upgrade to a particular
    version of Ceph.
    """
    def __init__(self):
        #: target Ceph version; None until set by the caller
        self.target_version = None
462
463
class UpgradeStatusSpec(object):
    """
    Orchestrator's report on what's going on with any ongoing upgrade.
    """
    def __init__(self):
        #: Is an upgrade underway?
        self.in_progress = False
        #: Which daemon types are fully updated?
        self.services_complete = []
        #: Freeform description
        self.message = ""
470
471
class PlacementSpec(object):
    """
    For APIs that need to specify a node subset.
    """
    def __init__(self):
        #: optional node label used to constrain placement
        self.label = None
478
479
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster. When an orchestrator tells
    Ceph "it's running on node123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """
    def __init__(self):
        # Node is at the same granularity as InventoryNode.
        self.nodename = None

        # Not everyone runs in containers, but enough people do to
        # justify having this field here.
        self.container_id = None

        # Some services can be deployed in groups: for example mds's can
        # have active and standby daemons, and nfs-ganesha can run daemons
        # in parallel. This tag refers to a group of daemons as a whole.
        #
        # For instance, a cluster of mds' all serving the same fs will all
        # share the same service value (which may be the Filesystem name
        # in the FSMap).
        #
        # Single-instance services should leave this set to None.
        self.service = None

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.service_instance = None

        # The type of service (osd, mon, mgr, etc.).
        self.service_type = None

        # Service version that was deployed.
        self.version = None

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = None

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = None

        # Service status: -1 error, 0 stopped, 1 running.
        self.status = None

        # Service status description when status == -1.
        self.status_desc = None

    def to_json(self):
        """Return a dict of all non-None attributes, ready for JSON encoding."""
        raw = {
            'nodename': self.nodename,
            'container_id': self.container_id,
            'service': self.service,
            'service_instance': self.service_instance,
            'service_type': self.service_type,
            'version': self.version,
            'rados_config_location': self.rados_config_location,
            'service_url': self.service_url,
            'status': self.status,
            'status_desc': self.status_desc,
        }
        return {key: val for (key, val) in raw.items() if val is not None}
551
552
class DeviceSelection(object):
    """
    Used within :class:`myclass.DriveGroupSpec` to specify the devices
    used by the Drive Group.

    Any attributes (even none) can be included in the device
    specification structure.
    """

    def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
        # type: (List[str], str, str, bool, int) -> None
        """
        Ephemeral drive group device specification.

        TODO: translate from the user interface (Drive Groups) to an actual list of devices.
        """
        #: List of absolute paths to the devices.
        self.paths = [] if paths is None else paths  # type: List[str]

        #: A wildcard string. e.g: "SDD*"
        self.id_model = id_model

        #: Size specification of format LOW:HIGH.
        #: Can also take the form :HIGH, LOW:
        #: or an exact value (as ceph-volume inventory reports)
        self.size = size

        #: is the drive rotating or not
        self.rotates = rotates

        #: if this is present limit the number of drives to this number.
        self.count = count

        self.validate()

    def validate(self):
        """
        Check that ``paths`` and the attribute-based filters are not mixed.

        NOTE(review): the "cannot be empty" branch appears unreachable
        because ``self.paths`` is always a list (never None) — confirm
        whether that is intended before changing it.
        """
        filters = [self.id_model, self.size, self.rotates, self.count]
        if self.paths and any(f is not None for f in filters):
            raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
        if not any(f is not None for f in [self.paths] + filters):
            raise DriveGroupValidationError('DeviceSelection cannot be empty')

    @classmethod
    def from_json(cls, device_spec):
        """Build a DeviceSelection from a JSON-decoded dict."""
        return cls(**device_spec)
600
601
class DriveGroupValidationError(Exception):
    """
    Raised when a Drive Group specification is inconsistent.

    Defining an exception here is a bit problematic, cause you cannot properly catch it,
    if it was raised in a different mgr module.
    """

    def __init__(self, msg):
        prefixed = 'Failed to validate Drive Group: ' + msg
        super(DriveGroupValidationError, self).__init__(prefixed)
610
class DriveGroupSpec(object):
    """
    Describe a drive group in the same form that ceph-volume
    understands.
    """
    def __init__(self, host_pattern, data_devices=None, db_devices=None, wal_devices=None, journal_devices=None,
                 data_directories=None, osds_per_device=None, objectstore='bluestore', encrypted=False,
                 db_slots=None, wal_slots=None):
        # type: (str, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[List[str]], int, str, bool, int, int) -> None

        # The concept of applying a drive group to a (set of) hosts is
        # tightly linked to the drive group itself.
        #
        #: An fnmatch pattern to select hosts. Can also be a single host.
        self.host_pattern = host_pattern

        #: A :class:`orchestrator.DeviceSelection`
        self.data_devices = data_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.db_devices = db_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.wal_devices = wal_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.journal_devices = journal_devices

        #: Number of osd daemons per "DATA" device.
        #: To fully utilize nvme devices multiple osds are required.
        self.osds_per_device = osds_per_device

        #: A list of strings, containing paths which should back OSDs
        self.data_directories = data_directories

        #: ``filestore`` or ``bluestore``
        self.objectstore = objectstore

        #: ``true`` or ``false``
        self.encrypted = encrypted

        #: How many OSDs per DB device
        self.db_slots = db_slots

        #: How many OSDs per WAL device
        self.wal_slots = wal_slots

        # FIXME: needs ceph-volume support
        #: Optional: mapping of drive to OSD ID, used when the
        #: created OSDs are meant to replace previous OSDs on
        #: the same node.
        self.osd_id_claims = {}

    @classmethod
    def from_json(cls, json_drive_group):
        """
        Initialize a Drive Group from a JSON-decoded dict.

        :param json_drive_group: A valid dict with a Drive Group
               specification; ``*_devices`` entries are converted into
               :class:`DeviceSelection` objects.
        """
        args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
                json_drive_group.items()}
        # Use cls(**args) so subclasses get instances of their own type.
        # (The original named the first parameter `self` despite the
        # @classmethod decorator and hard-coded DriveGroupSpec here.)
        return cls(**args)

    def hosts(self, all_hosts):
        # type: (List[str]) -> List[str]
        """Return the subset of ``all_hosts`` matching ``host_pattern``."""
        return fnmatch.filter(all_hosts, self.host_pattern)

    def validate(self, all_hosts):
        """
        Sanity-check this drive group against the known hosts.

        :raises DriveGroupValidationError: on any inconsistency.
        """
        if not isinstance(self.host_pattern, six.string_types):
            raise DriveGroupValidationError('host_pattern must be of type string')

        specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
        for s in filter(None, specs):
            s.validate()
        if self.objectstore not in ('filestore', 'bluestore'):
            raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
        if not self.hosts(all_hosts):
            raise DriveGroupValidationError(
                "host_pattern '{}' does not match any hosts".format(self.host_pattern))
691
692
class StatelessServiceSpec(object):
    """
    Details of stateless service creation.

    A request to the orchestrator for a group of stateless services
    such as MDS, RGW, nfs gateway, iscsi gateway.

    This is *not* supposed to contain all the configuration
    of the services: it's just supposed to be enough information to
    execute the binaries.
    """

    def __init__(self):
        # Node placement constraints for the service.
        self.placement = PlacementSpec()

        # Give this set of stateless services a name: typically it would
        # be the name of a CephFS filesystem, RGW zone, etc. Must be unique
        # within one ceph cluster.
        self.name = ""

        # Count of service instances.
        self.count = 1

        # Arbitrary JSON-serializable object.
        # Maybe you're using e.g. kubernetes and you want to pass through
        # some replicaset special sauce for autoscaling?
        self.extended = {}
719
720
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical uses:

    * filter by node when presenting a UI workflow for configuring
      a particular server.
    * filter by label when not all of the estate is Ceph servers,
      and we want to only learn about the Ceph servers.
    * filter by label when we are interested particularly
      in e.g. OSD servers.
    """
    def __init__(self, labels=None, nodes=None):
        # type: (List[str], List[str]) -> None

        #: Optional: get info about nodes matching labels
        self.labels = labels

        #: Optional: get info about certain named nodes only
        self.nodes = nodes
738
739
class InventoryDevice(object):
    """
    When fetching inventory, block devices are reported in this format.

    Note on device identifiers: the format of this is up to the orchestrator,
    but the same identifier must also work when passed into StatefulServiceSpec.
    The identifier should be something meaningful like a device WWID or
    stable device node path -- not something made up by the orchestrator.

    "Extended" is for reporting any special configuration that may have
    already been done out of band on the block device. For example, if
    the device has already been configured for encryption, report that
    here so that it can be indicated to the user. The set of
    extended properties may differ between orchestrators. An orchestrator
    is permitted to support no extended properties (only normal block
    devices)
    """
    def __init__(self, blank=False, type=None, id=None, size=None,
                 rotates=False, available=False, dev_id=None, extended=None,
                 metadata_space_free=None):
        # type: (bool, str, str, int, bool, bool, str, dict, int) -> None

        #: whether the device is blank
        self.blank = blank

        #: 'ssd', 'hdd', 'nvme'
        self.type = type

        #: unique within a node (or globally if you like).
        self.id = id

        #: byte integer.
        self.size = size

        #: indicates if it is a spinning disk
        self.rotates = rotates

        #: can be used to create a new OSD?
        self.available = available

        #: vendor/model
        self.dev_id = dev_id

        #: arbitrary JSON-serializable object.
        # Fixed: the original `extended if extended is not None else extended`
        # was a no-op ternary that left the attribute as None; default to {}.
        self.extended = extended if extended is not None else {}

        # If this drive is not empty, but is suitable for appending
        # additional journals, wals, or bluestore dbs, then report
        # how much space is available.
        self.metadata_space_free = metadata_space_free

    def to_json(self):
        """Return a JSON-serializable dict of the device's main attributes."""
        return dict(type=self.type, blank=self.blank, id=self.id,
                    size=self.size, rotates=self.rotates,
                    available=self.available, dev_id=self.dev_id,
                    extended=self.extended)

    @classmethod
    def from_ceph_volume_inventory(cls, data):
        """
        Build an InventoryDevice from one `ceph-volume inventory` entry.

        The full raw entry is preserved in ``extended``.
        """
        # TODO: change InventoryDevice itself to mirror c-v inventory closely!

        dev = cls()
        dev.id = data["path"]
        dev.type = 'hdd' if data["sys_api"]["rotational"] == "1" else 'sdd/nvme'
        dev.size = data["sys_api"]["size"]
        dev.rotates = data["sys_api"]["rotational"] == "1"
        dev.available = data["available"]
        dev.dev_id = "%s/%s" % (data["sys_api"]["vendor"],
                                data["sys_api"]["model"])
        dev.extended = data
        return dev

    def pretty_print(self, only_header=False):
        """Print a human friendly line with the information of the device

        :param only_header: Print only the name of the device attributes

        Ex::

            Device Path Type Size Rotates Available Model
            /dev/sdc hdd 50.00 GB True True ATA/QEMU

        """
        row_format = " {0:<15} {1:>10} {2:>10} {3:>10} {4:>10} {5:<15}\n"
        if only_header:
            return row_format.format("Device Path", "Type", "Size", "Rotates",
                                     "Available", "Model")
        else:
            return row_format.format(str(self.id), self.type if self.type is not None else "",
                                     format_bytes(self.size if self.size is not None else 0, 5,
                                                  colored=False),
                                     str(self.rotates), str(self.available),
                                     self.dev_id if self.dev_id is not None else "")
832
833
class InventoryNode(object):
    """
    When fetching inventory, all Devices are grouped inside of an
    InventoryNode.
    """
    def __init__(self, name, devices):
        # type: (str, List[InventoryDevice]) -> None
        assert isinstance(devices, list)
        #: unique within cluster. For example a hostname.
        self.name = name
        #: the InventoryDevices found on this node
        self.devices = devices

    def to_json(self):
        """Return a JSON-serializable dict of the node and its devices."""
        device_list = [dev.to_json() for dev in self.devices]
        return {'name': self.name, 'devices': device_list}
847
848
def _mk_orch_methods(cls):
    """
    Class decorator: add a remote-calling stub for every public
    ``Orchestrator`` method to ``cls``.
    """
    # The stub factory is a separate function rather than a lambda in the
    # loop so that each stub closes over its own method name instead of
    # the loop variable's final value.
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            self._update_completion_progress(completion, 0)
            return completion
        return inner

    for name in Orchestrator.__dict__:
        if not name.startswith('_') and name not in ['is_orchestrator_module']:
            setattr(cls, name, shim(name))
    return cls
863
864
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
    """
    A module that inherits from `OrchestratorClientMixin` can directly call
    all :class:`Orchestrator` methods without manually calling remote.

    Every interface method from ``Orchestrator`` is converted into a stub method that internally
    calls :func:`OrchestratorClientMixin._oremote`

    >>> class MyModule(OrchestratorClientMixin):
    ...    def func(self):
    ...        completion = self.add_host('somehost')  # calls `_oremote()`
    ...        self._orchestrator_wait([completion])
    ...        self.log.debug(completion.result)

    """

    def set_mgr(self, mgr):
        # type: (MgrModule) -> None
        """
        Usable in the Dashboard that uses a global ``mgr``
        """

        self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties

    def _oremote(self, meth, args, kwargs):
        """
        Helper for invoking `remote` on whichever orchestrator is enabled

        :raises RuntimeError: If the remote method failed.
        :raises OrchestratorError: orchestrator failed to perform
        :raises ImportError: no `orchestrator_cli` module or backend not found.
        """
        try:
            # Prefer the mgr handle injected via set_mgr() (dashboard case);
            # fall back to self, which is itself a mgr module.
            mgr = self.__mgr
        except AttributeError:
            mgr = self
        try:
            # If the mgr object is the orchestrator_cli module itself,
            # resolve the backend locally ...
            o = mgr._select_orchestrator()
        except AttributeError:
            # ... otherwise ask orchestrator_cli remotely for the backend.
            o = mgr.remote('orchestrator_cli', '_select_orchestrator')

        if o is None:
            raise NoOrchestrator()

        mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
        return mgr.remote(o, meth, *args, **kwargs)

    def _update_completion_progress(self, completion, force_progress=None):
        # type: (WriteCompletion, Optional[float]) -> None
        """
        Report a completion's progress to the `progress` mgr module.

        Non-WriteCompletion objects (which lack ``progress_id``) are
        silently ignored via the AttributeError handler below.

        :param completion: the completion to report on.
        :param force_progress: when given, overrides ``completion.progress``.
        """
        try:
            progress = force_progress if force_progress is not None else completion.progress
            if completion.is_complete:
                self.remote("progress", "complete", completion.progress_id)
            else:
                self.remote("progress", "update", completion.progress_id, str(completion), progress,
                            ["orchestrator"])
        except AttributeError:
            # No WriteCompletion. Ignore.
            pass
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    def _orchestrator_wait(self, completions):
        # type: (List[_Completion]) -> None
        """
        Wait for completions to complete (reads) or
        become persistent (writes).

        Waits for writes to be *persistent* but not *effective*.

        :param completions: List of Completions
        :raises NoOrchestrator:
        :raises ImportError: no `orchestrator_cli` module or backend not found.
        """
        for c in completions:
            self._update_completion_progress(c)
        # Poll every 5 seconds while at least one completion still
        # reports that it should be waited on.
        while not self.wait(completions):
            if any(c.should_wait for c in completions):
                time.sleep(5)
            else:
                break
        for c in completions:
            self._update_completion_progress(c)