]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/services/osd.py
import ceph octopus 15.2.17
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / osd.py
index 8febf352b615bb4c67c1a421c21c14dd1d978ede..5b388106aae45807a85cff4f2b2bc731f9af460a 100644 (file)
@@ -12,6 +12,7 @@ from ceph.utils import datetime_to_str, str_to_datetime
 from datetime import datetime
 import orchestrator
 from cephadm.utils import forall_hosts
+from ceph.utils import datetime_now
 from orchestrator import OrchestratorError
 from mgr_module import MonCommandFailed
 
@@ -35,23 +36,40 @@ class OSDService(CephService):
 
         @forall_hosts
         def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
+            # skip this host if there has been no change in inventory
+            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
+                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
+                    host, drive_group))
+                return None
+
             cmd = self.driveselection_to_ceph_volume(drive_selection,
                                                      osd_id_claims.get(host, []))
             if not cmd:
                 logger.debug("No data_devices, skipping DriveGroup: {}".format(
                     drive_group.service_id))
                 return None
-            logger.info('Applying drive group %s on host %s...' % (drive_group.service_id, host))
+
+            logger.info('Applying service osd.%s on host %s...' % (
+                drive_group.service_id, host
+            ))
+            start_ts = datetime_now()
             env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
             ret_msg = self.create_single_host(
-                host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
+                drive_group, host, cmd,
+                replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
             )
+            self.mgr.cache.update_osdspec_last_applied(
+                host, drive_group.service_name(), start_ts
+            )
+            self.mgr.cache.save_host(host)
             return ret_msg
 
         ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
         return ", ".join(filter(None, ret))
 
-    def create_single_host(self, host: str, cmd: str, replace_osd_ids: List[str],
+    def create_single_host(self,
+                           drive_group: DriveGroupSpec,
+                           host: str, cmd: str, replace_osd_ids: List[str],
                            env_vars: Optional[List[str]] = None) -> str:
         out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
 
@@ -374,9 +392,37 @@ class RemoveUtil(object):
             'ids': [str(osd.osd_id) for osd in osds]
         })
         if ret != 0:
-            self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
+            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
             return False
-        self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
+        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
+        return True
+
+    def get_weight(self, osd: "OSD") -> Optional[float]:
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd crush tree',
+            'format': 'json',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
+            return None
+        j = json.loads(out)
+        for n in j.get("nodes", []):
+            if n.get("name") == f"osd.{osd.osd_id}":
+                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
+                return n.get("crush_weight")
+        return None
+
+    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
+        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': "osd crush reweight",
+            'name': f"osd.{osd.osd_id}",
+            'weight': weight,
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
+            return False
+        self.mgr.log.info(f"{osd} weight is now {weight}")
         return True
 
     def safe_to_destroy(self, osd_ids: List[int]) -> bool:
@@ -433,7 +479,6 @@ class OSD:
                  replace: bool = False,
                  force: bool = False,
                  hostname: Optional[str] = None,
-                 fullname: Optional[str] = None,
                  ):
         # the ID of the OSD
         self.osd_id = osd_id
@@ -465,12 +510,12 @@ class OSD:
         self.force = force
         # The name of the node
         self.hostname = hostname
-        # The full name of the osd
-        self.fullname = fullname
 
         # mgr obj to make mgr/mon calls
         self.rm_util: RemoveUtil = remove_util
 
+        self.original_weight: Optional[float] = None
+
     def start(self) -> None:
         if self.started:
             logger.debug(f"Already started draining {self}")
@@ -482,14 +527,22 @@ class OSD:
         if self.stopped:
             logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
             return False
-        self.rm_util.set_osd_flag([self], 'out')
+        if self.replace:
+            self.rm_util.set_osd_flag([self], 'out')
+        else:
+            self.original_weight = self.rm_util.get_weight(self)
+            self.rm_util.reweight_osd(self, 0.0)
         self.drain_started_at = datetime.utcnow()
         self.draining = True
         logger.debug(f"Started draining {self}.")
         return True
 
     def stop_draining(self) -> bool:
-        self.rm_util.set_osd_flag([self], 'in')
+        if self.replace:
+            self.rm_util.set_osd_flag([self], 'in')
+        else:
+            if self.original_weight:
+                self.rm_util.reweight_osd(self, self.original_weight)
         self.drain_stopped_at = datetime.utcnow()
         self.draining = False
         logger.debug(f"Stopped draining {self}.")
@@ -592,7 +645,7 @@ class OSD:
         return self.osd_id == other.osd_id
 
     def __repr__(self) -> str:
-        return f"<OSD>(osd_id={self.osd_id}, draining={self.draining})"
+        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"
 
 
 class OSDRemovalQueue(object):
@@ -635,13 +688,13 @@ class OSDRemovalQueue(object):
             if not osd.force:
                 # skip criteria
                 if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                     new_queue.add(osd)
                     continue
 
             if not osd.safe_to_destroy():
-                logger.info(
-                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                logger.debug(
+                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                 new_queue.add(osd)
                 continue
 
@@ -649,23 +702,26 @@ class OSDRemovalQueue(object):
             if not osd.down():
                 # also remove it from the remove_osd list and set a health_check warning?
                 raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+                    f"Could not mark {osd} down")
+
+            # stop and remove daemon
+            assert osd.hostname is not None
+            self.mgr._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+            logger.info(f"Successfully removed {osd} on {osd.hostname}")
 
             if osd.replace:
+                # mark destroyed in osdmap
                 if not osd.destroy():
                     raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
+                        f"Could not destroy {osd}")
+                logger.info(f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
             else:
+                # purge from osdmap
                 if not osd.purge():
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
+                logger.info(f"Successfully purged {osd} on {osd.hostname}")
 
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            self.mgr._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
+            logger.debug(f"Removing {osd} from the queue.")
 
         # self could change while this is processing (osds get added from the CLI)
         # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and