3 ceph-mgr orchestrator interface
5 Please see the ceph-mgr module developer's guide for more information.
15 from mgr_module
import MgrModule
, PersistentStoreDict
16 from mgr_util
import format_bytes
19 from typing
import TypeVar
, Generic
, List
, Optional
, Union
, Tuple
, Iterator
class OrchestratorError(Exception):
    """
    General orchestrator specific error.

    Used for deployment, configuration or user errors.

    It's not intended for programming errors or orchestrator internal errors.
    """
class NoOrchestrator(OrchestratorError):
    """
    No orchestrator is configured.
    """
    def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
        super(NoOrchestrator, self).__init__(msg)
class OrchestratorValidationError(OrchestratorError):
    """
    Raised when an orchestrator doesn't support a specific feature.
    """
56 Return the result of the operation that we were waited
57 for. Only valid after calling Orchestrator.wait() on this
60 raise NotImplementedError()
64 # type: () -> Optional[Exception]
66 Holds an exception object.
69 return self
.__exception
70 except AttributeError:
74 def exception(self
, value
):
75 self
.__exception
= value
80 raise NotImplementedError()
83 def is_complete(self
):
85 raise NotImplementedError()
91 Has the completion failed. Default implementation looks for
92 self.exception. Can be overwritten.
94 return self
.exception
is not None
97 def should_wait(self
):
99 raise NotImplementedError()
def raise_if_exception(c):
    # type: (_Completion) -> None
    """
    Re-raise the exception stored on a completion, if there is one.

    :raises OrchestratorError: Some user error or a config error.
    :raises Exception: Some internal error
    """
    def copy_to_this_subinterpreter(r_obj):
        # This is something like `return pickle.loads(pickle.dumps(r_obj))`
        # Without importing anything.
        r_cls = r_obj.__class__
        # FIX: recognize the builtins module under both its Python 2 name
        # ('__builtin__') and its Python 3 name ('builtins'); previously only
        # the Python 2 spelling was short-circuited.
        if r_cls.__module__ in ('__builtin__', 'builtins'):
            return r_obj
        my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__)
        if id(my_cls) == id(r_cls):
            # Same class object: the value already belongs to this
            # subinterpreter, no copy needed.
            return r_obj
        my_obj = my_cls.__new__(my_cls)
        for k, v in r_obj.__dict__.items():
            setattr(my_obj, k, copy_to_this_subinterpreter(v))
        return my_obj

    if c.exception is not None:
        raise copy_to_this_subinterpreter(c.exception)
