def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
- osd_id_claims = self.find_destroyed_osds()
- if osd_id_claims:
+ osd_id_claims = OsdIdClaims(self.mgr)
+ if osd_id_claims.get():
logger.info(
- f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")
+ f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")
@forall_hosts
def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
host, drive_group))
return None
+ # skip this host if we cannot schedule here
+ if self.mgr.inventory.has_label(host, '_no_schedule'):
+ return None
+
+ osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
cmd = self.driveselection_to_ceph_volume(drive_selection,
- osd_id_claims.get(host, []))
+ osd_id_claims_for_host)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
drive_group.service_id))
return None
- logger.info('Applying service osd.%s on host %s...' % (
+ logger.debug('Applying service osd.%s on host %s...' % (
drive_group.service_id, host
))
start_ts = datetime_now()
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = self.create_single_host(
drive_group, host, cmd,
- replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
+ replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.update_osdspec_last_applied(
    host, drive_group.service_name(), start_ts
)
return ret_msg

ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
return ", ".join(filter(None, ret))

def create_single_host(self, drive_group: DriveGroupSpec, host: str,
                       cmd: str, env_vars: List[str],
                       replace_osd_ids: Optional[List[str]] = None) -> str:
if replace_osd_ids is None:
- replace_osd_ids = self.find_destroyed_osds().get(host, [])
+ replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
assert replace_osd_ids is not None
# check result
osds_elems: dict = CephadmServe(self.mgr)._run_cephadm_json(
    host, 'osd', 'ceph-volume',
    [
        '--',
        'lvm', 'list',
        '--format', 'json',
    ])
fsid = self.mgr._cluster_fsid
created = []
for osd_id, osds in osds_elems.items():
for osd in osds:
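+ # entries of type 'db' are shared db volumes, not OSD data devices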
+ if osd['type'] == 'db':
+ continue
if osd['tags']['ceph.cluster_fsid'] != fsid:
    logger.debug('mismatched fsid, skipping %s' % osd)
    continue
created.append(osd_id)
if created:
self.mgr.cache.invalidate_host_devices(host)
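+ # new OSDs change the per-OSD memory budget on this host; drop the
+ # cached osd_memory_target autotune result so it is recomputed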
+ self.mgr.cache.invalidate_autotune(host)
return "Created osd(s) %s on host '%s'" % (','.join(created), host)
else:
return "Created no osd(s) on host %s; already created?" % host
def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
# 1) use fn_filter to determine matching_hosts
matching_hosts = drive_group.placement.filter_matching_hostspecs(
- self.mgr.inventory.all_specs())
+ self.mgr._schedulable_hosts())
# 2) Map the inventory to the InventoryHost object
host_ds_map = []
for osdspec in osdspecs:
# populate osd_id_claims
- osd_id_claims = self.find_destroyed_osds()
+ osd_id_claims = OsdIdClaims(self.mgr)
# prepare driveselection
for host, ds in self.prepare_drivegroup(osdspec):
# driveselection for host
cmd = self.driveselection_to_ceph_volume(ds,
- osd_id_claims.get(host, []),
+ osd_id_claims.filtered_by_host(host),
preview=True)
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
if not osdspecs:
self.mgr.log.debug("No OSDSpecs found")
return []
- return sum([spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()) for spec in osdspecs], [])
+ return sum([spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()) for spec in osdspecs], [])
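
# How the placement filtering behaves, as assumed from its use in this file
# (a sketch using the existing ceph.deployment types; values illustrative).
# The switch to self.mgr._schedulable_hosts() means hosts carrying the
# _no_schedule label are excluded before filtering.
#
#   from ceph.deployment.service_spec import PlacementSpec
#   from ceph.deployment.hostspec import HostSpec
#
#   placement = PlacementSpec(label='osd')
#   hosts = [HostSpec('node1', labels=['osd']), HostSpec('node2')]
#   placement.filter_matching_hostspecs(hosts)   # -> ['node1']
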
def resolve_osdspecs_for_host(self, host: str,
specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
matching_specs = []
if not specs:
    specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
             if spec.service_type == 'osd']
for spec in specs:
- if host in spec.placement.filter_matching_hostspecs(self.mgr.inventory.all_specs()):
+ if host in spec.placement.filter_matching_hostspecs(self.mgr._schedulable_hosts()):
self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
matching_specs.append(spec)
return matching_specs
def get_osdspec_affinity(self, osd_id: str) -> str:
return self.mgr.get('osd_metadata').get(osd_id, {}).get('osdspec_affinity', '')
- def find_destroyed_osds(self) -> Dict[str, List[str]]:
- osd_host_map: Dict[str, List[str]] = dict()
+
+class OsdIdClaims(object):
+ """
+ Retrieve and provide the ids of destroyed OSDs so that they can be
+ reused when replacement OSDs are created in the cluster.
+ """
+
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
+ self.osd_host_map: Dict[str, List[str]] = dict()
+ self.refresh()
+
+ def refresh(self) -> None:
try:
ret, out, err = self.mgr.check_mon_command({
'prefix': 'osd tree',
'states': ['destroyed'],
'format': 'json'
})
except MonCommandFailed as e:
    logger.exception('osd tree failed')
    raise OrchestratorError(str(e))
try:
    tree = json.loads(out)
except ValueError:
logger.exception(f'Cannot decode JSON: \'{out}\'')
- return osd_host_map
+ return
nodes = tree.get('nodes', {})
for node in nodes:
if node.get('type') == 'host':
- osd_host_map.update(
+ self.osd_host_map.update(
{node.get('name'): [str(_id) for _id in node.get('children', list())]}
)
- if osd_host_map:
- self.mgr.log.info(f"Found osd claims -> {osd_host_map}")
- return osd_host_map
+ if self.osd_host_map:
+ self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")
+
+ def get(self) -> Dict[str, List[str]]:
+ return self.osd_host_map
+
+ def filtered_by_host(self, host: str) -> List[str]:
+ """
+ Return the list of OSD ids that can be reused on a given host.
+
+ OSD id claims in the CRUSH map are keyed by the bare (short)
+ hostname, so for FQDN hostnames the lookup is done with the
+ bare name.
+ """
+ return self.osd_host_map.get(host.split(".")[0], [])
class RemoveUtil(object):