osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)
- cmd = self.driveselection_to_ceph_volume(drive_selection,
- osd_id_claims_for_host)
- if not cmd:
+ cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
+ osd_id_claims_for_host)
+ if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
drive_group.service_id))
return None
start_ts = datetime_now()
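+ # CEPH_VOLUME_OSDSPEC_AFFINITY ties the OSDs created by this run
+ # back to the spec that created them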
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = await self.create_single_host(
- drive_group, host, cmd,
+ drive_group, host, cmds,
replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
)
self.mgr.cache.update_osdspec_last_applied(
async def create_single_host(self,
drive_group: DriveGroupSpec,
- host: str, cmd: str, replace_osd_ids: List[str],
+ host: str, cmds: List[str], replace_osd_ids: List[str],
env_vars: Optional[List[str]] = None) -> str:
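+ """Run the translated ceph-volume commands on host, then deploy
+ daemons for the resulting OSDs."""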
- out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
-
- if code == 1 and ', it is already prepared' in '\n'.join(err):
- # HACK: when we create against an existing LV, ceph-volume
- # returns an error and the above message. To make this
- # command idempotent, tolerate this "error" and continue.
- logger.debug('the device was already prepared; continuing')
- code = 0
- if code:
- raise RuntimeError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
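+ # the translation may now yield more than one ceph-volume call
+ # (raw mode, for instance, is expected to emit one command per
+ # data device), so run the commands in order and fail fast on
+ # the first real error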
+ for cmd in cmds:
+ out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
+ if code == 1 and ', it is already prepared' in '\n'.join(err):
+ # HACK: when we create against an existing LV, ceph-volume
+ # returns an error and the above message. To make this
+ # command idempotent, tolerate this "error" and continue.
+ logger.debug('the device was already prepared; continuing')
+ code = 0
+ if code:
+ raise RuntimeError(
+ 'cephadm exited with an error code: %d, stderr: %s' % (
+ code, '\n'.join(err)))
return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
replace_osd_ids)
drive_selection = DriveSelection(drive_group, inventory_for_host,
existing_daemons=len(dd_for_spec_and_host))
logger.debug(f"Found drive selection {drive_selection}")
+ if drive_group.method == 'raw':
+ # ceph-volume can currently only handle a 1:1 mapping
+ # of data to db/wal devices for raw mode OSDs. If db or wal
+ # devices are defined and their count does not match the
+ # number of data devices, we need to bail out
+ if drive_selection.data_devices() and drive_selection.db_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
+ if drive_selection.data_devices() and drive_selection.wal_devices():
+ if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
+ raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
+ f'{len(drive_selection.data_devices())} potential data device(s) and '
+ f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
host_ds_map.append((host, drive_selection))
return host_ds_map
@staticmethod
def driveselection_to_ceph_volume(drive_selection: DriveSelection,
osd_id_claims: Optional[List[str]] = None,
- preview: bool = False) -> Optional[str]:
+ preview: bool = False) -> List[str]:
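+ """Translate a DriveSelection into the list of ceph-volume
+ commands it requires (empty if there are no data devices)."""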
logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
- cmd: Optional[str] = translate.to_ceph_volume(drive_selection,
- osd_id_claims, preview=preview).run()
- logger.debug(f"Resulting ceph-volume cmd: {cmd}")
- return cmd
+ cmds: List[str] = translate.to_ceph_volume(drive_selection,
+ osd_id_claims, preview=preview).run()
+ logger.debug(f"Resulting ceph-volume cmds: {cmds}")
+ return cmds
def get_previews(self, host: str) -> List[Dict[str, Any]]:
# Find OSDSpecs that match host.
continue
# driveselection for host
- cmd = self.driveselection_to_ceph_volume(ds,
- osd_id_claims.filtered_by_host(host),
- preview=True)
- if not cmd:
+ cmds: List[str] = self.driveselection_to_ceph_volume(ds,
+ osd_id_claims.filtered_by_host(host),
+ preview=True)
+ if not cmds:
logger.debug("No data_devices, skipping DriveGroup: {}".format(
osdspec.service_name()))
continue
# get preview data from ceph-volume
- out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
- if out:
- try:
- concat_out: Dict[str, Any] = json.loads(' '.join(out))
- except ValueError:
- logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
- concat_out = {}
- notes = []
- if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
- found = len(concat_out)
- limit = osdspec.data_devices.limit
- notes.append(
- f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
- ret_all.append({'data': concat_out,
- 'osdspec': osdspec.service_id,
- 'host': host,
- 'notes': notes})
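+ # each translated command gets its own preview entry; output
+ # that cannot be parsed as JSON is reported as an empty preview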
+ for cmd in cmds:
+ out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
+ if out:
+ try:
+ concat_out: Dict[str, Any] = json.loads(' '.join(out))
+ except ValueError:
+ logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
+ concat_out = {}
+ notes = []
+ if osdspec.data_devices is not None and osdspec.data_devices.limit and len(concat_out) < osdspec.data_devices.limit:
+ found = len(concat_out)
+ limit = osdspec.data_devices.limit
+ notes.append(
+ f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit (Found: {found} | Limit: {limit})')
+ ret_all.append({'data': concat_out,
+ 'osdspec': osdspec.service_id,
+ 'host': host,
+ 'notes': notes})
return ret_all
def resolve_hosts_for_osdspecs(self,