126 class ReadCompletion(_Completion
):
128 ``Orchestrator`` implementations should inherit from this
129 class to implement their own handles to operations in progress, and
130 return an instance of their subclass from calls into methods.
141 def should_wait(self
):
142 """Could the external operation be deemed as complete,
144 We must wait for a read operation only if it is not complete.
146 return not self
.is_complete
149 class TrivialReadCompletion(ReadCompletion
):
151 This is the trivial completion simply wrapping a result.
153 def __init__(self
, result
):
154 super(TrivialReadCompletion
, self
).__init
__()
155 self
._result
= result
162 def is_complete(self
):
166 class WriteCompletion(_Completion
):
168 ``Orchestrator`` implementations should inherit from this
169 class to implement their own handles to operations in progress, and
170 return an instance of their subclass from calls into methods.
174 self
.progress_id
= str(uuid
.uuid4())
176 #: if a orchestrator module can provide a more detailed
177 #: progress information, it needs to also call ``progress.update()``.
182 ``__str__()`` is used for determining the message for progress events.
184 return super(WriteCompletion
, self
).__str
__()
187 def is_persistent(self
):
190 Has the operation updated the orchestrator's configuration
191 persistently? Typically this would indicate that an update
192 had been written to a manifest, but that the update
193 had not necessarily been pushed out to the cluster.
195 raise NotImplementedError()
198 def is_effective(self
):
200 Has the operation taken effect on the cluster? For example,
201 if we were adding a service, has it come up and appeared
202 in Ceph's cluster maps?
204 raise NotImplementedError()
207 def is_complete(self
):
208 return self
.is_errored
or (self
.is_persistent
and self
.is_effective
)
215 def should_wait(self
):
216 """Could the external operation be deemed as complete,
218 We must wait for a write operation only if we know
219 it is not persistent yet.
221 return not self
.is_persistent
224 class Orchestrator(object):
226 Calls in this class may do long running remote operations, with time
227 periods ranging from network latencies to package install latencies and large
228 internet downloads. For that reason, all are asynchronous, and return
229 ``Completion`` objects.
231 Implementations are not required to start work on an operation until
232 the caller waits on the relevant Completion objects. Callers making
233 multiple updates should not wait on Completions until they're done
234 sending operations: this enables implementations to batch up a series
235 of updates when wait() is called on a set of Completion objects.
237 Implementations are encouraged to keep reasonably fresh caches of
238 the status of the system: it is better to serve a stale-but-recent
239 result read of e.g. device inventory than it is to keep the caller waiting
240 while you scan hosts every time.
243 def is_orchestrator_module(self
):
245 Enable other modules to interrogate this module to discover
246 whether it's usable as an orchestrator module.
248 Subclasses do not need to override this.
253 # type: () -> Tuple[bool, str]
255 Report whether we can talk to the orchestrator. This is the
256 place to give the user a meaningful message if the orchestrator
257 isn't running or can't be contacted.
259 This method may be called frequently (e.g. every page load
260 to conditionally display a warning banner), so make sure it's
261 not too expensive. It's okay to give a slightly stale status
262 (e.g. based on a periodic background ping of the orchestrator)
263 if that's necessary to make this method fast.
265 ..note:: `True` doesn't mean that the desired functionality
266 is actually available in the orchestrator. I.e. this
267 won't work as expected::
269 >>> if OrchestratorClientMixin().available()[0]: # wrong.
270 ... OrchestratorClientMixin().get_hosts()
272 :return: two-tuple of boolean, string
274 raise NotImplementedError()
276 def wait(self
, completions
):
278 Given a list of Completion instances, progress any which are
279 incomplete. Return a true if everything is done.
281 Callers should inspect the detail of each completion to identify
282 partial completion/progress information, and present that information
285 For fast operations (e.g. reading from a database), implementations
286 may choose to do blocking IO in this call.
290 raise NotImplementedError()
292 def add_host(self
, host
):
293 # type: (str) -> WriteCompletion
295 Add a host to the orchestrator inventory.
297 :param host: hostname
299 raise NotImplementedError()
301 def remove_host(self
, host
):
302 # type: (str) -> WriteCompletion
304 Remove a host from the orchestrator inventory.
306 :param host: hostname
308 raise NotImplementedError()
311 # type: () -> ReadCompletion[List[InventoryNode]]
313 Report the hosts in the cluster.
315 The default implementation is extra slow.
317 :return: list of InventoryNodes
319 return self
.get_inventory()
321 def get_inventory(self
, node_filter
=None, refresh
=False):
322 # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]]
324 Returns something that was created by `ceph-volume inventory`.
326 :return: list of InventoryNode
328 raise NotImplementedError()
330 def describe_service(self
, service_type
=None, service_id
=None, node_name
=None, refresh
=False):
331 # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]]
333 Describe a service (of any kind) that is already configured in
334 the orchestrator. For example, when viewing an OSD in the dashboard
335 we might like to also display information about the orchestrator's
336 view of the service (like the kubernetes pod ID).
338 When viewing a CephFS filesystem in the dashboard, we would use this
339 to display the pods being currently run for MDS daemons.
341 :return: list of ServiceDescription objects.
343 raise NotImplementedError()
345 def service_action(self
, action
, service_type
, service_name
=None, service_id
=None):
346 # type: (str, str, str, str) -> WriteCompletion
348 Perform an action (start/stop/reload) on a service.
350 Either service_name or service_id must be specified:
352 * If using service_name, perform the action on that entire logical
353 service (i.e. all daemons providing that named service).
354 * If using service_id, perform the action on a single specific daemon
357 :param action: one of "start", "stop", "reload"
358 :param service_type: e.g. "mds", "rgw", ...
359 :param service_name: name of logical service ("cephfs", "us-east", ...)
360 :param service_id: service daemon instance (usually a short hostname)
361 :rtype: WriteCompletion
363 assert action
in ["start", "stop", "reload"]
364 assert service_name
or service_id
365 assert not (service_name
and service_id
)
366 raise NotImplementedError()
368 def create_osds(self
, drive_group
, all_hosts
):
369 # type: (DriveGroupSpec, List[str]) -> WriteCompletion
371 Create one or more OSDs within a single Drive Group.
373 The principal argument here is the drive_group member
374 of OsdSpec: other fields are advisory/extensible for any
375 finer-grained OSD feature enablement (choice of backing store,
376 compression/encryption, etc).
378 :param drive_group: DriveGroupSpec
379 :param all_hosts: TODO, this is required because the orchestrator methods are not composable
380 Probably this parameter can be easily removed because each orchestrator can use
381 the "get_inventory" method and the "drive_group.host_pattern" attribute
382 to obtain the list of hosts where to apply the operation
384 raise NotImplementedError()
386 def replace_osds(self
, drive_group
):
387 # type: (DriveGroupSpec) -> WriteCompletion
389 Like create_osds, but the osd_id_claims must be fully
392 raise NotImplementedError()
394 def remove_osds(self
, osd_ids
):
395 # type: (List[str]) -> WriteCompletion
397 :param osd_ids: list of OSD IDs
399 Note that this can only remove OSDs that were successfully
400 created (i.e. got an OSD ID).
402 raise NotImplementedError()
404 def update_mgrs(self
, num
, hosts
):
405 # type: (int, List[str]) -> WriteCompletion
407 Update the number of cluster managers.
409 :param num: requested number of managers.
410 :param hosts: list of hosts (optional)
412 raise NotImplementedError()
414 def update_mons(self
, num
, hosts
):
415 # type: (int, List[Tuple[str,str]]) -> WriteCompletion
417 Update the number of cluster monitors.
419 :param num: requested number of monitors.
420 :param hosts: list of hosts + network (optional)
422 raise NotImplementedError()
424 def add_stateless_service(self
, service_type
, spec
):
425 # type: (str, StatelessServiceSpec) -> WriteCompletion
427 Installing and adding a completely new service to the cluster.
429 This is not about starting services.
431 raise NotImplementedError()
433 def update_stateless_service(self
, service_type
, spec
):
434 # type: (str, StatelessServiceSpec) -> WriteCompletion
436 This is about changing / redeploying existing services. Like for
437 example changing the number of service instances.
439 :rtype: WriteCompletion
441 raise NotImplementedError()
443 def remove_stateless_service(self
, service_type
, id_
):
444 # type: (str, str) -> WriteCompletion
446 Uninstalls an existing service from the cluster.
448 This is not about stopping services.
450 raise NotImplementedError()
452 def upgrade_start(self
, upgrade_spec
):
453 # type: (UpgradeSpec) -> WriteCompletion
454 raise NotImplementedError()
456 def upgrade_status(self
):
457 # type: () -> ReadCompletion[UpgradeStatusSpec]
459 If an upgrade is currently underway, report on where
460 we are in the process, or if some error has occurred.
462 :return: UpgradeStatusSpec instance
464 raise NotImplementedError()
466 def upgrade_available(self
):
467 # type: () -> ReadCompletion[List[str]]
469 Report on what versions are available to upgrade to
471 :return: List of strings
473 raise NotImplementedError()
class UpgradeSpec(object):
    # Request to orchestrator to initiate an upgrade to a particular
    # version of Ceph as a whole.
    def __init__(self):
        self.target_version = None
