from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
ServiceSpec, PlacementSpec, \
- HostPlacementSpec, IngressSpec
+ HostPlacementSpec, IngressSpec, \
+ TunedProfileSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
- ClientKeyringStore, ClientKeyringSpec
+ ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
cephadmNoImage, CEPH_UPGRADE_ORDER
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
+from .tuned_profiles import TunedProfileUtils
try:
import asyncssh
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
-DEFAULT_HAPROXY_IMAGE = 'docker.io/library/haproxy:2.3'
-DEFAULT_KEEPALIVED_IMAGE = 'docker.io/arcts/keepalived'
+DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
+DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.1.5'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
# ------------------------------------------------------------------------------
self.keys = ClientKeyringStore(self)
self.keys.load()
+ self.tuned_profiles = TunedProfileStore(self)
+ self.tuned_profiles.load()
+
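+ # helper used by the serve loop to sync the stored tuned profiles out to hosts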
+ self.tuned_profile_utils = TunedProfileUtils(self)
+
# ensure the host lists are in sync
for h in self.inventory.keys():
if h not in self.cache.daemons:
return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
+ @handle_orch_error
+ @host_exists()
+ def rescan_host(self, hostname: str) -> str:
+ """Use cephadm to issue a disk rescan on each HBA
+
+ Some HBAs and external enclosures don't automatically register
+ device insertion with the kernel, so in those cases we need to
+ trigger a rescan manually.
+
+ :param hostname: (str) host name
+ """
+ self.log.info(f'disk rescan request sent to host "{hostname}"')
+ _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
+ [],
+ no_fsid=True,
+ error_ok=True))
+ # cephadm reports the rescan status on stderr (parsed below), so an
+ # empty stderr means the call did not behave as expected
+ if not _err:
+ raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
+
+ msg = _err[0].split('\n')[-1]
+ log_msg = f'disk rescan: {msg}'
+ if msg.upper().startswith('OK'):
+ self.log.info(log_msg)
+ else:
+ self.log.warning(log_msg)
+
+ return msg
+
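+ # Illustrative usage sketch (an assumption, not part of this change): the
+ # rescan is expected to be driven through the orchestrator CLI, e.g.
+ #
+ #   ceph orch host rescan my-host
+ #
+ # which ends up calling rescan_host('my-host') above and returns the one-line
+ # status produced by cephadm (a string starting with "OK" on success).
+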
def get_minimal_ceph_conf(self) -> str:
_, config, _ = self.check_mon_command({
"prefix": "config generate-minimal-conf",
@handle_orch_error
def service_action(self, action: str, service_name: str) -> List[str]:
+ if service_name not in self.spec_store.all_specs.keys():
+ raise OrchestratorError(f'Invalid service name "{service_name}".'
+ + ' View currently running services using "ceph orch ls"')
dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
if not dds:
raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
if daemon_spec.daemon_type != 'osd':
daemon_spec = self.cephadm_services[daemon_type_to_service(
daemon_spec.daemon_type)].prepare_create(daemon_spec)
+ else:
+ # for OSDs, we still need to update config, just not carry out the full
+ # prepare_create function
+ daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(daemon_spec)
return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
actions = {
else:
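+ # daemon types whose running daemons this daemon's config depends on; a
+ # change in any of them triggers a reconfig (grafana now also tracks loki)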
need = {
'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
- 'grafana': ['prometheus'],
+ 'grafana': ['prometheus', 'loki'],
'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
+ 'promtail': ['loki'],
}
for dep_type in need.get(daemon_type, []):
for dd in self.cache.get_daemons_by_type(dep_type):
return self._apply_service_spec(cast(ServiceSpec, spec))
+ @handle_orch_error
+ def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
+ outs = []
+ for spec in specs:
+ if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
+ outs.append(f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
+ else:
+ self.tuned_profiles.add_profile(spec)
+ outs.append(f'Saved tuned profile {spec.profile_name}')
+ self._kick_serve_loop()
+ return '\n'.join(outs)
+
+ @handle_orch_error
+ def rm_tuned_profile(self, profile_name: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Nothing to remove.')
+ self.tuned_profiles.rm_profile(profile_name)
+ self._kick_serve_loop()
+ return f'Removed tuned profile {profile_name}'
+
+ @handle_orch_error
+ def tuned_profile_ls(self) -> List[TunedProfileSpec]:
+ return self.tuned_profiles.list_profiles()
+
+ @handle_orch_error
+ def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Cannot add setting.')
+ self.tuned_profiles.add_setting(profile_name, setting, value)
+ self._kick_serve_loop()
+ return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
+
+ @handle_orch_error
+ def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+ f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
+ self.tuned_profiles.rm_setting(profile_name, setting)
+ self._kick_serve_loop()
+ return f'Removed setting {setting} from tuned profile {profile_name}'
+
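+ # Illustrative sketch (assumptions for illustration, not part of this change):
+ # a TunedProfileSpec pairs a set of sysctl settings with a placement; the
+ # methods above store and edit it, and the serve loop then pushes the settings
+ # out to the matching hosts. Roughly:
+ #
+ #   spec = TunedProfileSpec(
+ #       profile_name='low-swappiness',             # hypothetical name/values
+ #       placement=PlacementSpec(host_pattern='*'),
+ #       settings={'vm.swappiness': '10'},
+ #   )
+ #   mgr.apply_tuned_profiles([spec], no_overwrite=False)
+ #   mgr.tuned_profile_add_setting('low-swappiness', 'fs.file-max', '1000000')
+ #   mgr.rm_tuned_profile('low-swappiness')
+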
def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
self.health_checks[name] = {
'severity': 'warning',
spec=spec,
hosts=self.cache.get_schedulable_hosts(),
unreachable_hosts=self.cache.get_unreachable_hosts(),
+ draining_hosts=self.cache.get_draining_hosts(),
networks=self.cache.networks,
daemons=self.cache.get_daemons_by_service(spec.service_name()),
allow_colo=svc.allow_colo(),
spec=spec,
hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
unreachable_hosts=self.cache.get_unreachable_hosts(),
+ draining_hosts=self.cache.get_draining_hosts(),
networks=self.cache.networks,
daemons=self.cache.get_daemons_by_service(spec.service_name()),
allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
# trigger the serve loop to initiate the removal
self._kick_serve_loop()
- return "Scheduled OSD(s) for removal"
+ warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
+ "Run the `ceph-volume lvm zap` command with `--destroy`"
+ " against the VG/LV if you want them to be destroyed.")
+ return f"Scheduled OSD(s) for removal.{warning_zap}"
@handle_orch_error
def stop_remove_osds(self, osd_ids: List[str]) -> str: