update ceph source to reef 18.2.1
diff --git a/ceph/src/pybind/mgr/cephadm/module.py b/ceph/src/pybind/mgr/cephadm/module.py
index 37fe3b4ac58f0756c1f192799ce3a7ff66e30ee5..7b97ce74a947e8cda6dbec6350aca5ef5269ac95 100644
@@ -58,6 +58,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ
 from .services.ingress import IngressService
 from .services.container import CustomContainerService
 from .services.iscsi import IscsiService
+from .services.nvmeof import NvmeofService
 from .services.nfs import NFSService
 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
@@ -69,7 +70,7 @@ from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore,
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
-    cephadmNoImage, CEPH_UPGRADE_ORDER
+    cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels
 from .configchecks import CephadmConfigChecks
 from .offline_watcher import OfflineHostWatcher
 from .tuned_profiles import TunedProfileUtils
@@ -103,9 +104,10 @@ os._exit = os_exit_noop   # type: ignore
 
 
 # Default container images -----------------------------------------------------
-DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
+DEFAULT_IMAGE = 'quay.io/ceph/ceph'  # DO NOT ADD TAG TO THIS
 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2'
 DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
 DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
@@ -165,6 +167,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=False,
             desc='Use libstoragemgmt during device scans',
         ),
+        Option(
+            'inventory_list_all',
+            type='bool',
+            default=False,
+            desc='Whether ceph-volume inventory should report '
+            'more devices (mostly mappers (LVs / mpaths), partitions...)',
+        ),
         Option(
             'daemon_cache_timeout',
             type='secs',
@@ -201,6 +210,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=DEFAULT_PROMETHEUS_IMAGE,
             desc='Prometheus container image',
         ),
+        Option(
+            'container_image_nvmeof',
+            default=DEFAULT_NVMEOF_IMAGE,
+            desc='Nvme-of container image',
+        ),
         Option(
             'container_image_grafana',
             default=DEFAULT_GRAFANA_IMAGE,
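
Like the other image settings above, container_image_nvmeof is an ordinary cephadm module option,
so the DEFAULT_NVMEOF_IMAGE default can be overridden cluster-wide with the mon 'config set'
command (ceph config set mgr mgr/cephadm/container_image_nvmeof <image>). A minimal sketch of
doing the same from Python, assuming cluster is an already-connected rados.Rados handle and that
the standard who/name/value parameters apply (the helper name is illustrative):

    import json

    def set_nvmeof_image(cluster, image: str) -> int:
        # Adjust the module option that backs DEFAULT_NVMEOF_IMAGE; the value
        # is just a container image reference, e.g. 'quay.io/ceph/nvmeof:0.0.2'.
        cmd = json.dumps({
            'prefix': 'config set',
            'who': 'mgr',
            'name': 'mgr/cephadm/container_image_nvmeof',
            'value': image,
        })
        ret, _outbuf, outs = cluster.mon_command(cmd, b'')
        if ret != 0:
            raise RuntimeError(f'config set failed: {outs}')
        return ret
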
@@ -445,30 +459,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=False,
             desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
         ),
-        Option(
-            'prometheus_web_user',
-            type='str',
-            default='admin',
-            desc='Prometheus web user'
-        ),
-        Option(
-            'prometheus_web_password',
-            type='str',
-            default='admin',
-            desc='Prometheus web password'
-        ),
-        Option(
-            'alertmanager_web_user',
-            type='str',
-            default='admin',
-            desc='Alertmanager web user'
-        ),
-        Option(
-            'alertmanager_web_password',
-            type='str',
-            default='admin',
-            desc='Alertmanager web password'
-        ),
         Option(
             'secure_monitoring_stack',
             type='bool',
@@ -511,6 +501,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.mode = ''
             self.container_image_base = ''
             self.container_image_prometheus = ''
+            self.container_image_nvmeof = ''
             self.container_image_grafana = ''
             self.container_image_alertmanager = ''
             self.container_image_node_exporter = ''
@@ -549,19 +540,17 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self._temp_files: List = []
             self.ssh_key: Optional[str] = None
             self.ssh_pub: Optional[str] = None
+            self.ssh_cert: Optional[str] = None
             self.use_agent = False
             self.agent_refresh_rate = 0
             self.agent_down_multiplier = 0.0
             self.agent_starting_port = 0
             self.service_discovery_port = 0
             self.secure_monitoring_stack = False
-            self.prometheus_web_password: Optional[str] = None
-            self.prometheus_web_user: Optional[str] = None
-            self.alertmanager_web_password: Optional[str] = None
-            self.alertmanager_web_user: Optional[str] = None
             self.apply_spec_fails: List[Tuple[str, str]] = []
             self.max_osd_draining_count = 10
             self.device_enhanced_scan = False
+            self.inventory_list_all = False
             self.cgroups_split = True
             self.log_refresh_metadata = False
             self.default_cephadm_command_timeout = 0
@@ -630,7 +619,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             OSDService, NFSService, MonService, MgrService, MdsService,
             RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
             PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
-            IngressService, CustomContainerService, CephfsMirrorService,
+            IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
             CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
             JaegerQueryService, JaegerAgentService, JaegerCollectorService
         ]
@@ -642,6 +631,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
         self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
         self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
+        self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
 
         self.scheduled_async_actions: List[Callable] = []
 
@@ -869,6 +859,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 ssh_config_fname))
 
     def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
+        def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
+            return str_to_datetime(value) if value is not None else None
+
         dm = {}
         for d in ls:
             if not d['style'].startswith('cephadm'):
@@ -877,51 +870,56 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 continue
             if '.' not in d['name']:
                 continue
-            sd = orchestrator.DaemonDescription()
-            sd.last_refresh = datetime_now()
-            for k in ['created', 'started', 'last_configured', 'last_deployed']:
-                v = d.get(k, None)
-                if v:
-                    setattr(sd, k, str_to_datetime(d[k]))
-            sd.daemon_type = d['name'].split('.')[0]
-            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
-                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
+            daemon_type = d['name'].split('.')[0]
+            if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
+                logger.warning(f"Found unknown daemon type {daemon_type} on host {host}")
                 continue
 
-            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
-            sd.hostname = host
-            sd.container_id = d.get('container_id')
-            if sd.container_id:
+            container_id = d.get('container_id')
+            if container_id:
                 # shorten the hash
-                sd.container_id = sd.container_id[0:12]
-            sd.container_image_name = d.get('container_image_name')
-            sd.container_image_id = d.get('container_image_id')
-            sd.container_image_digests = d.get('container_image_digests')
-            sd.memory_usage = d.get('memory_usage')
-            sd.memory_request = d.get('memory_request')
-            sd.memory_limit = d.get('memory_limit')
-            sd.cpu_percentage = d.get('cpu_percentage')
-            sd._service_name = d.get('service_name')
-            sd.deployed_by = d.get('deployed_by')
-            sd.version = d.get('version')
-            sd.ports = d.get('ports')
-            sd.ip = d.get('ip')
-            sd.rank = int(d['rank']) if d.get('rank') is not None else None
-            sd.rank_generation = int(d['rank_generation']) if d.get(
+                container_id = container_id[0:12]
+            rank = int(d['rank']) if d.get('rank') is not None else None
+            rank_generation = int(d['rank_generation']) if d.get(
                 'rank_generation') is not None else None
-            sd.extra_container_args = d.get('extra_container_args')
-            sd.extra_entrypoint_args = d.get('extra_entrypoint_args')
+            status, status_desc = None, 'unknown'
             if 'state' in d:
-                sd.status_desc = d['state']
-                sd.status = {
+                status_desc = d['state']
+                status = {
                     'running': DaemonDescriptionStatus.running,
                     'stopped': DaemonDescriptionStatus.stopped,
                     'error': DaemonDescriptionStatus.error,
                     'unknown': DaemonDescriptionStatus.error,
                 }[d['state']]
-            else:
-                sd.status_desc = 'unknown'
-                sd.status = None
+            sd = orchestrator.DaemonDescription(
+                daemon_type=daemon_type,
+                daemon_id='.'.join(d['name'].split('.')[1:]),
+                hostname=host,
+                container_id=container_id,
+                container_image_id=d.get('container_image_id'),
+                container_image_name=d.get('container_image_name'),
+                container_image_digests=d.get('container_image_digests'),
+                version=d.get('version'),
+                status=status,
+                status_desc=status_desc,
+                created=_as_datetime(d.get('created')),
+                started=_as_datetime(d.get('started')),
+                last_refresh=datetime_now(),
+                last_configured=_as_datetime(d.get('last_configured')),
+                last_deployed=_as_datetime(d.get('last_deployed')),
+                memory_usage=d.get('memory_usage'),
+                memory_request=d.get('memory_request'),
+                memory_limit=d.get('memory_limit'),
+                cpu_percentage=d.get('cpu_percentage'),
+                service_name=d.get('service_name'),
+                ports=d.get('ports'),
+                ip=d.get('ip'),
+                deployed_by=d.get('deployed_by'),
+                rank=rank,
+                rank_generation=rank_generation,
+                extra_container_args=d.get('extra_container_args'),
+                extra_entrypoint_args=d.get('extra_entrypoint_args'),
+            )
             dm[sd.name()] = sd
         self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
         self.cache.update_host_daemons(host, dm)
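
A condensed, self-contained sketch of the refactor above: each entry from 'cephadm ls' is
translated up front (timestamps parsed, container hash shortened) and the daemon description is
built in a single constructor call instead of mutating an empty object field by field. A plain
dict stands in for orchestrator.DaemonDescription and datetime.fromisoformat for str_to_datetime;
both substitutions are assumptions made to keep the example runnable on its own:

    import datetime
    from typing import Any, Dict, Optional

    def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
        # None stays None; anything else is assumed to be an ISO-8601 timestamp.
        return datetime.datetime.fromisoformat(value) if value is not None else None

    def describe(d: Dict[str, Any], host: str) -> Dict[str, Any]:
        container_id = d.get('container_id')
        return {
            'daemon_type': d['name'].split('.')[0],
            'daemon_id': '.'.join(d['name'].split('.')[1:]),
            'hostname': host,
            'container_id': container_id[:12] if container_id else None,  # shorten the hash
            'created': _as_datetime(d.get('created')),
            'status_desc': d.get('state', 'unknown'),
        }

    print(describe({'name': 'osd.3', 'state': 'running',
                    'created': '2024-01-01T00:00:00+00:00'}, 'host1'))
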
@@ -1086,12 +1084,25 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
         return 0, "", ""
 
+    @orchestrator._cli_write_command(
+        'cephadm set-signed-cert')
+    def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+        """Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
+        if inbuf is None or len(inbuf) == 0:
+            return -errno.EINVAL, "", "empty cert file provided"
+        old = self.ssh_cert
+        if inbuf == old:
+            return 0, "value unchanged", ""
+        self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
+        return 0, "", ""
+
     @orchestrator._cli_write_command(
         'cephadm clear-key')
     def _clear_key(self) -> Tuple[int, str, str]:
         """Clear cluster SSH key"""
         self.set_store('ssh_identity_key', None)
         self.set_store('ssh_identity_pub', None)
+        self.set_store('ssh_identity_cert', None)
         self.ssh._reconfig_ssh()
         self.log.info('Cleared cluster SSH key')
         return 0, '', ''
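
The next hunk pairs the setter with 'cephadm get-signed-cert'. The setter reads the certificate
from the command's input buffer, which is why its docstring points at -i <cert_filename>. A
minimal sketch of driving it programmatically, assuming cluster is an already-connected
rados.Rados handle, that mgr module commands are delivered via Rados.mgr_command, and that
cert_pem holds the signed SSH certificate text (the helper name is illustrative):

    import json

    def set_signed_cert(cluster, cert_pem: str):
        # The cert travels in the input buffer, exactly as `-i <cert_filename>`
        # supplies it on the command line; the prefix matches the handler above.
        cmd = json.dumps({'prefix': 'cephadm set-signed-cert'})
        ret, _outbuf, outs = cluster.mgr_command(cmd, cert_pem.encode('utf-8'))
        return ret, outs
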
@@ -1105,6 +1116,15 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         else:
             return -errno.ENOENT, '', 'No cluster SSH key defined'
 
+    @orchestrator._cli_read_command(
+        'cephadm get-signed-cert')
+    def _get_signed_cert(self) -> Tuple[int, str, str]:
+        """Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
+        if self.ssh_cert:
+            return 0, self.ssh_cert, ''
+        else:
+            return -errno.ENOENT, '', 'No signed cert defined'
+
     @orchestrator._cli_read_command(
         'cephadm get-user')
     def _get_user(self) -> Tuple[int, str, str]:
@@ -1191,7 +1211,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             if code:
                 return 1, '', ('check-host failed:\n' + '\n'.join(err))
         except ssh.HostConnectionError as e:
-            self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
+            self.log.exception(
+                f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
             return 1, '', ('check-host failed:\n'
                            + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
         except OrchestratorError:
@@ -1472,6 +1493,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             )).strip()
         elif daemon_type == 'prometheus':
             image = self.container_image_prometheus
+        elif daemon_type == 'nvmeof':
+            image = self.container_image_nvmeof
         elif daemon_type == 'grafana':
             image = self.container_image_grafana
         elif daemon_type == 'alertmanager':
@@ -1640,11 +1663,11 @@ Then run the following:
 
         # check, if there we're removing the last _admin host
         if not force:
-            p = PlacementSpec(label='_admin')
+            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
             admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
             if len(admin_hosts) == 1 and admin_hosts[0] == host:
-                raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
-                                                  " label. Please add the '_admin' label to a host"
+                raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
+                                                  f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host"
                                                   " or add --force to this command")
 
         def run_cmd(cmd_args: dict) -> None:
@@ -1662,7 +1685,8 @@ Then run the following:
 
                 if d.daemon_type != 'osd':
                     self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
-                    self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
+                    self.cephadm_services[daemon_type_to_service(
+                        str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
                 else:
                     cmd_args = {
                         'prefix': 'osd purge-actual',
@@ -1680,7 +1704,8 @@ Then run the following:
         self.inventory.rm_host(host)
         self.cache.rm_host(host)
         self.ssh.reset_con(host)
-        self.offline_hosts_remove(host)  # if host was in offline host list, we should remove it now.
+        # if host was in offline host list, we should remove it now.
+        self.offline_hosts_remove(host)
         self.event.set()  # refresh stray health check
         self.log.info('Removed host %s' % host)
         return "Removed {} host '{}'".format('offline' if offline else '', host)
@@ -1729,20 +1754,24 @@ Then run the following:
     def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
         # if we remove the _admin label from the only host that has it we could end up
         # removing the only instance of the config and keyring and cause issues
-        if not force and label == '_admin':
-            p = PlacementSpec(label='_admin')
+        if not force and label == SpecialHostLabels.ADMIN:
+            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
             admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
             if len(admin_hosts) == 1 and admin_hosts[0] == host:
-                raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
-                                                  " label.\nRemoving the _admin label from this host could cause the removal"
+                raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
+                                                  f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal"
                                                   " of the last cluster config/keyring managed by cephadm.\n"
-                                                  "It is recommended to add the _admin label to another host"
+                                                  f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
                                                   " before completing this operation.\nIf you're certain this is"
                                                   " what you want rerun this command with --force.")
-        self.inventory.rm_label(host, label)
-        self.log.info('Removed label %s to host %s' % (label, host))
+        if self.inventory.has_label(host, label):
+            self.inventory.rm_label(host, label)
+            msg = f'Removed label {label} from host {host}'
+        else:
+            msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels."
+        self.log.info(msg)
         self._kick_serve_loop()
-        return 'Removed label %s from host %s' % (label, host)
+        return msg
 
     def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
         self.log.debug("running host-ok-to-stop checks")
@@ -2604,6 +2633,9 @@ Then run the following:
                     daemon_names.append(dd.name())
             return daemon_names
 
+        alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
+        prometheus_user, prometheus_password = self._get_prometheus_credentials()
+
         deps = []
         if daemon_type == 'haproxy':
             # because cephadm creates new daemon instances whenever
@@ -2651,24 +2683,25 @@ Then run the following:
             # an explicit dependency is added for each service-type to force a reconfig
             # whenever the number of daemons for those service-type changes from 0 to greater
             # than zero and vice versa.
-            deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
+            deps += [s for s in ['node-exporter', 'alertmanager']
+                     if self.cache.get_daemons_by_service(s)]
             if len(self.cache.get_daemons_by_type('ingress')) > 0:
                 deps.append('ingress')
             # add dependency on ceph-exporter daemons
             deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
             if self.secure_monitoring_stack:
-                if self.prometheus_web_user and self.prometheus_web_password:
-                    deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
-                if self.alertmanager_web_user and self.alertmanager_web_password:
-                    deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
+                if prometheus_user and prometheus_password:
+                    deps.append(f'{hash(prometheus_user + prometheus_password)}')
+                if alertmanager_user and alertmanager_password:
+                    deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
         elif daemon_type == 'grafana':
             deps += get_daemon_names(['prometheus', 'loki'])
-            if self.secure_monitoring_stack and self.prometheus_web_user and self.prometheus_web_password:
-                deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
+            if self.secure_monitoring_stack and prometheus_user and prometheus_password:
+                deps.append(f'{hash(prometheus_user + prometheus_password)}')
         elif daemon_type == 'alertmanager':
             deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
-            if self.secure_monitoring_stack and self.alertmanager_web_user and self.alertmanager_web_password:
-                deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
+            if self.secure_monitoring_stack and alertmanager_user and alertmanager_password:
+                deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
         elif daemon_type == 'promtail':
             deps += get_daemon_names(['loki'])
         else:
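
The reason the credentials are folded into the dependency list as a hash: cephadm reconfigures a
daemon when its computed deps change, so rotating the stored monitoring credentials (via the
helpers added in the next hunk) changes the hash, changes the deps, and triggers a reconfig
without putting the secrets themselves into the dependency strings. A tiny illustration with
made-up values:

    from typing import List

    def monitoring_deps(user: str, password: str, other: List[str]) -> List[str]:
        # Mirrors the pattern above: ordinary dependencies plus a credential hash.
        return sorted(other + [f'{hash(user + password)}'])

    before = monitoring_deps('admin', 'admin', ['node-exporter'])
    after = monitoring_deps('monitor', 's3cr3t', ['node-exporter'])
    assert before != after  # changed credentials => changed deps => reconfig
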
@@ -2769,16 +2802,50 @@ Then run the following:
             self.events.from_orch_error(e)
             raise
 
+    def _get_alertmanager_credentials(self) -> Tuple[str, str]:
+        user = self.get_store(AlertmanagerService.USER_CFG_KEY)
+        password = self.get_store(AlertmanagerService.PASS_CFG_KEY)
+        if user is None or password is None:
+            user = 'admin'
+            password = 'admin'
+            self.set_store(AlertmanagerService.USER_CFG_KEY, user)
+            self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
+        return (user, password)
+
+    def _get_prometheus_credentials(self) -> Tuple[str, str]:
+        user = self.get_store(PrometheusService.USER_CFG_KEY)
+        password = self.get_store(PrometheusService.PASS_CFG_KEY)
+        if user is None or password is None:
+            user = 'admin'
+            password = 'admin'
+            self.set_store(PrometheusService.USER_CFG_KEY, user)
+            self.set_store(PrometheusService.PASS_CFG_KEY, password)
+        return (user, password)
+
+    @handle_orch_error
+    def set_prometheus_access_info(self, user: str, password: str) -> str:
+        self.set_store(PrometheusService.USER_CFG_KEY, user)
+        self.set_store(PrometheusService.PASS_CFG_KEY, password)
+        return 'prometheus credentials updated correctly'
+
+    @handle_orch_error
+    def set_alertmanager_access_info(self, user: str, password: str) -> str:
+        self.set_store(AlertmanagerService.USER_CFG_KEY, user)
+        self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
+        return 'alertmanager credentials updated correctly'
+
     @handle_orch_error
     def get_prometheus_access_info(self) -> Dict[str, str]:
-        return {'user': self.prometheus_web_user or '',
-                'password': self.prometheus_web_password or '',
+        user, password = self._get_prometheus_credentials()
+        return {'user': user,
+                'password': password,
                 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
 
     @handle_orch_error
     def get_alertmanager_access_info(self) -> Dict[str, str]:
-        return {'user': self.alertmanager_web_user or '',
-                'password': self.alertmanager_web_password or '',
+        user, password = self._get_alertmanager_credentials()
+        return {'user': user,
+                'password': password,
                 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
 
     @handle_orch_error
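
The two _get_*_credentials helpers replace the removed prometheus_web_*/alertmanager_web_* module
options with values kept in the mgr store, seeded with admin/admin on first access and updated
only through set_prometheus_access_info / set_alertmanager_access_info. A condensed sketch of
that get-or-seed pattern, with a plain dict standing in for the mgr key/value store (the key
names below are illustrative, not the real USER_CFG_KEY/PASS_CFG_KEY values):

    from typing import Dict, Tuple

    def get_or_seed(store: Dict[str, str], user_key: str, pass_key: str) -> Tuple[str, str]:
        user, password = store.get(user_key), store.get(pass_key)
        if user is None or password is None:
            user = password = 'admin'      # default credentials
            store[user_key] = user         # persist so later reads (and the
            store[pass_key] = password     # dependency hashes) stay consistent
        return user, password

    kv: Dict[str, str] = {}
    assert get_or_seed(kv, 'prometheus/web_user', 'prometheus/web_password') == ('admin', 'admin')
    kv['prometheus/web_password'] = 's3cr3t'   # e.g. via set_prometheus_access_info
    assert get_or_seed(kv, 'prometheus/web_user', 'prometheus/web_password') == ('admin', 's3cr3t')
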
@@ -2799,7 +2866,6 @@ Then run the following:
     def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
         """Return a list of candidate hosts according to the placement specification."""
         all_hosts = self.cache.get_schedulable_hosts()
-        draining_hosts = [dh.hostname for dh in self.cache.get_draining_hosts()]
         candidates = []
         if placement.hosts:
             candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
@@ -2809,7 +2875,7 @@ Then run the following:
             candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
         elif (placement.count is not None or placement.count_per_host is not None):
             candidates = [x.hostname for x in all_hosts]
-        return [h for h in candidates if h not in draining_hosts]
+        return [h for h in candidates if not self.cache.is_host_draining(h)]
 
     def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
         """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
@@ -2966,6 +3032,7 @@ Then run the following:
                 'rgw': PlacementSpec(count=2),
                 'ingress': PlacementSpec(count=2),
                 'iscsi': PlacementSpec(count=1),
+                'nvmeof': PlacementSpec(count=1),
                 'rbd-mirror': PlacementSpec(count=2),
                 'cephfs-mirror': PlacementSpec(count=1),
                 'nfs': PlacementSpec(count=1),
@@ -3189,7 +3256,8 @@ Then run the following:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
         if self.offline_hosts:
-            raise OrchestratorError(f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
+            raise OrchestratorError(
+                f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
         if daemon_types is not None and services is not None:
             raise OrchestratorError('--daemon-types and --services are mutually exclusive')
         if daemon_types is not None:
@@ -3296,8 +3364,7 @@ Then run the following:
         return self.to_remove_osds.all_osds()
 
     @handle_orch_error
-    def drain_host(self, hostname, force=False):
-        # type: (str, bool) -> str
+    def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
         """
         Drain all daemons from a host.
         :param host: host name
@@ -3306,22 +3373,24 @@ Then run the following:
         # if we drain the last admin host we could end up removing the only instance
         # of the config and keyring and cause issues
         if not force:
-            p = PlacementSpec(label='_admin')
+            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
             admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
             if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
-                raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
+                raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'"
                                                   " label.\nDraining this host could cause the removal"
                                                   " of the last cluster config/keyring managed by cephadm.\n"
-                                                  "It is recommended to add the _admin label to another host"
+                                                  f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
                                                   " before completing this operation.\nIf you're certain this is"
                                                   " what you want rerun this command with --force.")
 
         self.add_host_label(hostname, '_no_schedule')
+        if not keep_conf_keyring:
+            self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING)
 
         daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
 
         osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
-        self.remove_osds(osds_to_remove)
+        self.remove_osds(osds_to_remove, zap=zap_osd_devices)
 
         daemons_table = ""
         daemons_table += "{:<20} {:<15}\n".format("type", "id")