]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/orchestrator.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / orchestrator.py
CommitLineData
11fdf7f2
TL
1
2"""
3ceph-mgr orchestrator interface
4
5Please see the ceph-mgr module developer's guide for more information.
6"""
7import sys
8import time
9import fnmatch
10
11try:
12 from typing import TypeVar, Generic, List, Optional, Union, Tuple
13 T = TypeVar('T')
14 G = Generic[T]
15except ImportError:
16 T, G = object, object
17
18import six
19
20from mgr_util import format_bytes
21
22
23class OrchestratorError(Exception):
24 """
25 General orchestrator specific error.
26
27 Used for deployment, configuration or user errors.
28
29 It's not intended for programming errors or orchestrator internal errors.
30 """
31
32
33class NoOrchestrator(OrchestratorError):
34 """
35 No orchestrator in configured.
36 """
37 def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
38 super(NoOrchestrator, self).__init__(msg)
39
40
41class OrchestratorValidationError(OrchestratorError):
42 """
43 Raised when an orchestrator doesn't support a specific feature.
44 """
45
46
47class _Completion(G):
48 @property
49 def result(self):
50 # type: () -> T
51 """
52 Return the result of the operation that we were waited
53 for. Only valid after calling Orchestrator.wait() on this
54 completion.
55 """
56 raise NotImplementedError()
57
58 @property
59 def exception(self):
60 # type: () -> Optional[Exception]
61 """
62 Holds an exception object.
63 """
64 try:
65 return self.__exception
66 except AttributeError:
67 return None
68
69 @exception.setter
70 def exception(self, value):
71 self.__exception = value
72
73 @property
74 def is_read(self):
75 # type: () -> bool
76 raise NotImplementedError()
77
78 @property
79 def is_complete(self):
80 # type: () -> bool
81 raise NotImplementedError()
82
83 @property
84 def is_errored(self):
85 # type: () -> bool
86 """
87 Has the completion failed. Default implementation looks for
88 self.exception. Can be overwritten.
89 """
90 return self.exception is not None
91
92 @property
93 def should_wait(self):
94 # type: () -> bool
95 raise NotImplementedError()
96
97
98def raise_if_exception(c):
99 # type: (_Completion) -> None
100 """
101 :raises OrchestratorError: Some user error or a config error.
102 :raises Exception: Some internal error
103 """
104 def copy_to_this_subinterpreter(r_obj):
105 # This is something like `return pickle.loads(pickle.dumps(r_obj))`
106 # Without importing anything.
107 r_cls = r_obj.__class__
108 if r_cls.__module__ == '__builtin__':
109 return r_obj
110 my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__)
111 if id(my_cls) == id(r_cls):
112 return r_obj
113 my_obj = my_cls.__new__(my_cls)
114 for k,v in r_obj.__dict__.items():
115 setattr(my_obj, k, copy_to_this_subinterpreter(v))
116 return my_obj
117
118 if c.exception is not None:
119 raise copy_to_this_subinterpreter(c.exception)
120
121
122class ReadCompletion(_Completion):
123 """
124 ``Orchestrator`` implementations should inherit from this
125 class to implement their own handles to operations in progress, and
126 return an instance of their subclass from calls into methods.
127 """
128
129 def __init__(self):
130 pass
131
132 @property
133 def is_read(self):
134 return True
135
136 @property
137 def should_wait(self):
138 """Could the external operation be deemed as complete,
139 or should we wait?
140 We must wait for a read operation only if it is not complete.
141 """
142 return not self.is_complete
143
144
145class WriteCompletion(_Completion):
146 """
147 ``Orchestrator`` implementations should inherit from this
148 class to implement their own handles to operations in progress, and
149 return an instance of their subclass from calls into methods.
150 """
151
152 def __init__(self):
153 pass
154
155 @property
156 def is_persistent(self):
157 # type: () -> bool
158 """
159 Has the operation updated the orchestrator's configuration
160 persistently? Typically this would indicate that an update
161 had been written to a manifest, but that the update
162 had not necessarily been pushed out to the cluster.
163 """
164 raise NotImplementedError()
165
166 @property
167 def is_effective(self):
168 """
169 Has the operation taken effect on the cluster? For example,
170 if we were adding a service, has it come up and appeared
171 in Ceph's cluster maps?
172 """
173 raise NotImplementedError()
174
175 @property
176 def is_complete(self):
177 return self.is_errored or (self.is_persistent and self.is_effective)
178
179 @property
180 def is_read(self):
181 return False
182
183 @property
184 def should_wait(self):
185 """Could the external operation be deemed as complete,
186 or should we wait?
187 We must wait for a write operation only if we know
188 it is not persistent yet.
189 """
190 return not self.is_persistent
191
192
193class Orchestrator(object):
194 """
195 Calls in this class may do long running remote operations, with time
196 periods ranging from network latencies to package install latencies and large
197 internet downloads. For that reason, all are asynchronous, and return
198 ``Completion`` objects.
199
200 Implementations are not required to start work on an operation until
201 the caller waits on the relevant Completion objects. Callers making
202 multiple updates should not wait on Completions until they're done
203 sending operations: this enables implementations to batch up a series
204 of updates when wait() is called on a set of Completion objects.
205
206 Implementations are encouraged to keep reasonably fresh caches of
207 the status of the system: it is better to serve a stale-but-recent
208 result read of e.g. device inventory than it is to keep the caller waiting
209 while you scan hosts every time.
210 """
211
212 def is_orchestrator_module(self):
213 """
214 Enable other modules to interrogate this module to discover
215 whether it's usable as an orchestrator module.
216
217 Subclasses do not need to override this.
218 """
219 return True
220
221 def available(self):
222 # type: () -> Tuple[Optional[bool], Optional[str]]
223 """
224 Report whether we can talk to the orchestrator. This is the
225 place to give the user a meaningful message if the orchestrator
226 isn't running or can't be contacted.
227
228 This method may be called frequently (e.g. every page load
229 to conditionally display a warning banner), so make sure it's
230 not too expensive. It's okay to give a slightly stale status
231 (e.g. based on a periodic background ping of the orchestrator)
232 if that's necessary to make this method fast.
233
234 Do not override this method if you don't have a meaningful
235 status to return: the default None, None return value is used
236 to indicate that a module is unable to indicate its availability.
237
238 :return: two-tuple of boolean, string
239 """
240 return None, None
241
242 def wait(self, completions):
243 """
244 Given a list of Completion instances, progress any which are
245 incomplete. Return a true if everything is done.
246
247 Callers should inspect the detail of each completion to identify
248 partial completion/progress information, and present that information
249 to the user.
250
251 For fast operations (e.g. reading from a database), implementations
252 may choose to do blocking IO in this call.
253
254 :rtype: bool
255 """
256 raise NotImplementedError()
257
258 def add_host(self, host):
259 # type: (str) -> WriteCompletion
260 """
261 Add a host to the orchestrator inventory.
262
263 :param host: hostname
264 """
265 raise NotImplementedError()
266
267 def remove_host(self, host):
268 # type: (str) -> WriteCompletion
269 """
270 Remove a host from the orchestrator inventory.
271
272 :param host: hostname
273 """
274 raise NotImplementedError()
275
276 def get_hosts(self):
277 # type: () -> ReadCompletion[List[InventoryNode]]
278 """
279 Report the hosts in the cluster.
280
281 The default implementation is extra slow.
282
283 :return: list of InventoryNodes
284 """
285 return self.get_inventory()
286
287 def get_inventory(self, node_filter=None, refresh=False):
288 # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]]
289 """
290 Returns something that was created by `ceph-volume inventory`.
291
292 :return: list of InventoryNode
293 """
294 raise NotImplementedError()
295
296 def describe_service(self, service_type=None, service_id=None, node_name=None):
297 # type: (str, str, str) -> ReadCompletion[List[ServiceDescription]]
298 """
299 Describe a service (of any kind) that is already configured in
300 the orchestrator. For example, when viewing an OSD in the dashboard
301 we might like to also display information about the orchestrator's
302 view of the service (like the kubernetes pod ID).
303
304 When viewing a CephFS filesystem in the dashboard, we would use this
305 to display the pods being currently run for MDS daemons.
306
307 :return: list of ServiceDescription objects.
308 """
309 raise NotImplementedError()
310
311 def service_action(self, action, service_type, service_name=None, service_id=None):
312 # type: (str, str, str, str) -> WriteCompletion
313 """
314 Perform an action (start/stop/reload) on a service.
315
316 Either service_name or service_id must be specified:
317
318 * If using service_name, perform the action on that entire logical
319 service (i.e. all daemons providing that named service).
320 * If using service_id, perform the action on a single specific daemon
321 instance.
322
323 :param action: one of "start", "stop", "reload"
324 :param service_type: e.g. "mds", "rgw", ...
325 :param service_name: name of logical service ("cephfs", "us-east", ...)
326 :param service_id: service daemon instance (usually a short hostname)
327 :rtype: WriteCompletion
328 """
329 assert action in ["start", "stop", "reload"]
330 assert service_name or service_id
331 assert not (service_name and service_id)
332 raise NotImplementedError()
333
334 def create_osds(self, drive_group, all_hosts):
335 # type: (DriveGroupSpec, List[str]) -> WriteCompletion
336 """
337 Create one or more OSDs within a single Drive Group.
338
339 The principal argument here is the drive_group member
340 of OsdSpec: other fields are advisory/extensible for any
341 finer-grained OSD feature enablement (choice of backing store,
342 compression/encryption, etc).
343
344 :param drive_group: DriveGroupSpec
345 :param all_hosts: TODO, this is required because the orchestrator methods are not composable
346 Probably this parameter can be easily removed because each orchestrator can use
347 the "get_inventory" method and the "drive_group.host_pattern" attribute
348 to obtain the list of hosts where to apply the operation
349 """
350 raise NotImplementedError()
351
352 def replace_osds(self, drive_group):
353 # type: (DriveGroupSpec) -> WriteCompletion
354 """
355 Like create_osds, but the osd_id_claims must be fully
356 populated.
357 """
358 raise NotImplementedError()
359
360 def remove_osds(self, osd_ids):
361 # type: (List[str]) -> WriteCompletion
362 """
363 :param osd_ids: list of OSD IDs
364
365 Note that this can only remove OSDs that were successfully
366 created (i.e. got an OSD ID).
367 """
368 raise NotImplementedError()
369
370 def update_mgrs(self, num, hosts):
371 # type: (int, List[str]) -> WriteCompletion
372 """
373 Update the number of cluster managers.
374
375 :param num: requested number of managers.
376 :param hosts: list of hosts (optional)
377 """
378 raise NotImplementedError()
379
380 def update_mons(self, num, hosts):
381 # type: (int, List[Tuple[str,str]]) -> WriteCompletion
382 """
383 Update the number of cluster monitors.
384
385 :param num: requested number of monitors.
386 :param hosts: list of hosts + network (optional)
387 """
388 raise NotImplementedError()
389
390 def add_stateless_service(self, service_type, spec):
391 # type: (str, StatelessServiceSpec) -> WriteCompletion
392 """
393 Installing and adding a completely new service to the cluster.
394
395 This is not about starting services.
396 """
397 raise NotImplementedError()
398
399 def update_stateless_service(self, service_type, spec):
400 # type: (str, StatelessServiceSpec) -> WriteCompletion
401 """
402 This is about changing / redeploying existing services. Like for
403 example changing the number of service instances.
404
405 :rtype: WriteCompletion
406 """
407 raise NotImplementedError()
408
409 def remove_stateless_service(self, service_type, id_):
410 # type: (str, str) -> WriteCompletion
411 """
412 Uninstalls an existing service from the cluster.
413
414 This is not about stopping services.
415 """
416 raise NotImplementedError()
417
418 def upgrade_start(self, upgrade_spec):
419 # type: (UpgradeSpec) -> WriteCompletion
420 raise NotImplementedError()
421
422 def upgrade_status(self):
423 # type: () -> ReadCompletion[UpgradeStatusSpec]
424 """
425 If an upgrade is currently underway, report on where
426 we are in the process, or if some error has occurred.
427
428 :return: UpgradeStatusSpec instance
429 """
430 raise NotImplementedError()
431
432 def upgrade_available(self):
433 # type: () -> ReadCompletion[List[str]]
434 """
435 Report on what versions are available to upgrade to
436
437 :return: List of strings
438 """
439 raise NotImplementedError()
440
441 def add_stateful_service_rule(self, service_type, stateful_service_spec,
442 placement_spec):
443 """
444 Stateful service rules serve two purposes:
445 - Optionally delegate device selection to the orchestrator
446 - Enable the orchestrator to auto-assimilate new hardware if it
447 matches the placement spec, without any further calls from ceph-mgr.
448
449 To create a confidence-inspiring UI workflow, use test_stateful_service_rule
450 beforehand to show the user where stateful services will be placed
451 if they proceed.
452 """
453 raise NotImplementedError()
454
455 def test_stateful_service_rule(self, service_type, stateful_service_spec,
456 placement_spec):
457 """
458 See add_stateful_service_rule.
459 """
460 raise NotImplementedError()
461
462 def remove_stateful_service_rule(self, service_type, id_):
463 """
464 This will remove the *rule* but not the services that were
465 created as a result. Those should be converted into statically
466 placed services as if they had been created with add_stateful_service,
467 so that they can be removed with remove_stateless_service
468 if desired.
469 """
470 raise NotImplementedError()
471
472
473class UpgradeSpec(object):
474 # Request to orchestrator to initiate an upgrade to a particular
475 # version of Ceph
476 def __init__(self):
477 self.target_version = None
478
479
480class UpgradeStatusSpec(object):
481 # Orchestrator's report on what's going on with any ongoing upgrade
482 def __init__(self):
483 self.in_progress = False # Is an upgrade underway?
484 self.services_complete = [] # Which daemon types are fully updated?
485 self.message = "" # Freeform description
486
487
488class PlacementSpec(object):
489 """
490 For APIs that need to specify a node subset
491 """
492 def __init__(self):
493 self.label = None
494
495
496class ServiceDescription(object):
497 """
498 For responding to queries about the status of a particular service,
499 stateful or stateless.
500
501 This is not about health or performance monitoring of services: it's
502 about letting the orchestrator tell Ceph whether and where a
503 service is scheduled in the cluster. When an orchestrator tells
504 Ceph "it's running on node123", that's not a promise that the process
505 is literally up this second, it's a description of where the orchestrator
506 has decided the service should run.
507 """
508 def __init__(self):
509 # Node is at the same granularity as InventoryNode
510 self.nodename = None
511
512 # Not everyone runs in containers, but enough people do to
513 # justify having this field here.
514 self.container_id = None
515
516 # Some services can be deployed in groups. For example, mds's can
517 # have an active and standby daemons, and nfs-ganesha can run daemons
518 # in parallel. This tag refers to a group of daemons as a whole.
519 #
520 # For instance, a cluster of mds' all service the same fs, and they
521 # will all have the same service value (which may be the
522 # Filesystem name in the FSMap).
523 #
524 # Single-instance services should leave this set to None
525 self.service = None
526
527 # The orchestrator will have picked some names for daemons,
528 # typically either based on hostnames or on pod names.
529 # This is the <foo> in mds.<foo>, the ID that will appear
530 # in the FSMap/ServiceMap.
531 self.service_instance = None
532
533 # The type of service (osd, mon, mgr, etc.)
534 self.service_type = None
535
536 # Service version that was deployed
537 self.version = None
538
539 # Location of the service configuration when stored in rados
540 # object. Format: "rados://<pool>/[<namespace/>]<object>"
541 self.rados_config_location = None
542
543 # If the service exposes REST-like API, this attribute should hold
544 # the URL.
545 self.service_url = None
546
547 # Service status: -1 error, 0 stopped, 1 running
548 self.status = None
549
550 # Service status description when status == -1.
551 self.status_desc = None
552
553 def to_json(self):
554 out = {
555 'nodename': self.nodename,
556 'container_id': self.container_id,
557 'service': self.service,
558 'service_instance': self.service_instance,
559 'service_type': self.service_type,
560 'version': self.version,
561 'rados_config_location': self.rados_config_location,
562 'service_url': self.service_url,
563 'status': self.status,
564 'status_desc': self.status_desc,
565 }
566 return {k: v for (k, v) in out.items() if v is not None}
567
568
569class DeviceSelection(object):
570 """
571 Used within :class:`myclass.DriveGroupSpec` to specify the devices
572 used by the Drive Group.
573
574 Any attributes (even none) can be included in the device
575 specification structure.
576 """
577
578 def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
579 # type: (List[str], str, str, bool, int) -> None
580 """
581 ephemeral drive group device specification
582
583 TODO: translate from the user interface (Drive Groups) to an actual list of devices.
584 """
585 if paths is None:
586 paths = []
587
588 #: List of absolute paths to the devices.
589 self.paths = paths # type: List[str]
590
591 #: A wildcard string. e.g: "SDD*"
592 self.id_model = id_model
593
594 #: Size specification of format LOW:HIGH.
595 #: Can also take the the form :HIGH, LOW:
596 #: or an exact value (as ceph-volume inventory reports)
597 self.size = size
598
599 #: is the drive rotating or not
600 self.rotates = rotates
601
602 #: if this is present limit the number of drives to this number.
603 self.count = count
604 self.validate()
605
606 def validate(self):
607 props = [self.id_model, self.size, self.rotates, self.count]
608 if self.paths and any(p is not None for p in props):
609 raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
610 if not any(p is not None for p in [self.paths] + props):
611 raise DriveGroupValidationError('DeviceSelection cannot be empty')
612
613 @classmethod
614 def from_json(cls, device_spec):
615 return cls(**device_spec)
616
617
618class DriveGroupValidationError(Exception):
619 """
620 Defining an exception here is a bit problematic, cause you cannot properly catch it,
621 if it was raised in a different mgr module.
622 """
623
624 def __init__(self, msg):
625 super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
626
627class DriveGroupSpec(object):
628 """
629 Describe a drive group in the same form that ceph-volume
630 understands.
631 """
632 def __init__(self, host_pattern, data_devices=None, db_devices=None, wal_devices=None, journal_devices=None,
633 data_directories=None, osds_per_device=None, objectstore='bluestore', encrypted=False,
634 db_slots=None, wal_slots=None):
635 # type: (str, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[List[str]], int, str, bool, int, int) -> ()
636
637 # concept of applying a drive group to a (set) of hosts is tightly
638 # linked to the drive group itself
639 #
640 #: An fnmatch pattern to select hosts. Can also be a single host.
641 self.host_pattern = host_pattern
642
643 #: A :class:`orchestrator.DeviceSelection`
644 self.data_devices = data_devices
645
646 #: A :class:`orchestrator.DeviceSelection`
647 self.db_devices = db_devices
648
649 #: A :class:`orchestrator.DeviceSelection`
650 self.wal_devices = wal_devices
651
652 #: A :class:`orchestrator.DeviceSelection`
653 self.journal_devices = journal_devices
654
655 #: Number of osd daemons per "DATA" device.
656 #: To fully utilize nvme devices multiple osds are required.
657 self.osds_per_device = osds_per_device
658
659 #: A list of strings, containing paths which should back OSDs
660 self.data_directories = data_directories
661
662 #: ``filestore`` or ``bluestore``
663 self.objectstore = objectstore
664
665 #: ``true`` or ``false``
666 self.encrypted = encrypted
667
668 #: How many OSDs per DB device
669 self.db_slots = db_slots
670
671 #: How many OSDs per WAL device
672 self.wal_slots = wal_slots
673
674 # FIXME: needs ceph-volume support
675 #: Optional: mapping of drive to OSD ID, used when the
676 #: created OSDs are meant to replace previous OSDs on
677 #: the same node.
678 self.osd_id_claims = {}
679
680 @classmethod
681 def from_json(self, json_drive_group):
682 """
683 Initialize 'Drive group' structure
684
685 :param json_drive_group: A valid json string with a Drive Group
686 specification
687 """
688 args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in
689 json_drive_group.items()}
690 return DriveGroupSpec(**args)
691
692 def hosts(self, all_hosts):
693 return fnmatch.filter(all_hosts, self.host_pattern)
694
695 def validate(self, all_hosts):
696 if not isinstance(self.host_pattern, six.string_types):
697 raise DriveGroupValidationError('host_pattern must be of type string')
698
699 specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
700 for s in filter(None, specs):
701 s.validate()
702 if self.objectstore not in ('filestore', 'bluestore'):
703 raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
704 if not self.hosts(all_hosts):
705 raise DriveGroupValidationError(
706 "host_pattern '{}' does not match any hosts".format(self.host_pattern))
707
708
709class StatelessServiceSpec(object):
710 # Request to orchestrator for a group of stateless services
711 # such as MDS, RGW, nfs gateway, iscsi gateway
712 """
713 Details of stateless service creation.
714
715 This is *not* supposed to contain all the configuration
716 of the services: it's just supposed to be enough information to
717 execute the binaries.
718 """
719
720 def __init__(self):
721 self.placement = PlacementSpec()
722
723 # Give this set of statelss services a name: typically it would
724 # be the name of a CephFS filesystem, RGW zone, etc. Must be unique
725 # within one ceph cluster.
726 self.name = ""
727
728 # Count of service instances
729 self.count = 1
730
731 # Arbitrary JSON-serializable object.
732 # Maybe you're using e.g. kubenetes and you want to pass through
733 # some replicaset special sauce for autoscaling?
734 self.extended = {}
735
736
737class InventoryFilter(object):
738 """
739 When fetching inventory, use this filter to avoid unnecessarily
740 scanning the whole estate.
741
742 Typical use: filter by node when presenting UI workflow for configuring
743 a particular server.
744 filter by label when not all of estate is Ceph servers,
745 and we want to only learn about the Ceph servers.
746 filter by label when we are interested particularly
747 in e.g. OSD servers.
748
749 """
750 def __init__(self, labels=None, nodes=None):
751 # type: (List[str], List[str]) -> None
752 self.labels = labels # Optional: get info about nodes matching labels
753 self.nodes = nodes # Optional: get info about certain named nodes only
754
755
756class InventoryDevice(object):
757 """
758 When fetching inventory, block devices are reported in this format.
759
760 Note on device identifiers: the format of this is up to the orchestrator,
761 but the same identifier must also work when passed into StatefulServiceSpec.
762 The identifier should be something meaningful like a device WWID or
763 stable device node path -- not something made up by the orchestrator.
764
765 "Extended" is for reporting any special configuration that may have
766 already been done out of band on the block device. For example, if
767 the device has already been configured for encryption, report that
768 here so that it can be indicated to the user. The set of
769 extended properties may differ between orchestrators. An orchestrator
770 is permitted to support no extended properties (only normal block
771 devices)
772 """
773 def __init__(self, blank=False, type=None, id=None, size=None,
774 rotates=False, available=False, dev_id=None, extended=None,
775 metadata_space_free=None):
776 # type: (bool, str, str, int, bool, bool, str, dict, bool) -> None
777
778 self.blank = blank
779
780 #: 'ssd', 'hdd', 'nvme'
781 self.type = type
782
783 #: unique within a node (or globally if you like).
784 self.id = id
785
786 #: byte integer.
787 self.size = size
788
789 #: indicates if it is a spinning disk
790 self.rotates = rotates
791
792 #: can be used to create a new OSD?
793 self.available = available
794
795 #: vendor/model
796 self.dev_id = dev_id
797
798 #: arbitrary JSON-serializable object
799 self.extended = extended if extended is not None else extended
800
801 # If this drive is not empty, but is suitable for appending
802 # additional journals, wals, or bluestore dbs, then report
803 # how much space is available.
804 self.metadata_space_free = metadata_space_free
805
806 def to_json(self):
807 return dict(type=self.type, blank=self.blank, id=self.id,
808 size=self.size, rotates=self.rotates,
809 available=self.available, dev_id=self.dev_id,
810 extended=self.extended)
811
812 @classmethod
813 def from_ceph_volume_inventory(cls, data):
814 # TODO: change InventoryDevice itself to mirror c-v inventory closely!
815
816 dev = InventoryDevice()
817 dev.id = data["path"]
818 dev.type = 'hdd' if data["sys_api"]["rotational"] == "1" else 'sdd/nvme'
819 dev.size = data["sys_api"]["size"]
820 dev.rotates = data["sys_api"]["rotational"] == "1"
821 dev.available = data["available"]
822 dev.dev_id = "%s/%s" % (data["sys_api"]["vendor"],
823 data["sys_api"]["model"])
824 dev.extended = data
825 return dev
826
827 def pretty_print(self, only_header=False):
828 """Print a human friendly line with the information of the device
829
830 :param only_header: Print only the name of the device attributes
831
832 Ex::
833
834 Device Path Type Size Rotates Available Model
835 /dev/sdc hdd 50.00 GB True True ATA/QEMU
836
837 """
838 row_format = " {0:<15} {1:>10} {2:>10} {3:>10} {4:>10} {5:<15}\n"
839 if only_header:
840 return row_format.format("Device Path", "Type", "Size", "Rotates",
841 "Available", "Model")
842 else:
843 return row_format.format(str(self.id), self.type if self.type is not None else "",
844 format_bytes(self.size if self.size is not None else 0, 5,
845 colored=False),
846 str(self.rotates), str(self.available),
847 self.dev_id if self.dev_id is not None else "")
848
849
850class InventoryNode(object):
851 """
852 When fetching inventory, all Devices are groups inside of an
853 InventoryNode.
854 """
855 def __init__(self, name, devices):
856 # type: (str, List[InventoryDevice]) -> None
857 assert isinstance(devices, list)
858 self.name = name # unique within cluster. For example a hostname.
859 self.devices = devices
860
861 def to_json(self):
862 return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
863
864
865def _mk_orch_methods(cls):
866 # Needs to be defined outside of for.
867 # Otherwise meth is always bound to last key
868 def shim(method_name):
869 def inner(self, *args, **kwargs):
870 return self._oremote(method_name, args, kwargs)
871 return inner
872
873 for meth in Orchestrator.__dict__:
874 if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
875 setattr(cls, meth, shim(meth))
876 return cls
877
878
879@_mk_orch_methods
880class OrchestratorClientMixin(Orchestrator):
881 """
882 A module that inherents from `OrchestratorClientMixin` can directly call
883 all :class:`Orchestrator` methods without manually calling remote.
884
885 Every interface method from ``Orchestrator`` is converted into a stub method that internally
886 calls :func:`OrchestratorClientMixin._oremote`
887
888 >>> class MyModule(OrchestratorClientMixin):
889 ... def func(self):
890 ... completion = self.add_host('somehost') # calls `_oremote()`
891 ... self._orchestrator_wait([completion])
892 ... self.log.debug(completion.result)
893
894 """
895 def _oremote(self, meth, args, kwargs):
896 """
897 Helper for invoking `remote` on whichever orchestrator is enabled
898
899 :raises RuntimeError: If the remote method failed.
900 :raises NoOrchestrator:
901 :raises ImportError: no `orchestrator_cli` module or backend not found.
902 """
903 try:
904 o = self._select_orchestrator()
905 except AttributeError:
906 o = self.remote('orchestrator_cli', '_select_orchestrator')
907
908 if o is None:
909 raise NoOrchestrator()
910
911 self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
912 return self.remote(o, meth, *args, **kwargs)
913
914 def _orchestrator_wait(self, completions):
915 # type: (List[_Completion]) -> None
916 """
917 Wait for completions to complete (reads) or
918 become persistent (writes).
919
920 Waits for writes to be *persistent* but not *effective*.
921
922 :param completions: List of Completions
923 :raises NoOrchestrator:
924 :raises ImportError: no `orchestrator_cli` module or backend not found.
925 """
926 while not self.wait(completions):
927 if any(c.should_wait for c in completions):
928 time.sleep(5)
929 else:
930 break