class UpgradeStatusSpec(object):
    # Orchestrator's report on what's going on with any ongoing upgrade
    def __init__(self):
        self.in_progress = False  # Is an upgrade underway?
        self.services_complete = []  # Which daemon types are fully updated?
        self.message = ""  # Freeform description
491 class PlacementSpec(object):
493 For APIs that need to specify a node subset
class ServiceDescription(object):
    """
    For responding to queries about the status of a particular service,
    stateful or stateless.

    This is not about health or performance monitoring of services: it's
    about letting the orchestrator tell Ceph whether and where a
    service is scheduled in the cluster.  When an orchestrator tells
    Ceph "it's running on node123", that's not a promise that the process
    is literally up this second, it's a description of where the orchestrator
    has decided the service should run.
    """

    def __init__(self, nodename=None, container_id=None, service=None, service_instance=None,
                 service_type=None, version=None, rados_config_location=None,
                 service_url=None, status=None, status_desc=None):
        # Node is at the same granularity as InventoryNode
        self.nodename = nodename

        # Not everyone runs in containers, but enough people do to
        # justify having this field here.
        self.container_id = container_id

        # Some services can be deployed in groups.  For example, mds's can
        # have an active and standby daemons, and nfs-ganesha can run daemons
        # in parallel.  This tag refers to a group of daemons as a whole.
        #
        # For instance, a cluster of mds' all service the same fs, and they
        # will all have the same service value (which may be the
        # Filesystem name in the FSMap).
        #
        # Single-instance services should leave this set to None
        self.service = service

        # The orchestrator will have picked some names for daemons,
        # typically either based on hostnames or on pod names.
        # This is the <foo> in mds.<foo>, the ID that will appear
        # in the FSMap/ServiceMap.
        self.service_instance = service_instance

        # The type of service (osd, mon, mgr, etc.)
        self.service_type = service_type

        # Service version that was deployed
        self.version = version

        # Location of the service configuration when stored in rados
        # object. Format: "rados://<pool>/[<namespace/>]<object>"
        self.rados_config_location = rados_config_location

        # If the service exposes REST-like API, this attribute should hold
        # the URL.
        self.service_url = service_url

        # Service status: -1 error, 0 stopped, 1 running
        self.status = status

        # Service status description when status == -1.
        self.status_desc = status_desc

    def to_json(self):
        # Serialize only the attributes that are actually known, so
        # consumers never see explicit nulls.
        fields = ('nodename', 'container_id', 'service', 'service_instance',
                  'service_type', 'version', 'rados_config_location',
                  'service_url', 'status', 'status_desc')
        return {name: getattr(self, name) for name in fields
                if getattr(self, name) is not None}

    @classmethod
    def from_json(cls, data):
        return cls(**data)
