from datetime import datetime
import orchestrator
from cephadm.utils import forall_hosts
+from ceph.utils import datetime_now
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
@forall_hosts
def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
    """Apply the enclosing OSD spec (``drive_group``) on a single host.

    ``self``, ``drive_group`` and ``osd_id_claims`` are not parameters; they
    are taken from the enclosing scope.

    :param host: name of the host to apply the spec on
    :param drive_selection: drive selection that is translated into a
        ceph-volume command for this host
    :return: the message returned by ``create_single_host``, or ``None``
        when the host is skipped (no inventory change reported by the cache,
        or no ceph-volume command could be built)
    """
    # skip this host if there has been no change in inventory
    if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
        # Argument order follows the sentence: "<spec> on <host>".
        self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
            drive_group, host))
        return None

    cmd = self.driveselection_to_ceph_volume(drive_selection,
                                             osd_id_claims.get(host, []))
    if not cmd:
        logger.debug("No data_devices, skipping DriveGroup: {}".format(
            drive_group.service_id))
        return None

    logger.info('Applying service osd.%s on host %s...' % (
        drive_group.service_id, host
    ))
    # Timestamp is taken *before* ceph-volume runs and is what gets recorded
    # as the spec's last-applied time for this host below.
    start_ts = datetime_now()
    env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
    ret_msg = self.create_single_host(
        drive_group, host, cmd,
        replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
    )
    self.mgr.cache.update_osdspec_last_applied(
        host, drive_group.service_name(), start_ts
    )
    self.mgr.cache.save_host(host)
    return ret_msg
ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
return ", ".join(filter(None, ret))
- def create_single_host(self, host: str, cmd: str, replace_osd_ids: List[str],
+ def create_single_host(self,
+ drive_group: DriveGroupSpec,
+ host: str, cmd: str, replace_osd_ids: List[str],
env_vars: Optional[List[str]] = None) -> str:
out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
'ids': [str(osd.osd_id) for osd in osds]
})
if ret != 0:
- self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
+ self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
return False
- self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
+ self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
+ return True
+
def get_weight(self, osd: "OSD") -> Optional[float]:
    """Look up the CRUSH weight of ``osd`` in the ``osd crush tree`` output.

    :param osd: the OSD whose ``osd_id`` is used to find the ``osd.<id>`` node
    :return: the node's ``crush_weight`` value, or ``None`` when the mon
        command returns non-zero or no node with that name is listed
    """
    rc, stdout, stderr = self.mgr.mon_command({
        'prefix': 'osd crush tree',
        'format': 'json',
    })
    if rc != 0:
        self.mgr.log.error(f"Could not dump crush weights. <{stderr}>")
        return None

    wanted = f"osd.{osd.osd_id}"
    # First node whose name matches; later duplicates (if any) are ignored.
    match = next(
        (node for node in json.loads(stdout).get("nodes", [])
         if node.get("name") == wanted),
        None,
    )
    if match is None:
        return None

    weight = match.get('crush_weight')
    self.mgr.log.info(f"{osd} crush weight is {weight}")
    return weight
+
def reweight_osd(self, osd: "OSD", weight: float) -> bool:
    """Issue ``osd crush reweight`` for ``osd`` with the given ``weight``.

    :param osd: the OSD to reweight; addressed as ``osd.<osd_id>``
    :param weight: the weight value passed to the mon command
    :return: ``True`` when the mon command returns 0, ``False`` otherwise
    """
    log = self.mgr.log
    log.debug(f"running cmd: osd crush reweight on {osd}")
    rc, _stdout, stderr = self.mgr.mon_command({
        'prefix': "osd crush reweight",
        'name': f"osd.{osd.osd_id}",
        'weight': weight,
    })
    succeeded = rc == 0
    if succeeded:
        log.info(f"{osd} weight is now {weight}")
    else:
        log.error(f"Could not reweight {osd} to {weight}. <{stderr}>")
    return succeeded
def safe_to_destroy(self, osd_ids: List[int]) -> bool:
replace: bool = False,
force: bool = False,
hostname: Optional[str] = None,
- fullname: Optional[str] = None,
):
# the ID of the OSD
self.osd_id = osd_id
self.force = force
# The name of the node
self.hostname = hostname
- # The full name of the osd
- self.fullname = fullname
# mgr obj to make mgr/mon calls
self.rm_util: RemoveUtil = remove_util
+ self.original_weight: Optional[float] = None
+
def start(self) -> None:
if self.started:
logger.debug(f"Already started draining {self}")
if self.stopped:
logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
return False
- self.rm_util.set_osd_flag([self], 'out')
+ if self.replace:
+ self.rm_util.set_osd_flag([self], 'out')
+ else:
+ self.original_weight = self.rm_util.get_weight(self)
+ self.rm_util.reweight_osd(self, 0.0)
self.drain_started_at = datetime.utcnow()
self.draining = True
logger.debug(f"Started draining {self}.")
return True
def stop_draining(self) -> bool:
- self.rm_util.set_osd_flag([self], 'in')
+ if self.replace:
+ self.rm_util.set_osd_flag([self], 'in')
+ else:
+ if self.original_weight:
+ self.rm_util.reweight_osd(self, self.original_weight)
self.drain_stopped_at = datetime.utcnow()
self.draining = False
logger.debug(f"Stopped draining {self}.")
return self.osd_id == other.osd_id
def __repr__(self) -> str:
    """Return ``osd.<osd_id>``, followed by `` (draining)`` while ``self.draining`` is truthy."""
    suffix = ' (draining)' if self.draining else ''
    return f"osd.{self.osd_id}{suffix}"
class OSDRemovalQueue(object):
if not osd.force:
# skip criteria
if not osd.is_empty:
- logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+ logger.debug(f"{osd} is not empty yet. Waiting a bit more")
new_queue.add(osd)
continue
if not osd.safe_to_destroy():
- logger.info(
- f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+ logger.debug(
+ f"{osd} is not safe-to-destroy yet. Waiting a bit more")
new_queue.add(osd)
continue
if not osd.down():
# also remove it from the remove_osd list and set a health_check warning?
raise orchestrator.OrchestratorError(
- f"Could not set OSD <{osd.osd_id}> to 'down'")
+ f"Could not mark {osd} down")
+
+ # stop and remove daemon
+ assert osd.hostname is not None
+ self.mgr._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+ logger.info(f"Successfully removed {osd} on {osd.hostname}")
if osd.replace:
+ # mark destroyed in osdmap
if not osd.destroy():
raise orchestrator.OrchestratorError(
- f"Could not destroy OSD <{osd.osd_id}>")
+ f"Could not destroy {osd}")
+ logger.info(f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
else:
+ # purge from osdmap
if not osd.purge():
- raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+ raise orchestrator.OrchestratorError(f"Could not purge {osd}")
+ logger.info(f"Successfully purged {osd} on {osd.hostname}")
- if not osd.exists:
- continue
- assert osd.fullname is not None
- assert osd.hostname is not None
- self.mgr._remove_daemon(osd.fullname, osd.hostname)
- logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
- logger.debug(f"Removing {osd.osd_id} from the queue.")
+ logger.debug(f"Removing {osd} from the queue.")
# self could change while this is processing (osds get added from the CLI)
# The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and