from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec, TunedProfileSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime
"""
raise NotImplementedError()
+ def rescan_host(self, hostname: str) -> OrchResult:
+ """Use cephadm to issue a disk rescan on each HBA
+
+ Some HBAs and external enclosures don't automatically register
+ device insertion with the kernel, so for these scenarios we need
+ to manually rescan
+
+ :param hostname: (str) host name
+ """
+ raise NotImplementedError()
+
def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.
"""Update an existing snmp gateway service"""
raise NotImplementedError()
+ def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool) -> OrchResult[str]:
+ """Add or update an existing tuned profile"""
+ raise NotImplementedError()
+
+ def rm_tuned_profile(self, profile_name: str) -> OrchResult[str]:
+ """Remove a tuned profile"""
+ raise NotImplementedError()
+
+ def tuned_profile_ls(self) -> OrchResult[List[TunedProfileSpec]]:
+ """See current tuned profiles"""
+ raise NotImplementedError()
+
+ def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> OrchResult[str]:
+ """Change/Add a specific setting for a tuned profile"""
+ raise NotImplementedError()
+
+ def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> OrchResult[str]:
+ """Remove a specific setting for a tuned profile"""
+ raise NotImplementedError()
+
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
self.which: str = '<unknown>' # for if user specified daemon types, services or hosts
self.progress: Optional[str] = None # How many of the daemons have we upgraded
self.message = "" # Freeform description
+ self.is_paused: bool = False # Is the upgrade paused?
def handle_type_error(method: FuncT) -> FuncT: