from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
+from .services.nvmeof import NvmeofService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
- cephadmNoImage, CEPH_UPGRADE_ORDER
+ cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
from .tuned_profiles import TunedProfileUtils
# Default container images -----------------------------------------------------
-DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
+DEFAULT_IMAGE = 'quay.io/ceph/ceph' # DO NOT ADD TAG TO THIS
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
default=False,
desc='Use libstoragemgmt during device scans',
),
+ Option(
+ 'inventory_list_all',
+ type='bool',
+ default=False,
+ desc='Whether ceph-volume inventory should report '
+ 'more devices, such as device mappers (LVs / mpaths) and partitions',
+ ),
Option(
'daemon_cache_timeout',
type='secs',
default=DEFAULT_PROMETHEUS_IMAGE,
desc='Prometheus container image',
),
+ Option(
+ 'container_image_nvmeof',
+ default=DEFAULT_NVMEOF_IMAGE,
+ desc='NVMe-oF container image',
+ ),
Option(
'container_image_grafana',
default=DEFAULT_GRAFANA_IMAGE,
default=False,
desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only takes effect when logging at debug level'
),
- Option(
- 'prometheus_web_user',
- type='str',
- default='admin',
- desc='Prometheus web user'
- ),
- Option(
- 'prometheus_web_password',
- type='str',
- default='admin',
- desc='Prometheus web password'
- ),
- Option(
- 'alertmanager_web_user',
- type='str',
- default='admin',
- desc='Alertmanager web user'
- ),
- Option(
- 'alertmanager_web_password',
- type='str',
- default='admin',
- desc='Alertmanager web password'
- ),
Option(
'secure_monitoring_stack',
type='bool',
self.mode = ''
self.container_image_base = ''
self.container_image_prometheus = ''
+ self.container_image_nvmeof = ''
self.container_image_grafana = ''
self.container_image_alertmanager = ''
self.container_image_node_exporter = ''
self._temp_files: List = []
self.ssh_key: Optional[str] = None
self.ssh_pub: Optional[str] = None
+ self.ssh_cert: Optional[str] = None
self.use_agent = False
self.agent_refresh_rate = 0
self.agent_down_multiplier = 0.0
self.agent_starting_port = 0
self.service_discovery_port = 0
self.secure_monitoring_stack = False
- self.prometheus_web_password: Optional[str] = None
- self.prometheus_web_user: Optional[str] = None
- self.alertmanager_web_password: Optional[str] = None
- self.alertmanager_web_user: Optional[str] = None
self.apply_spec_fails: List[Tuple[str, str]] = []
self.max_osd_draining_count = 10
self.device_enhanced_scan = False
+ self.inventory_list_all = False
self.cgroups_split = True
self.log_refresh_metadata = False
self.default_cephadm_command_timeout = 0
OSDService, NFSService, MonService, MgrService, MdsService,
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
- IngressService, CustomContainerService, CephfsMirrorService,
+ IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
JaegerQueryService, JaegerAgentService, JaegerCollectorService
]
self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
+ self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
self.scheduled_async_actions: List[Callable] = []
ssh_config_fname))
def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
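+ # Parse `cephadm ls` output for one host into DaemonDescription objects
+ # and refresh the daemon cache. Timestamps arrive as strings; the helper
+ # below converts them, tolerating fields that are absent.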
+ def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
+ return str_to_datetime(value) if value is not None else None
+
dm = {}
for d in ls:
if not d['style'].startswith('cephadm'):
continue
if '.' not in d['name']:
continue
- sd = orchestrator.DaemonDescription()
- sd.last_refresh = datetime_now()
- for k in ['created', 'started', 'last_configured', 'last_deployed']:
- v = d.get(k, None)
- if v:
- setattr(sd, k, str_to_datetime(d[k]))
- sd.daemon_type = d['name'].split('.')[0]
- if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
- logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
+ daemon_type = d['name'].split('.')[0]
+ if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
+ logger.warning(f"Found unknown daemon type {daemon_type} on host {host}")
continue
- sd.daemon_id = '.'.join(d['name'].split('.')[1:])
- sd.hostname = host
- sd.container_id = d.get('container_id')
- if sd.container_id:
+ container_id = d.get('container_id')
+ if container_id:
# shorten the hash
- sd.container_id = sd.container_id[0:12]
- sd.container_image_name = d.get('container_image_name')
- sd.container_image_id = d.get('container_image_id')
- sd.container_image_digests = d.get('container_image_digests')
- sd.memory_usage = d.get('memory_usage')
- sd.memory_request = d.get('memory_request')
- sd.memory_limit = d.get('memory_limit')
- sd.cpu_percentage = d.get('cpu_percentage')
- sd._service_name = d.get('service_name')
- sd.deployed_by = d.get('deployed_by')
- sd.version = d.get('version')
- sd.ports = d.get('ports')
- sd.ip = d.get('ip')
- sd.rank = int(d['rank']) if d.get('rank') is not None else None
- sd.rank_generation = int(d['rank_generation']) if d.get(
+ container_id = container_id[0:12]
+ rank = int(d['rank']) if d.get('rank') is not None else None
+ rank_generation = int(d['rank_generation']) if d.get(
'rank_generation') is not None else None
- sd.extra_container_args = d.get('extra_container_args')
- sd.extra_entrypoint_args = d.get('extra_entrypoint_args')
+ status, status_desc = None, 'unknown'
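+ # map cephadm's reported state onto DaemonDescriptionStatus; an explicit
+ # 'unknown' state counts as an error, while a missing state leaves the
+ # status as None / 'unknown'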
if 'state' in d:
- sd.status_desc = d['state']
- sd.status = {
+ status_desc = d['state']
+ status = {
'running': DaemonDescriptionStatus.running,
'stopped': DaemonDescriptionStatus.stopped,
'error': DaemonDescriptionStatus.error,
'unknown': DaemonDescriptionStatus.error,
}[d['state']]
- else:
- sd.status_desc = 'unknown'
- sd.status = None
+ sd = orchestrator.DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id='.'.join(d['name'].split('.')[1:]),
+ hostname=host,
+ container_id=container_id,
+ container_image_id=d.get('container_image_id'),
+ container_image_name=d.get('container_image_name'),
+ container_image_digests=d.get('container_image_digests'),
+ version=d.get('version'),
+ status=status,
+ status_desc=status_desc,
+ created=_as_datetime(d.get('created')),
+ started=_as_datetime(d.get('started')),
+ last_refresh=datetime_now(),
+ last_configured=_as_datetime(d.get('last_configured')),
+ last_deployed=_as_datetime(d.get('last_deployed')),
+ memory_usage=d.get('memory_usage'),
+ memory_request=d.get('memory_request'),
+ memory_limit=d.get('memory_limit'),
+ cpu_percentage=d.get('cpu_percentage'),
+ service_name=d.get('service_name'),
+ ports=d.get('ports'),
+ ip=d.get('ip'),
+ deployed_by=d.get('deployed_by'),
+ rank=rank,
+ rank_generation=rank_generation,
+ extra_container_args=d.get('extra_container_args'),
+ extra_entrypoint_args=d.get('extra_entrypoint_args'),
+ )
dm[sd.name()] = sd
self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
self.cache.update_host_daemons(host, dm)
self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
return 0, "", ""
+ @orchestrator._cli_write_command(
+ 'cephadm set-signed-cert')
+ def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ """Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
+ if inbuf is None or len(inbuf) == 0:
+ return -errno.EINVAL, "", "empty cert file provided"
+ old = self.ssh_cert
+ if inbuf == old:
+ return 0, "value unchanged", ""
+ self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
+ return 0, "", ""
+
@orchestrator._cli_write_command(
'cephadm clear-key')
def _clear_key(self) -> Tuple[int, str, str]:
"""Clear cluster SSH key"""
self.set_store('ssh_identity_key', None)
self.set_store('ssh_identity_pub', None)
+ self.set_store('ssh_identity_cert', None)
self.ssh._reconfig_ssh()
self.log.info('Cleared cluster SSH key')
return 0, '', ''
else:
return -errno.ENOENT, '', 'No cluster SSH key defined'
+ @orchestrator._cli_read_command(
+ 'cephadm get-signed-cert')
+ def _get_signed_cert(self) -> Tuple[int, str, str]:
+ """Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
+ if self.ssh_cert:
+ return 0, self.ssh_cert, ''
+ else:
+ return -errno.ENOENT, '', 'No signed cert defined'
+
@orchestrator._cli_read_command(
'cephadm get-user')
def _get_user(self) -> Tuple[int, str, str]:
if code:
return 1, '', ('check-host failed:\n' + '\n'.join(err))
except ssh.HostConnectionError as e:
- self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
+ self.log.exception(
+ f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
return 1, '', ('check-host failed:\n'
+ f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
except OrchestratorError:
)).strip()
elif daemon_type == 'prometheus':
image = self.container_image_prometheus
+ elif daemon_type == 'nvmeof':
+ image = self.container_image_nvmeof
elif daemon_type == 'grafana':
image = self.container_image_grafana
elif daemon_type == 'alertmanager':
# check if we're removing the last _admin host
if not force:
- p = PlacementSpec(label='_admin')
+ p = PlacementSpec(label=SpecialHostLabels.ADMIN)
admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
if len(admin_hosts) == 1 and admin_hosts[0] == host:
- raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
- " label. Please add the '_admin' label to a host"
+ raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
+ f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host"
" or add --force to this command")
def run_cmd(cmd_args: dict) -> None:
if d.daemon_type != 'osd':
self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
- self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+ self.cephadm_services[daemon_type_to_service(
+ str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
else:
cmd_args = {
'prefix': 'osd purge-actual',
self.inventory.rm_host(host)
self.cache.rm_host(host)
self.ssh.reset_con(host)
- self.offline_hosts_remove(host) # if host was in offline host list, we should remove it now.
+ # if host was in offline host list, we should remove it now.
+ self.offline_hosts_remove(host)
self.event.set() # refresh stray health check
self.log.info('Removed host %s' % host)
return "Removed {} host '{}'".format('offline' if offline else '', host)
def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
# if we remove the _admin label from the only host that has it we could end up
# removing the only instance of the config and keyring and cause issues
- if not force and label == '_admin':
- p = PlacementSpec(label='_admin')
+ if not force and label == SpecialHostLabels.ADMIN:
+ p = PlacementSpec(label=SpecialHostLabels.ADMIN)
admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
if len(admin_hosts) == 1 and admin_hosts[0] == host:
- raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
- " label.\nRemoving the _admin label from this host could cause the removal"
+ raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
+ f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal"
" of the last cluster config/keyring managed by cephadm.\n"
- "It is recommended to add the _admin label to another host"
+ f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
" before completing this operation.\nIf you're certain this is"
" what you want rerun this command with --force.")
- self.inventory.rm_label(host, label)
- self.log.info('Removed label %s to host %s' % (label, host))
+ if self.inventory.has_label(host, label):
+ self.inventory.rm_label(host, label)
+ msg = f'Removed label {label} from host {host}'
+ else:
+ msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels."
+ self.log.info(msg)
self._kick_serve_loop()
- return 'Removed label %s from host %s' % (label, host)
+ return msg
def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
self.log.debug("running host-ok-to-stop checks")
daemon_names.append(dd.name())
return daemon_names
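+ # fetch the monitoring credentials once up front; they feed the hash-based
+ # deps below, so changing them forces a reconfig of the affected daemons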
+ alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
+ prometheus_user, prometheus_password = self._get_prometheus_credentials()
+
deps = []
if daemon_type == 'haproxy':
# because cephadm creates new daemon instances whenever
# an explicit dependency is added for each service type to force a reconfig
# whenever the number of daemons for that service type changes from 0 to greater
# than zero and vice versa.
- deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
+ deps += [s for s in ['node-exporter', 'alertmanager']
+ if self.cache.get_daemons_by_service(s)]
if len(self.cache.get_daemons_by_type('ingress')) > 0:
deps.append('ingress')
# add dependency on ceph-exporter daemons
deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
if self.secure_monitoring_stack:
- if self.prometheus_web_user and self.prometheus_web_password:
- deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
- if self.alertmanager_web_user and self.alertmanager_web_password:
- deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
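+ # NOTE: python salts str hashes per interpreter run (PYTHONHASHSEED), so
+ # these dep values may change across mgr restarts and trigger an extra
+ # reconfig of the monitoring daemons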
+ if prometheus_user and prometheus_password:
+ deps.append(f'{hash(prometheus_user + prometheus_password)}')
+ if alertmanager_user and alertmanager_password:
+ deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
elif daemon_type == 'grafana':
deps += get_daemon_names(['prometheus', 'loki'])
- if self.secure_monitoring_stack and self.prometheus_web_user and self.prometheus_web_password:
- deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
+ if self.secure_monitoring_stack and prometheus_user and prometheus_password:
+ deps.append(f'{hash(prometheus_user + prometheus_password)}')
elif daemon_type == 'alertmanager':
deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
- if self.secure_monitoring_stack and self.alertmanager_web_user and self.alertmanager_web_password:
- deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
+ if self.secure_monitoring_stack and alertmanager_user and alertmanager_password:
+ deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
elif daemon_type == 'promtail':
deps += get_daemon_names(['loki'])
else:
self.events.from_orch_error(e)
raise
+ def _get_alertmanager_credentials(self) -> Tuple[str, str]:
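+ # credentials live in the mgr config-key store; default to admin/admin on
+ # first use and persist that default so subsequent calls are stable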
+ user = self.get_store(AlertmanagerService.USER_CFG_KEY)
+ password = self.get_store(AlertmanagerService.PASS_CFG_KEY)
+ if user is None or password is None:
+ user = 'admin'
+ password = 'admin'
+ self.set_store(AlertmanagerService.USER_CFG_KEY, user)
+ self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
+ return (user, password)
+
+ def _get_prometheus_credentials(self) -> Tuple[str, str]:
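+ # same lazy-default behavior as _get_alertmanager_credentials above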
+ user = self.get_store(PrometheusService.USER_CFG_KEY)
+ password = self.get_store(PrometheusService.PASS_CFG_KEY)
+ if user is None or password is None:
+ user = 'admin'
+ password = 'admin'
+ self.set_store(PrometheusService.USER_CFG_KEY, user)
+ self.set_store(PrometheusService.PASS_CFG_KEY, password)
+ return (user, password)
+
+ @handle_orch_error
+ def set_prometheus_access_info(self, user: str, password: str) -> str:
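+ # storing new credentials changes the hash-based deps computed for the
+ # monitoring daemons, so they should get reconfigured on the next serve cycle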
+ self.set_store(PrometheusService.USER_CFG_KEY, user)
+ self.set_store(PrometheusService.PASS_CFG_KEY, password)
+ return 'prometheus credentials updated'
+
+ @handle_orch_error
+ def set_alertmanager_access_info(self, user: str, password: str) -> str:
+ self.set_store(AlertmanagerService.USER_CFG_KEY, user)
+ self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
+ return 'alertmanager credentials updated'
+
@handle_orch_error
def get_prometheus_access_info(self) -> Dict[str, str]:
- return {'user': self.prometheus_web_user or '',
- 'password': self.prometheus_web_password or '',
+ user, password = self._get_prometheus_credentials()
+ return {'user': user,
+ 'password': password,
'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
@handle_orch_error
def get_alertmanager_access_info(self) -> Dict[str, str]:
- return {'user': self.alertmanager_web_user or '',
- 'password': self.alertmanager_web_password or '',
+ user, password = self._get_alertmanager_credentials()
+ return {'user': user,
+ 'password': password,
'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
@handle_orch_error
def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
"""Return a list of candidate hosts according to the placement specification."""
all_hosts = self.cache.get_schedulable_hosts()
- draining_hosts = [dh.hostname for dh in self.cache.get_draining_hosts()]
candidates = []
if placement.hosts:
candidates = [h.hostname for h in placement.hosts]
elif placement.label or placement.host_pattern:
candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
elif (placement.count is not None or placement.count_per_host is not None):
candidates = [x.hostname for x in all_hosts]
- return [h for h in candidates if h not in draining_hosts]
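+ # hosts that are currently being drained are never candidates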
+ return [h for h in candidates if not self.cache.is_host_draining(h)]
def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
"""Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
'rgw': PlacementSpec(count=2),
'ingress': PlacementSpec(count=2),
'iscsi': PlacementSpec(count=1),
+ 'nvmeof': PlacementSpec(count=1),
'rbd-mirror': PlacementSpec(count=2),
'cephfs-mirror': PlacementSpec(count=1),
'nfs': PlacementSpec(count=1),
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
if self.offline_hosts:
- raise OrchestratorError(f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
+ raise OrchestratorError(
+ f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
if daemon_types is not None and services is not None:
raise OrchestratorError('--daemon-types and --services are mutually exclusive')
if daemon_types is not None:
return self.to_remove_osds.all_osds()
@handle_orch_error
- def drain_host(self, hostname, force=False):
- # type: (str, bool) -> str
+ def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
"""
Drain all daemons from a host.
:param hostname: host name
:param force: proceed even if this is the last host with the '_admin' label
:param keep_conf_keyring: do not flag the host's managed config/keyring for removal
:param zap_osd_devices: zap the devices of any OSDs removed during the drain
# if we drain the last admin host we could end up removing the only instance
# of the config and keyring and cause issues
if not force:
- p = PlacementSpec(label='_admin')
+ p = PlacementSpec(label=SpecialHostLabels.ADMIN)
admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
- raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
+ raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'"
" label.\nDraining this host could cause the removal"
" of the last cluster config/keyring managed by cephadm.\n"
- "It is recommended to add the _admin label to another host"
+ f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
" before completing this operation.\nIf you're certain this is"
" what you want rerun this command with --force.")
self.add_host_label(hostname, SpecialHostLabels.DRAIN_DAEMONS)
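+ # unless asked to keep them, also flag the host's cephadm-managed
+ # ceph.conf and client keyring for removal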
+ if not keep_conf_keyring:
+ self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING)
daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
- self.remove_osds(osds_to_remove)
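+ # queue removal of this host's OSDs, optionally zapping their devices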
+ self.remove_osds(osds_to_remove, zap=zap_osd_devices)
daemons_table = ""
daemons_table += "{:<20} {:<15}\n".format("type", "id")