class DeviceSelection(object):
    """
    Used within :class:`myclass.DriveGroupSpec` to specify the devices
    used by the Drive Group.

    Any attributes (even none) can be included in the device
    specification structure.
    """

    def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None):
        # type: (List[str], str, str, bool, int) -> None
        """
        ephemeral drive group device specification

        TODO: translate from the user interface (Drive Groups) to an actual list of devices.
        """
        #: List of absolute paths to the devices.
        self.paths = paths  # type: List[str]

        #: A wildcard string. e.g: "SDD*"
        self.id_model = id_model

        #: Size specification of format LOW:HIGH.
        #: Can also take the form :HIGH, LOW:
        #: or an exact value (as ceph-volume inventory reports)
        self.size = size

        #: is the drive rotating or not
        self.rotates = rotates

        #: if this is present limit the number of drives to this number.
        self.count = count  # type: int

        self.validate()

    def validate(self):
        # `paths` is an explicit device list; it cannot be combined with
        # any of the matcher attributes, and at least one selector must be
        # given.
        props = [self.id_model, self.size, self.rotates, self.count]
        if self.paths and any(p is not None for p in props):
            raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
        if not any(p is not None for p in [self.paths] + props):
            raise DriveGroupValidationError('DeviceSelection cannot be empty')

    @classmethod
    def from_json(cls, device_spec):
        return cls(**device_spec)
class DriveGroupValidationError(Exception):
    """
    Defining an exception here is a bit problematic, cause you cannot properly catch it,
    if it was raised in a different mgr module.
    """

    def __init__(self, msg):
        prefixed = 'Failed to validate Drive Group: ' + msg
        super(DriveGroupValidationError, self).__init__(prefixed)
