service_id: testing_drivegroup
placement:
host_pattern: hostname
+crush_device_class: ssd
data_devices:
paths:
- - /dev/sda
+ - /dev/sda
"""
),
(
host_pattern: hostname
data_devices:
paths:
- - /dev/sda"""
+ - path: /dev/sda
+ crush_device_class: ssd"""
),
])
def test_DriveGroup(test_input):
+
dg = DriveGroupSpec.from_json(yaml.safe_load(test_input))
assert dg.service_id == 'testing_drivegroup'
assert all([isinstance(x, Device) for x in dg.data_devices.paths])
- assert dg.data_devices.paths[0].path == '/dev/sda'
+ if isinstance(dg.data_devices.paths[0].path, str):
+ assert dg.data_devices.paths[0].path == '/dev/sda'
+
+
@pytest.mark.parametrize("match,test_input",
[
cmds = translate.to_ceph_volume(sel, []).run()
assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+@pytest.mark.parametrize("test_input_base",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - /dev/sda
+"""
+ ),
+ ])
+def test_ceph_volume_command_10(test_input_base):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input_base))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert all(cmd == 'lvm batch --no-auto /dev/sda --crush-device-class ssd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+@pytest.mark.parametrize("test_input1",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+"""
+ ),
+ ])
+def test_ceph_volume_command_11(test_input1):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input1))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --crush-device-class hdd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+@pytest.mark.parametrize("test_input2",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+"""
+ ),
+ ])
+def test_ceph_volume_command_12(test_input2):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input2))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --crush-device-class ssd --yes --no-systemd') # noqa E501
+ assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501
+
+
+@pytest.mark.parametrize("test_input3",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+"""
+ ),
+ ])
+def test_ceph_volume_command_13(test_input3):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input3))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --yes --no-systemd') # noqa E501
+ assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501
+
+
+@pytest.mark.parametrize("test_input4",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+data_devices:
+ paths:
+ - crush_device_class: hdd
+"""
+ ),
+ ])
+def test_ceph_volume_command_14(test_input4):
+
+ with pytest.raises(DriveGroupValidationError, match='Device path'):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input4))
+ spec.validate()
+
+
def test_raw_ceph_volume_command_0():
spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
service_id='foobar',
sel = drive_selection.DriveSelection(spec, inventory)
with pytest.raises(ValueError):
cmds = translate.to_ceph_volume(sel, []).run()
+
+@pytest.mark.parametrize("test_input5",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+ - path: /dev/sdc
+ crush_device_class: hdd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_2(test_input5):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input5))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class hdd'
+
+
+@pytest.mark.parametrize("test_input6",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+ - path: /dev/sdc
+ crush_device_class: ssd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_3(test_input6):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input6))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class ssd'
+
+
+@pytest.mark.parametrize("test_input7",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: nvme
+ - path: /dev/sdc
+ crush_device_class: ssd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+wal_devices:
+ paths:
+ - /dev/sdg
+ - /dev/sdh
+ - /dev/sdi
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_4(test_input7):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input7))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --block.wal /dev/sdg --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdf --block.wal /dev/sdi --crush-device-class nvme'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sde --block.wal /dev/sdh --crush-device-class ssd'