-import datetime
import json
import logging
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
+from datetime import datetime
import orchestrator
+from cephadm.utils import forall_hosts
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
-from cephadm.services.cephadmservice import CephadmService
-
+from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec
logger = logging.getLogger(__name__)
+DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
class OSDService(CephadmService):
- def create(self, drive_group: DriveGroupSpec) -> str:
+ TYPE = 'osd'
+
+ def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
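+        """Create OSDs from a DriveGroupSpec by running ceph-volume on every matching host."""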
logger.debug(f"Processing DriveGroup {drive_group}")
- ret = []
- drive_group.osd_id_claims = self.find_destroyed_osds()
- logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
- for host, drive_selection in self.prepare_drivegroup(drive_group):
+ osd_id_claims = self.find_destroyed_osds()
+ logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")
+
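+        # run once per (host, drive_selection) pair returned by prepare_drivegroup()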
+ @forall_hosts
+ def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
- cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
- drive_group.osd_id_claims.get(host, []))
+ cmd = self.driveselection_to_ceph_volume(drive_selection,
+ osd_id_claims.get(host, []))
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
- continue
- # env_vars = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
- # disable this until https://github.com/ceph/ceph/pull/34835 is merged
- env_vars: List[str] = []
+ return None
+ env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = self.create_single_host(
- host, cmd, replace_osd_ids=drive_group.osd_id_claims.get(host, []), env_vars=env_vars
+ host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
)
- ret.append(ret_msg)
- return ", ".join(ret)
-
+ return ret_msg
+
+ ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+ return ", ".join(filter(None, ret))
+
def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
continue
created.append(osd_id)
+ daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+ daemon_id=osd_id,
+ host=host,
+ daemon_type='osd',
+ )
self.mgr._create_daemon(
- 'osd', osd_id, host,
+ daemon_spec,
osd_uuid_map=osd_uuid_map)
if created:
host_ds_map.append((host, drive_selection))
return host_ds_map
- def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
+ def driveselection_to_ceph_volume(self,
drive_selection: DriveSelection,
osd_id_claims: Optional[List[str]] = None,
preview: bool = False) -> Optional[str]:
- logger.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
- cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection,
+ logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
+ cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
osd_id_claims, preview=preview).run()
logger.debug(f"Resulting ceph-volume cmd: {cmd}")
return cmd
def get_previews(self, host) -> List[Dict[str, Any]]:
# Find OSDSpecs that match host.
- osdspecs = self.mgr.resolve_osdspecs_for_host(host)
+ osdspecs = self.resolve_osdspecs_for_host(host)
return self.generate_previews(osdspecs, host)
def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
for osdspec in osdspecs:
# populate osd_id_claims
- osdspec.osd_id_claims = self.find_destroyed_osds()
+ osd_id_claims = self.find_destroyed_osds()
# prepare driveselection
for host, ds in self.prepare_drivegroup(osdspec):
continue
# driveselection for host
- cmd = self.driveselection_to_ceph_volume(osdspec,
- ds,
- osdspec.osd_id_claims.get(host, []),
+ cmd = self.driveselection_to_ceph_volume(ds,
+ osd_id_claims.get(host, []),
preview=True)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
'host': host})
return ret_all
+ def resolve_hosts_for_osdspecs(self,
+ specs: Optional[List[DriveGroupSpec]] = None
+ ) -> List[str]:
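+        """Return all hosts matched by the placements of the given OSDSpecs."""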
+ osdspecs = []
+ if specs:
+ osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+ if not osdspecs:
+ self.mgr.log.debug("No OSDSpecs found")
+ return []
+ return sum([spec.placement.filter_matching_hosts(self.mgr._get_hosts) for spec in osdspecs], [])
+
+    def resolve_osdspecs_for_host(self, host: str, specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
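+        """Find OSDSpecs (from the spec store preview, unless given) whose placement matches `host`."""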
+ matching_specs = []
+ self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
+ if not specs:
+ specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
+ if spec.service_type == 'osd']
+ for spec in specs:
+ if host in spec.placement.filter_matching_hosts(self.mgr._get_hosts):
+ self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+ matching_specs.append(spec)
+ return matching_specs
+
def _run_ceph_volume_command(self, host: str,
cmd: str, env_vars: Optional[List[str]] = None
) -> Tuple[List[str], List[str], int]:
return osd_host_map
-class OSDRemoval(object):
- def __init__(self,
- osd_id: str,
- replace: bool,
- force: bool,
- nodename: str,
- fullname: str,
- start_at: datetime.datetime,
- pg_count: int):
- self.osd_id = osd_id
- self.replace = replace
- self.force = force
- self.nodename = nodename
- self.fullname = fullname
- self.started_at = start_at
- self.pg_count = pg_count
-
- # needed due to changing 'started_at' attr
- def __eq__(self, other):
- return self.osd_id == other.osd_id
-
- def __hash__(self):
- return hash(self.osd_id)
-
- def __repr__(self):
- return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
- ', fullname={}, started_at={}, pg_count={})').format(
- self.osd_id, self.replace, self.force, self.nodename,
- self.fullname, self.started_at, self.pg_count)
-
- @property
- def pg_count_str(self) -> str:
- return 'n/a' if self.pg_count < 0 else str(self.pg_count)
-
-
class RemoveUtil(object):
def __init__(self, mgr):
self.mgr = mgr
- self.to_remove_osds: Set[OSDRemoval] = set()
- self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
-
- @property
- def report(self) -> Set[OSDRemoval]:
- return self.to_remove_osds.copy()
- def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
- self.to_remove_osds.update(osds)
-
- def _remove_osds_bg(self) -> None:
+ def process_removal_queue(self) -> None:
"""
Performs actions in the _serve() loop to remove an OSD
when criteria is met.
"""
+
+ # make sure that we don't run on OSDs that are not in the cluster anymore.
+ self.cleanup()
+
logger.debug(
- f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
- self._update_osd_removal_status()
- remove_osds: set = self.to_remove_osds.copy()
- for osd in remove_osds:
+ f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
+ f"for removal: {self.mgr.to_remove_osds.all_osds()}")
+
+ # find osds that are ok-to-stop and not yet draining
+ ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
+ if ok_to_stop_osds:
+ # start draining those
+            for osd in ok_to_stop_osds:
+                osd.start_draining()
+
+ # Check all osds for their state and take action (remove, purge etc)
+ to_remove_osds = self.mgr.to_remove_osds.all_osds()
+ new_queue = set()
+ for osd in to_remove_osds:
if not osd.force:
- self.drain_osd(osd.osd_id)
# skip criteria
- if not self.is_empty(osd.osd_id):
+ if not osd.is_empty:
logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+ new_queue.add(osd)
continue
- if not self.ok_to_destroy([osd.osd_id]):
+ if not osd.safe_to_destroy():
logger.info(
f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+ new_queue.add(osd)
continue
# abort criteria
- if not self.down_osd([osd.osd_id]):
+ if not osd.down():
# also remove it from the remove_osd list and set a health_check warning?
raise orchestrator.OrchestratorError(
f"Could not set OSD <{osd.osd_id}> to 'down'")
if osd.replace:
- if not self.destroy_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
+ if not osd.destroy():
raise orchestrator.OrchestratorError(
f"Could not destroy OSD <{osd.osd_id}>")
else:
- if not self.purge_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
+ if not osd.purge():
raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+ if not osd.exists:
+ continue
self.mgr._remove_daemon(osd.fullname, osd.nodename)
logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
logger.debug(f"Removing {osd.osd_id} from the queue.")
- self.to_remove_osds.remove(osd)
- def _update_osd_removal_status(self):
- """
- Generate a OSD report that can be printed to the CLI
- """
- logger.debug("Update OSD removal status")
- for osd in self.to_remove_osds:
- osd.pg_count = self.get_pg_count(str(osd.osd_id))
- logger.debug(f"OSD removal status: {self.to_remove_osds}")
+        # self.mgr.to_remove_osds could change while this runs (OSDs get added from the CLI),
+        # so keep only the intersection of the OSDs that are still not empty/removed (new_queue)
+        # and the OSDs that were added while this method was executing.
+ self.mgr.to_remove_osds.intersection_update(new_queue)
+ self.save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be removed from the cluster by other means. This ensures
+        # that we only operate on OSDs that still exist.
+        for osd in self.mgr.to_remove_osds.not_in_cluster():
+            self.mgr.to_remove_osds.remove(osd)
+
+ def get_osds_in_cluster(self) -> List[str]:
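+        """Return the IDs of all OSDs currently present in the osdmap."""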
+ osd_map = self.mgr.get_osdmap()
+ return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
+
+ def osd_df(self) -> dict:
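+        """Return the parsed JSON output of `osd df`."""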
+ base_cmd = 'osd df'
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': base_cmd,
+ 'format': 'json'
+ })
+ return json.loads(out)
+
+ def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
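+        """Return the PG count of an OSD from `osd df` output, or -1 if the OSD is not listed."""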
+ if not osd_df:
+ osd_df = self.osd_df()
+ osd_nodes = osd_df.get('nodes', [])
+ for osd_node in osd_nodes:
+ if osd_node.get('id') == int(osd_id):
+ return osd_node.get('pgs', -1)
+ return -1
- def drain_osd(self, osd_id: str) -> bool:
+    def find_osd_stop_threshold(self, osds: List["OSD"]) -> List["OSD"]:
"""
- Uses `osd_support` module to schedule a drain operation of an OSD
+        Halve the list of OSDs until the remainder is ok-to-stop
+
+        :param osds: list of OSD objects
+        :return: list of OSDs that can be stopped at once
"""
+ if not osds:
+ return []
+ while not self.ok_to_stop(osds):
+ if len(osds) <= 1:
+ # can't even stop one OSD, aborting
+                self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later.")
+ return []
+
+ # This potentially prolongs the global wait time.
+ self.mgr.event.wait(1)
+ # splitting osd_ids in half until ok_to_stop yields success
+            # maybe popping ids off one by one is better here.. depends on the cluster size, I guess.
+ # There's a lot of room for micro adjustments here
+ osds = osds[len(osds) // 2:]
+ return osds
+
+ # todo start draining
+ # return all([osd.start_draining() for osd in osds])
+
+ def ok_to_stop(self, osds: List["OSD"]) -> bool:
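+        """Ask the mons via `osd ok-to-stop` whether the given OSDs can be stopped safely."""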
cmd_args = {
- 'prefix': 'osd drain',
- 'osd_ids': [int(osd_id)]
+ 'prefix': "osd ok-to-stop",
+ 'ids': [str(osd.osd_id) for osd in osds]
}
return self._run_mon_cmd(cmd_args)
- def get_pg_count(self, osd_id: str) -> int:
- """ Queries for PG count of an OSD """
- self.mgr.log.debug("Querying for drain status")
+ def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
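+        """Run `osd <flag>` (e.g. `in`, `out`, `down`) for the given OSDs."""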
+ base_cmd = f"osd {flag}"
+ self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
ret, out, err = self.mgr.mon_command({
- 'prefix': 'osd drain status',
+ 'prefix': base_cmd,
+ 'ids': [str(osd.osd_id) for osd in osds]
})
if ret != 0:
- self.mgr.log.error(f"Calling osd drain status failed with {err}")
- raise OrchestratorError("Could not query `osd drain status`")
- out = json.loads(out)
- for o in out:
- if str(o.get('osd_id', '')) == str(osd_id):
- return int(o.get('pgs', -1))
- return -1
-
- def is_empty(self, osd_id: str) -> bool:
- """ Checks if an OSD is empty """
- return self.get_pg_count(osd_id) == 0
+ self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
+ return False
+ self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
+ return True
- def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+ def safe_to_destroy(self, osd_ids: List[int]) -> bool:
""" Queries the safe-to-destroy flag for OSDs """
cmd_args = {'prefix': 'osd safe-to-destroy',
- 'ids': osd_ids}
+ 'ids': [str(x) for x in osd_ids]}
return self._run_mon_cmd(cmd_args)
def destroy_osd(self, osd_id: int) -> bool:
'yes_i_really_mean_it': True}
return self._run_mon_cmd(cmd_args)
- def down_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `out` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd down',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
def purge_osd(self, osd_id: int) -> bool:
""" Purges an OSD from the cluster (forcefully) """
cmd_args = {
}
return self._run_mon_cmd(cmd_args)
- def out_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `down` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd out',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
def _run_mon_cmd(self, cmd_args: dict) -> bool:
"""
Generic command to run mon_command and evaluate/log the results
return False
self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
return True
+
+ def save_to_store(self):
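+        """Persist the removal queue to the mgr key/value store as JSON."""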
+ osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
+ logger.debug(f"Saving {osd_queue} to store")
+ self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+ def load_from_store(self):
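+        """Restore the removal queue from the mgr key/value store."""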
+ for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+ for osd in json.loads(v):
+ logger.debug(f"Loading osd ->{osd} from store")
+            osd_obj = OSD.from_json(osd, ctx=self)  # json.loads(v) already yields dicts
+ self.mgr.to_remove_osds.add(osd_obj)
+
+
+class NotFoundError(Exception):
+ pass
+
+
+class OSD:
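+    """Track and drive the removal/replacement lifecycle of a single OSD."""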
+
+ def __init__(self,
+ osd_id: int,
+ remove_util: RemoveUtil,
+ drain_started_at: Optional[datetime] = None,
+ process_started_at: Optional[datetime] = None,
+ drain_stopped_at: Optional[datetime] = None,
+ drain_done_at: Optional[datetime] = None,
+ draining: bool = False,
+ started: bool = False,
+ stopped: bool = False,
+ replace: bool = False,
+ force: bool = False,
+ hostname: Optional[str] = None,
+ fullname: Optional[str] = None,
+ ):
+ # the ID of the OSD
+ self.osd_id = osd_id
+
+ # when did process (not the actual draining) start
+ self.process_started_at = process_started_at
+
+ # when did the drain start
+ self.drain_started_at = drain_started_at
+
+ # when did the drain stop
+ self.drain_stopped_at = drain_stopped_at
+
+ # when did the drain finish
+ self.drain_done_at = drain_done_at
+
+ # did the draining start
+ self.draining = draining
+
+ # was the operation started
+ self.started = started
+
+ # was the operation stopped
+ self.stopped = stopped
+
+ # If this is a replace or remove operation
+ self.replace = replace
+ # If we wait for the osd to be drained
+ self.force = force
+ # The name of the node
+ self.nodename = hostname
+ # The full name of the osd
+ self.fullname = fullname
+
+ # mgr obj to make mgr/mon calls
+ self.rm_util = remove_util
+
+ def start(self) -> None:
+ if self.started:
+ logger.debug(f"Already started draining {self}")
+ return None
+ self.started = True
+ self.stopped = False
+
+ def start_draining(self) -> bool:
+ if self.stopped:
+ logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
+ return False
+ self.rm_util.set_osd_flag([self], 'out')
+ self.drain_started_at = datetime.utcnow()
+ self.draining = True
+ logger.debug(f"Started draining {self}.")
+ return True
+
+ def stop_draining(self) -> bool:
+ self.rm_util.set_osd_flag([self], 'in')
+ self.drain_stopped_at = datetime.utcnow()
+ self.draining = False
+ logger.debug(f"Stopped draining {self}.")
+ return True
+
+ def stop(self) -> None:
+ if self.stopped:
+ logger.debug(f"Already stopped draining {self}")
+ return None
+ self.started = False
+ self.stopped = True
+ self.stop_draining()
+
+ @property
+ def is_draining(self) -> bool:
+ """
+ Consider an OSD draining when it is
+ actively draining but not yet empty
+ """
+ return self.draining and not self.is_empty
+
+ @property
+ def is_ok_to_stop(self) -> bool:
+ return self.rm_util.ok_to_stop([self])
+
+ @property
+ def is_empty(self) -> bool:
+ if self.get_pg_count() == 0:
+ if not self.drain_done_at:
+ self.drain_done_at = datetime.utcnow()
+ self.draining = False
+ return True
+ return False
+
+ def safe_to_destroy(self) -> bool:
+ return self.rm_util.safe_to_destroy([self.osd_id])
+
+ def down(self) -> bool:
+ return self.rm_util.set_osd_flag([self], 'down')
+
+ def destroy(self) -> bool:
+ return self.rm_util.destroy_osd(self.osd_id)
+
+ def purge(self) -> bool:
+ return self.rm_util.purge_osd(self.osd_id)
+
+ def get_pg_count(self) -> int:
+ return self.rm_util.get_pg_count(self.osd_id)
+
+ @property
+ def exists(self) -> bool:
+ return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
+
+ def drain_status_human(self):
+ default_status = 'not started'
+ status = 'started' if self.started and not self.draining else default_status
+ status = 'draining' if self.draining else status
+ status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
+ return status
+
+    def pg_count_str(self) -> str:
+        pg_count = self.get_pg_count()
+        return 'n/a' if pg_count < 0 else str(pg_count)
+
+ def to_json(self) -> dict:
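+        """Serialize this OSD for the store; datetime fields are rendered with DATEFMT."""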
+ out = dict()
+ out['osd_id'] = self.osd_id
+ out['started'] = self.started
+ out['draining'] = self.draining
+ out['stopped'] = self.stopped
+ out['replace'] = self.replace
+ out['force'] = self.force
+        # keys below match the __init__ kwargs so that from_json can call cls(**inp)
+        out['hostname'] = self.nodename  # type: ignore
+        out['fullname'] = self.fullname
+
+ for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+ if getattr(self, k):
+ out[k] = getattr(self, k).strftime(DATEFMT)
+ else:
+ out[k] = getattr(self, k)
+ return out
+
+ @classmethod
+ def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
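+        """Deserialize an OSD from the store; `ctx` re-binds the RemoveUtil, which is not serialized."""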
+ if not inp:
+ return None
+ for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+ if inp.get(date_field):
+ inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
+ inp.update({'remove_util': ctx})
+ return cls(**inp)
+
+ def __hash__(self):
+ return hash(self.osd_id)
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, OSD):
+ return NotImplemented
+ return self.osd_id == other.osd_id
+
+ def __repr__(self) -> str:
+ return f"<OSD>(osd_id={self.osd_id}, is_draining={self.is_draining})"
+
+
+class OSDQueue(Set):
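+    """Set of OSDs queued for removal, with convenience views on their state."""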
+
+ def __init__(self):
+ super().__init__()
+
+ def as_osd_ids(self) -> List[int]:
+ return [osd.osd_id for osd in self]
+
+ def queue_size(self) -> int:
+ return len(self)
+
+ def draining_osds(self) -> List["OSD"]:
+ return [osd for osd in self if osd.is_draining]
+
+ def idling_osds(self) -> List["OSD"]:
+ return [osd for osd in self if not osd.is_draining and not osd.is_empty]
+
+ def empty_osds(self) -> List["OSD"]:
+ return [osd for osd in self if osd.is_empty]
+
+ def all_osds(self) -> List["OSD"]:
+ return [osd for osd in self]
+
+ def not_in_cluster(self) -> List["OSD"]:
+ return [osd for osd in self if not osd.exists]
+
+ def enqueue(self, osd: "OSD") -> None:
+ if not osd.exists:
+ raise NotFoundError()
+ self.add(osd)
+ osd.start()
+
+ def rm(self, osd: "OSD") -> None:
+ if not osd.exists:
+ raise NotFoundError()
+ osd.stop()
+ try:
+ logger.debug(f'Removing {osd} from the queue.')
+ self.remove(osd)
+ except KeyError:
+ logger.debug(f"Could not find {osd} in queue.")
+            raise