class DriveGroupSpec(object):
    """
    Describe a drive group in the same form that ceph-volume
    understands.
    """
    def __init__(self, host_pattern, data_devices=None, db_devices=None, wal_devices=None, journal_devices=None,
                 data_directories=None, osds_per_device=None, objectstore='bluestore', encrypted=False,
                 db_slots=None, wal_slots=None):
        # type: (str, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[List[str]], int, str, bool, int, int) -> None

        # The concept of applying a drive group to a (set of) hosts is
        # tightly linked to the drive group itself.
        #
        #: An fnmatch pattern to select hosts. Can also be a single host.
        self.host_pattern = host_pattern

        #: A :class:`orchestrator.DeviceSelection`
        self.data_devices = data_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.db_devices = db_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.wal_devices = wal_devices

        #: A :class:`orchestrator.DeviceSelection`
        self.journal_devices = journal_devices

        #: Number of osd daemons per "DATA" device.
        #: To fully utilize nvme devices multiple osds are required.
        self.osds_per_device = osds_per_device

        #: A list of strings, containing paths which should back OSDs
        self.data_directories = data_directories

        #: ``filestore`` or ``bluestore``
        self.objectstore = objectstore

        #: ``true`` or ``false``
        self.encrypted = encrypted

        #: How many OSDs per DB device
        self.db_slots = db_slots

        #: How many OSDs per WAL device
        self.wal_slots = wal_slots

        # FIXME: needs ceph-volume support
        #: Optional: mapping of drive to OSD ID, used when the
        #: created OSDs are meant to replace previous OSDs on
        #: the same node.
        self.osd_id_claims = {}

    def from_json(self, json_drive_group):
        """
        Initialize 'Drive group' structure

        :param json_drive_group: A valid json string with a Drive Group
               specification
        """
        args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v)
                for k, v in json_drive_group.items()}
        return DriveGroupSpec(**args)

    def hosts(self, all_hosts):
        # Resolve the fnmatch pattern against the supplied host list.
        return fnmatch.filter(all_hosts, self.host_pattern)

    def validate(self, all_hosts):
        if not isinstance(self.host_pattern, six.string_types):
            raise DriveGroupValidationError('host_pattern must be of type string')

        specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
        for s in filter(None, specs):
            # NOTE(review): delegate per-selection validation — confirm
            # DeviceSelection.validate is the intended callee here.
            s.validate()

        if self.objectstore not in ('filestore', 'bluestore'):
            raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
        if not self.hosts(all_hosts):
            raise DriveGroupValidationError(
                "host_pattern '{}' does not match any hosts".format(self.host_pattern))
719 class StatelessServiceSpec(object):
720 # Request to orchestrator for a group of stateless services
721 # such as MDS, RGW, nfs gateway, iscsi gateway
723 Details of stateless service creation.
725 This is *not* supposed to contain all the configuration
726 of the services: it's just supposed to be enough information to
727 execute the binaries.
731 self
.placement
= PlacementSpec()
733 # Give this set of statelss services a name: typically it would
734 # be the name of a CephFS filesystem, RGW zone, etc. Must be unique
735 # within one ceph cluster.
738 # Count of service instances
741 # Arbitrary JSON-serializable object.
742 # Maybe you're using e.g. kubenetes and you want to pass through
743 # some replicaset special sauce for autoscaling?
class InventoryFilter(object):
    """
    When fetching inventory, use this filter to avoid unnecessarily
    scanning the whole estate.

    Typical use: filter by node when presenting UI workflow for configuring
                 a particular server.
                 filter by label when not all of estate is Ceph servers,
                 and we want to only learn about the Ceph servers.
                 filter by label when we are interested particularly
                 in e.g. OSD servers.
    """
    def __init__(self, labels=None, nodes=None):
        # type: (List[str], List[str]) -> None
        self.labels = labels  # Optional: get info about nodes matching labels
        self.nodes = nodes    # Optional: get info about certain named nodes only
