update source to Ceph Pacific 16.2.2

diff --git a/ceph/src/pybind/mgr/cephadm/services/osd.py b/ceph/src/pybind/mgr/cephadm/services/osd.py
index 8febf352b615bb4c67c1a421c21c14dd1d978ede..b776a88f454736d2fe5307d38f2a321574aad286 100644
--- a/ceph/src/pybind/mgr/cephadm/services/osd.py
+++ b/ceph/src/pybind/mgr/cephadm/services/osd.py
@@ -1,7 +1,7 @@
 import json
 import logging
 from threading import Lock
-from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
+from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
 
 from ceph.deployment import translate
 from ceph.deployment.drive_group import DriveGroupSpec
@@ -11,11 +11,13 @@ from ceph.utils import datetime_to_str, str_to_datetime
 
 from datetime import datetime
 import orchestrator
+from cephadm.serve import CephadmServe
 from cephadm.utils import forall_hosts
+from ceph.utils import datetime_now
 from orchestrator import OrchestratorError
 from mgr_module import MonCommandFailed
 
-from cephadm.services.cephadmservice import CephadmDaemonSpec, CephService
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
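
The `if TYPE_CHECKING:` guard above imports CephadmOrchestrator for type annotations only; the import never runs at runtime, which avoids a circular import between cephadm.module and the services package. A minimal sketch of the pattern (needs_mgr is a hypothetical function, for illustration only):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # evaluated by static type checkers only, never at runtime
        from cephadm.module import CephadmOrchestrator

    def needs_mgr(mgr: "CephadmOrchestrator") -> None:  # quoted: resolved lazily
        ...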
@@ -35,23 +37,40 @@ class OSDService(CephService):
 
         @forall_hosts
         def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+            # skip this host if there has been no change in inventory
+            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
+                    host, drive_group))
+                return None
+
             cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                      osd_id_claims.get(host, []))
             if not cmd:
                 logger.debug("No data_devices, skipping DriveGroup: {}".format(
                     drive_group.service_id))
                 return None
-            logger.info('Applying drive group %s on host %s...' % (drive_group.service_id, host))
+
+            logger.info('Applying service osd.%s on host %s...' % (
+                drive_group.service_id, host
+            ))
+            start_ts = datetime_now()
             env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
             ret_msg = self.create_single_host(
-                host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
+                drive_group, host, cmd,
+                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
+            )
+            self.mgr.cache.update_osdspec_last_applied(
+                host, drive_group.service_name(), start_ts
             )
+            self.mgr.cache.save_host(host)
             return ret_msg
 
         ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
         return ", ".join(filter(None, ret))
 
-    def create_single_host(self, host: str, cmd: str, replace_osd_ids: List[str],
+    def create_single_host(self,
+                           drive_group: DriveGroupSpec,
+                           host: str, cmd: str, replace_osd_ids: List[str],
                            env_vars: Optional[List[str]] = None) -> str:
         out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
 
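The new guard at the top of create_from_spec_one makes applying an OSD spec idempotent per host: if the device inventory has not changed since the spec was last applied, the host is skipped, and after a successful apply the timestamp taken before the ceph-volume run (start_ts) is recorded. A minimal sketch of how such bookkeeping can work; the field and method shapes here are simplified assumptions (the real HostCache keys osdspec_needs_apply by the drive group spec, not by service name):

    from datetime import datetime
    from typing import Dict, Optional

    class HostCacheSketch:
        def __init__(self) -> None:
            # per-host timestamp of the last device-inventory refresh
            self.last_device_change: Dict[str, datetime] = {}
            # host -> (service_name -> timestamp of the last successful apply)
            self.osdspec_last_applied: Dict[str, Dict[str, datetime]] = {}

        def update_osdspec_last_applied(self, host: str, service_name: str,
                                        ts: datetime) -> None:
            self.osdspec_last_applied.setdefault(host, {})[service_name] = ts

        def osdspec_needs_apply(self, host: str, service_name: str) -> bool:
            applied = self.osdspec_last_applied.get(host, {}).get(service_name)
            if applied is None:
                return True  # never applied on this host
            changed: Optional[datetime] = self.last_device_change.get(host)
            # re-apply only if the inventory changed after the last apply
            return changed is not None and changed > applied
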
@@ -65,9 +84,17 @@ class OSDService(CephService):
             raise RuntimeError(
                 'cephadm exited with an error code: %d, stderr:%s' % (
                     code, '\n'.join(err)))
+        return self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
+                                                         replace_osd_ids)
 
+    def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
+                                             replace_osd_ids: Optional[List[str]] = None) -> str:
+
+        if replace_osd_ids is None:
+            replace_osd_ids = self.find_destroyed_osds().get(host, [])
+            assert replace_osd_ids is not None
         # check result
-        out, err, code = self.mgr._run_cephadm(
+        osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
             host, 'osd', 'ceph-volume',
             [
                 '--',
@@ -75,11 +102,6 @@ class OSDService(CephService):
                 '--format', 'json',
             ])
         before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
-        try:
-            osds_elems = json.loads('\n'.join(out))
-        except ValueError:
-            logger.exception('Cannot decode JSON: \'%s\'' % '\n'.join(out))
-            osds_elems = {}
         fsid = self.mgr._cluster_fsid
         osd_uuid_map = self.mgr.get_osd_uuid_map()
         created = []
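
The try/except removed in this hunk is not lost: _run_cephadm_json on CephadmServe now owns the run-and-decode step. A hedged sketch of such a wrapper, assuming the same "return {} on undecodable output" behaviour as the inline code it replaces (the real helper may instead raise an OrchestratorError):

    import json
    import logging
    from typing import Any, Callable, List, Tuple

    logger = logging.getLogger(__name__)

    RunFn = Callable[..., Tuple[List[str], List[str], int]]

    def run_cephadm_json(run_cephadm: RunFn, *args: Any, **kwargs: Any) -> Any:
        # run a command whose stdout is expected to be JSON and decode it
        out, err, code = run_cephadm(*args, **kwargs)
        try:
            return json.loads('\n'.join(out))
        except ValueError:
            logger.exception('Cannot decode JSON: %r' % '\n'.join(out))
            return {}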
@@ -102,12 +124,14 @@ class OSDService(CephService):
                     continue
 
                 created.append(osd_id)
-                daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
+                    service_name=service_name,
                     daemon_id=osd_id,
                     host=host,
                     daemon_type='osd',
                 )
-                self.mgr._create_daemon(
+                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+                CephadmServe(self.mgr)._create_daemon(
                     daemon_spec,
                     osd_uuid_map=osd_uuid_map)
 
@@ -263,7 +287,7 @@ class OSDService(CephService):
         split_cmd = cmd.split(' ')
         _cmd = ['--config-json', '-', '--']
         _cmd.extend(split_cmd)
-        out, err, code = self.mgr._run_cephadm(
+        out, err, code = CephadmServe(self.mgr)._run_cephadm(
             host, 'osd', 'ceph-volume',
             _cmd,
             env_vars=env_vars,
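
_run_ceph_volume_command splits the pre-built ceph-volume fragment and prepends `--config-json -` so cephadm reads the cluster config and keyring from stdin. An illustration of the resulting argument list; the `lvm batch` fragment is a hypothetical example of what driveselection_to_ceph_volume might emit, not taken from this diff:

    cmd = 'lvm batch --no-auto /dev/sdb /dev/sdc --yes --no-systemd'
    _cmd = ['--config-json', '-', '--'] + cmd.split(' ')
    # _cmd == ['--config-json', '-', '--', 'lvm', 'batch', '--no-auto',
    #          '/dev/sdb', '/dev/sdc', '--yes', '--no-systemd']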
@@ -317,11 +341,10 @@ class RemoveUtil(object):
             'format': 'json'
         })
         try:
-            ret = json.loads(out)
+            return json.loads(out)
         except ValueError:
             logger.exception(f'Cannot decode JSON: \'{out}\'')
             return {}
-        return ret
 
     def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
         if not osd_df:
@@ -356,8 +379,8 @@ class RemoveUtil(object):
             osds = osds[len(osds) // 2:]
         return osds
 
-       # todo start draining
-       #  return all([osd.start_draining() for osd in osds])
+        # todo start draining
+        #  return all([osd.start_draining() for osd in osds])
 
     def ok_to_stop(self, osds: List["OSD"]) -> bool:
         cmd_args = {
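
The tail of the stop-threshold search is visible just above: when a candidate set is not ok-to-stop, the method retries with the second half of the list. A self-contained sketch of that bisection strategy under assumed names (the real method calls self.ok_to_stop, defined below):

    from typing import Callable, List, Optional

    def find_stop_threshold(osds: List[int],
                            ok_to_stop: Callable[[List[int]], bool]
                            ) -> Optional[List[int]]:
        while osds:
            if ok_to_stop(osds):
                return osds               # this subset can be stopped safely
            if len(osds) <= 1:
                return None               # even a single OSD is not stoppable
            osds = osds[len(osds) // 2:]  # halve the candidate set and retry
        return None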
@@ -374,9 +397,37 @@ class RemoveUtil(object):
             'ids': [str(osd.osd_id) for osd in osds]
         })
         if ret != 0:
-            self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
+            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
+            return False
+        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
+        return True
+
+    def get_weight(self, osd: "OSD") -> Optional[float]:
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd crush tree',
+            'format': 'json',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
+            return None
+        j = json.loads(out)
+        for n in j.get("nodes", []):
+            if n.get("name") == f"osd.{osd.osd_id}":
+                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
+                return n.get("crush_weight")
+        return None
+
+    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
+        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': "osd crush reweight",
+            'name': f"osd.{osd.osd_id}",
+            'weight': weight,
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
             return False
-        self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
+        self.mgr.log.info(f"{osd} weight is now {weight}")
         return True
 
     def safe_to_destroy(self, osd_ids: List[int]) -> bool:
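
get_weight walks the JSON from `ceph osd crush tree` looking for the node named osd.<id>. An abridged example of the payload, showing only the keys the code actually reads (other fields elided):

    j = {
        "nodes": [
            {"name": "default", "type": "root"},
            {"name": "host-a", "type": "host"},
            {"name": "osd.3", "type": "osd", "crush_weight": 0.0976},
        ]
    }
    weight = next((n.get("crush_weight") for n in j.get("nodes", [])
                   if n.get("name") == "osd.3"), None)  # -> 0.0976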
@@ -433,7 +484,6 @@ class OSD:
                  replace: bool = False,
                  force: bool = False,
                  hostname: Optional[str] = None,
-                 fullname: Optional[str] = None,
                  ):
         # the ID of the OSD
         self.osd_id = osd_id
@@ -465,12 +515,12 @@ class OSD:
         self.force = force
         # The name of the node
         self.hostname = hostname
-        # The full name of the osd
-        self.fullname = fullname
 
         # mgr obj to make mgr/mon calls
         self.rm_util: RemoveUtil = remove_util
 
+        self.original_weight: Optional[float] = None
+
     def start(self) -> None:
         if self.started:
             logger.debug(f"Already started draining {self}")
@@ -482,14 +532,22 @@ class OSD:
         if self.stopped:
             logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
             return False
-        self.rm_util.set_osd_flag([self], 'out')
+        if self.replace:
+            self.rm_util.set_osd_flag([self], 'out')
+        else:
+            self.original_weight = self.rm_util.get_weight(self)
+            self.rm_util.reweight_osd(self, 0.0)
         self.drain_started_at = datetime.utcnow()
         self.draining = True
         logger.debug(f"Started draining {self}.")
         return True
 
     def stop_draining(self) -> bool:
-        self.rm_util.set_osd_flag([self], 'in')
+        if self.replace:
+            self.rm_util.set_osd_flag([self], 'in')
+        else:
+            if self.original_weight:
+                self.rm_util.reweight_osd(self, self.original_weight)
         self.drain_stopped_at = datetime.utcnow()
         self.draining = False
         logger.debug(f"Stopped draining {self}.")
@@ -592,7 +650,7 @@ class OSD:
         return self.osd_id == other.osd_id
 
     def __repr__(self) -> str:
-        return f"<OSD>(osd_id={self.osd_id}, draining={self.draining})"
+        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
 
 
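The new __repr__ uses the conventional osd.<id> form seen elsewhere in Ceph logs instead of the old angle-bracket dump. Derived directly from the f-string above:

    # repr(osd) -> 'osd.3 (draining)'   while draining
    # repr(osd) -> 'osd.3'              otherwise
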
 class OSDRemovalQueue(object):
@@ -635,13 +693,13 @@ class OSDRemovalQueue(object):
             if not osd.force:
                 # skip criteria
                 if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                     new_queue.add(osd)
                     continue
 
             if not osd.safe_to_destroy():
-                logger.info(
-                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                logger.debug(
+                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                 new_queue.add(osd)
                 continue
 
@@ -649,23 +707,27 @@ class OSDRemovalQueue(object):
             if not osd.down():
                 # also remove it from the remove_osd list and set a health_check warning?
                 raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+                    f"Could not mark {osd} down")
+
+            # stop and remove daemon
+            assert osd.hostname is not None
+            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+            logger.info(f"Successfully removed {osd} on {osd.hostname}")
 
             if osd.replace:
+                # mark destroyed in osdmap
                 if not osd.destroy():
                     raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
+                        f"Could not destroy {osd}")
+                logger.info(
+                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
             else:
+                # purge from osdmap
                 if not osd.purge():
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
+                logger.info(f"Successfully purged {osd} on {osd.hostname}")
 
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            self.mgr._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
+            logger.debug(f"Removing {osd} from the queue.")
 
         # self could change while this is processing (osds get added from the CLI)
         # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
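
The reworked loop always removes the daemon first, then updates the osdmap: destroy for a replacement (the OSD id stays reserved for the new device) or purge for a full removal. A condensed sketch of the per-OSD sequence, using only calls that appear in this file (queueing and error handling elided):

    def remove_one(self: "OSDRemovalQueue", osd: "OSD") -> None:
        if not osd.down():
            raise orchestrator.OrchestratorError(f"Could not mark {osd} down")
        assert osd.hostname is not None
        CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
        if osd.replace:
            osd.destroy()  # mark destroyed: the id stays reserved for replacement
        else:
            osd.purge()    # remove the OSD from the osdmap entirely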