class InventoryDevice(object):
    """
    When fetching inventory, block devices are reported in this format.

    Note on device identifiers: the format of this is up to the orchestrator,
    but the same identifier must also work when passed into StatefulServiceSpec.
    The identifier should be something meaningful like a device WWID or
    stable device node path -- not something made up by the orchestrator.

    "Extended" is for reporting any special configuration that may have
    already been done out of band on the block device.  For example, if
    the device has already been configured for encryption, report that
    here so that it can be indicated to the user.  The set of
    extended properties may differ between orchestrators.  An orchestrator
    is permitted to support no extended properties (only normal block
    devices).
    """
    def __init__(self, blank=False, type=None, id=None, size=None,
                 rotates=False, available=False, dev_id=None, extended=None,
                 metadata_space_free=None):
        # type: (bool, str, str, int, bool, bool, str, dict, bool) -> None

        #: is the device unused/empty
        self.blank = blank

        #: 'ssd', 'hdd', 'nvme'
        self.type = type

        #: unique within a node (or globally if you like).
        self.id = id

        #: size in bytes
        self.size = size

        #: indicates if it is a spinning disk
        self.rotates = rotates

        #: can be used to create a new OSD?
        self.available = available

        #: vendor/model identifier
        self.dev_id = dev_id

        #: arbitrary JSON-serializable object
        # FIX: was `extended if extended is not None else extended`, a
        # degenerate conditional that always assigned the argument (leaving
        # the attribute None for the default).  Default to an empty dict so
        # consumers can always treat it as a mapping.  The mutable default
        # stays out of the signature to avoid shared state between instances.
        self.extended = extended if extended is not None else {}

        # If this drive is not empty, but is suitable for appending
        # additional journals, wals, or bluestore dbs, then report
        # how much space is available.
        self.metadata_space_free = metadata_space_free

    def to_json(self):
        # NOTE(review): metadata_space_free is deliberately not serialized
        # here, mirroring the original output schema — confirm consumers.
        return dict(type=self.type, blank=self.blank, id=self.id,
                    size=self.size, rotates=self.rotates,
                    available=self.available, dev_id=self.dev_id,
                    extended=self.extended)

    @classmethod
    def from_ceph_volume_inventory(cls, data):
        # TODO: change InventoryDevice itself to mirror c-v inventory closely!
        dev = InventoryDevice()
        dev.id = data["path"]
        dev.type = 'hdd' if data["sys_api"]["rotational"] == "1" else 'sdd/nvme'
        dev.size = data["sys_api"]["size"]
        dev.rotates = data["sys_api"]["rotational"] == "1"
        dev.available = data["available"]
        dev.dev_id = "%s/%s" % (data["sys_api"]["vendor"],
                                data["sys_api"]["model"])
        return dev

    @classmethod
    def from_ceph_volume_inventory_list(cls, datas):
        return [cls.from_ceph_volume_inventory(d) for d in datas]

    def pretty_print(self, only_header=False):
        """Print a human friendly line with the information of the device

        :param only_header: Print only the name of the device attributes

        Ex::

            Device Path           Type       Size    Rotates  Available Model
            /dev/sdc            hdd   50.00 GB       True       True ATA/QEMU
        """
        row_format = "  {0:<15} {1:>10} {2:>10} {3:>10} {4:>10} {5:<15}\n"
        if only_header:
            return row_format.format("Device Path", "Type", "Size", "Rotates",
                                     "Available", "Model")
        # NOTE(review): format_bytes comes from mgr_util; confirm whether the
        # original call passed additional keyword arguments (e.g. colored).
        return row_format.format(str(self.id), self.type if self.type is not None else "",
                                 format_bytes(self.size if self.size is not None else 0, 5),
                                 str(self.rotates), str(self.available),
                                 self.dev_id if self.dev_id is not None else "")
class InventoryNode(object):
    """
    When fetching inventory, all Devices are groups inside of an
    InventoryNode.
    """
    def __init__(self, name, devices):
        # type: (str, List[InventoryDevice]) -> None
        assert isinstance(devices, list)
        self.name = name  # unique within cluster.  For example a hostname.
        self.devices = devices

    def to_json(self):
        return {'name': self.name,
                'devices': [d.to_json() for d in self.devices]}

    @classmethod
    def from_nested_items(cls, hosts):
        devs = InventoryDevice.from_ceph_volume_inventory_list
        return [cls(item[0], devs(item[1].data)) for item in hosts]
def _mk_orch_methods(cls):
    # Needs to be defined outside of for.
    # Otherwise meth is always bound to last key
    def shim(method_name):
        def inner(self, *args, **kwargs):
            completion = self._oremote(method_name, args, kwargs)
            self._update_completion_progress(completion, 0)
            return completion
        return inner

    for meth in Orchestrator.__dict__:
        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
            setattr(cls, meth, shim(meth))
    return cls
901 class OrchestratorClientMixin(Orchestrator
):
903 A module that inherents from `OrchestratorClientMixin` can directly call
904 all :class:`Orchestrator` methods without manually calling remote.
906 Every interface method from ``Orchestrator`` is converted into a stub method that internally
907 calls :func:`OrchestratorClientMixin._oremote`
909 >>> class MyModule(OrchestratorClientMixin):
911 ... completion = self.add_host('somehost') # calls `_oremote()`
912 ... self._orchestrator_wait([completion])
913 ... self.log.debug(completion.result)
917 def set_mgr(self
, mgr
):
918 # type: (MgrModule) -> None
920 Useable in the Dashbord that uses a global ``mgr``
923 self
.__mgr
= mgr
# Make sure we're not overwriting any other `mgr` properties
925 def _oremote(self
, meth
, args
, kwargs
):
927 Helper for invoking `remote` on whichever orchestrator is enabled
929 :raises RuntimeError: If the remote method failed.
930 :raises OrchestratorError: orchestrator failed to perform
931 :raises ImportError: no `orchestrator_cli` module or backend not found.
935 except AttributeError:
938 o
= mgr
._select
_orchestrator
()
939 except AttributeError:
940 o
= mgr
.remote('orchestrator_cli', '_select_orchestrator')
943 raise NoOrchestrator()
945 mgr
.log
.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr
.module_name
, o
, meth
, args
, kwargs
))
946 return mgr
.remote(o
, meth
, *args
, **kwargs
)
948 def _update_completion_progress(self
, completion
, force_progress
=None):
949 # type: (WriteCompletion, Optional[float]) -> None
951 progress
= force_progress
if force_progress
is not None else completion
.progress
952 if completion
.is_complete
:
953 self
.remote("progress", "complete", completion
.progress_id
)
955 self
.remote("progress", "update", completion
.progress_id
, str(completion
), progress
,
957 except AttributeError:
958 # No WriteCompletion. Ignore.
961 # If the progress module is disabled that's fine,
962 # they just won't see the output.
965 def _orchestrator_wait(self
, completions
):
966 # type: (List[_Completion]) -> None
968 Wait for completions to complete (reads) or
969 become persistent (writes).
971 Waits for writes to be *persistent* but not *effective*.
973 :param completions: List of Completions
974 :raises NoOrchestrator:
975 :raises ImportError: no `orchestrator_cli` module or backend not found.
977 for c
in completions
:
978 self
._update
_completion
_progress
(c
)
979 while not self
.wait(completions
):
980 if any(c
.should_wait
for c
in completions
):
984 for c
in completions
:
985 self
._update
_completion
_progress
(c
)
988 class OutdatableData(object):
989 DATEFMT
= '%Y-%m-%d %H:%M:%S.%f'
991 def __init__(self
, data
=None, last_refresh
=None):
992 # type: (Optional[dict], Optional[datetime.datetime]) -> None
994 if data
is not None and last_refresh
is None:
995 self
.last_refresh
= datetime
.datetime
.utcnow()
997 self
.last_refresh
= last_refresh
1000 if self
.last_refresh
is not None:
1001 timestr
= self
.last_refresh
.strftime(self
.DATEFMT
)
1007 "last_refresh": timestr
,
1015 # No setter, as it doesn't work as expected: It's not saved in store automatically
1018 def time_from_string(cls
, timestr
):
1021 # drop the 'Z' timezone indication, it's always UTC
1022 timestr
= timestr
.rstrip('Z')
1023 return datetime
.datetime
.strptime(timestr
, cls
.DATEFMT
)
1027 def from_json(cls
, data
):
1028 return cls(data
['data'], cls
.time_from_string(data
['last_refresh']))
1030 def outdated(self
, timeout_min
=None):
1031 if timeout_min
is None:
1033 if self
.last_refresh
is None:
1035 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
1036 minutes
=timeout_min
)
1037 return self
.last_refresh
< cutoff
1040 return 'OutdatableData(data={}, last_refresh={})'.format(self
._data
, self
.last_refresh
)
1043 class OutdatableDictMixin(object):
1045 Toolbox for implementing a cache. As every orchestrator has
1046 different needs, we cannot implement any logic here.
1049 def __getitem__(self
, item
):
1050 # type: (str) -> OutdatableData
1051 return OutdatableData
.from_json(super(OutdatableDictMixin
, self
).__getitem
__(item
))
1053 def __setitem__(self
, key
, value
):
1054 # type: (str, OutdatableData) -> None
1055 val
= None if value
is None else value
.json()
1056 super(OutdatableDictMixin
, self
).__setitem
__(key
, val
)
1059 # type: () -> Iterator[Tuple[str, OutdatableData]]
1060 for item
in super(OutdatableDictMixin
, self
).items():
1062 yield k
, OutdatableData
.from_json(v
)
1064 def items_filtered(self
, keys
=None):
1066 return [(host
, self
[host
]) for host
in keys
]
1068 return list(self
.items())
1070 def any_outdated(self
, timeout
=None):
1071 items
= self
.items()
1074 return any([i
[1].outdated(timeout
) for i
in items
])
1076 def remove_outdated(self
):
1077 outdated
= [item
[0] for item
in self
.items() if item
[1].outdated()]
class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict):
    # Outdatable cache backed by the mgr's PersistentStoreDict.
    pass
class OutdatableDict(OutdatableDictMixin, dict):
    # In-memory (non-persistent) variant of the outdatable cache.
    pass