disk = device.Device("/dev/nvme0n1")
assert disk.vgs == []
- def test_vgs_is_not_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
- BarPVolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1', pv_uuid="0000", pv_tags={})
- pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
+ def test_vgs_is_not_empty(self, device_info, monkeypatch):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
lsblk = {"TYPE": "disk"}
device_info(lsblk=lsblk)
disk = device.Device("/dev/nvme0n1")
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_ceph_disk_lsblk(self, monkeypatch):
+ def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_ceph_disk_blkid(self, monkeypatch):
+ def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch):
+ def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert not disk.available
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch):
+ def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
disk = device.Device("/dev/sda")
assert not disk.available
assert "Used by ceph-disk" in disk.rejected_reasons
+ def test_reject_removable_device(self, device_info):
+ data = {"/dev/sdb": {"removable": 1}}
+ device_info(devices=data)
+ disk = device.Device("/dev/sdb")
+ assert not disk.available
+
+ def test_accept_non_removable_device(self, device_info):
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+ device_info(devices=data)
+ disk = device.Device("/dev/sdb")
+ assert disk.available
+
+ def test_reject_readonly_device(self, device_info):
+ data = {"/dev/cdrom": {"ro": 1}}
+ device_info(devices=data)
+ disk = device.Device("/dev/cdrom")
+ assert not disk.available
+
+ def test_reject_smaller_than_5gb(self, device_info):
+ data = {"/dev/sda": {"size": 5368709119}}
+ device_info(devices=data)
+ disk = device.Device("/dev/sda")
+ assert not disk.available, 'too small device is available'
+
+ def test_accept_non_readonly_device(self, device_info):
+ data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
+ device_info(devices=data)
+ disk = device.Device("/dev/sda")
+ assert disk.available
+
+ def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label):
+ patch_bluestore_label.return_value = True
+ disk = device.Device("/dev/sda")
+ assert not disk.available
+ assert "Has BlueStore device label" in disk.rejected_reasons
+
@pytest.mark.usefixtures("device_info_not_ceph_disk_member",
"disable_lvm_queries",
"disable_kernel_queries")
- def test_is_not_ceph_disk_member_lsblk(self):
+ def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member is False
- def test_pv_api(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
- data = {"/dev/sda": {"foo": "bar"}}
- lsblk = {"TYPE": "part"}
+ def test_existing_vg_available(self, monkeypatch, device_info):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
device_info(devices=data, lsblk=lsblk)
- disk = device.Device("/dev/sda")
- assert disk.pvs_api
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ def test_existing_vg_too_small(self, monkeypatch, device_info):
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/nvme0n1")
+ assert not disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
+
+ def test_multiple_existing_vgs(self, monkeypatch, device_info):
+ vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+ vg_extent_size=1073741824)
+ vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
+ lsblk = {"TYPE": "disk"}
+ data = {"/dev/nvme0n1": {"size": "6442450944"}}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/nvme0n1")
+ assert disk.available_lvm
+ assert not disk.available
+ assert not disk.available_raw
@pytest.mark.parametrize("ceph_type", ["data", "block"])
def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type):
lsblk = {"TYPE": "part"}
lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
device_info(devices=data, lsblk=lsblk, lv=lv_data)
+ vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+ vg_extent_size=1073741824)
+ monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
disk = device.Device("/dev/sda")
assert disk.used_by_ceph
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_member_blkid(self, monkeypatch):
+ def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert not disk.available
def test_accept_non_removable_device(self, device_info):
- data = {"/dev/sdb": {"removable": 0}}
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
device_info(devices=data)
disk = device.Device("/dev/sdb")
assert disk.available
disk = device.Device("/dev/cdrom")
assert not disk.available
+ def test_reject_smaller_than_5gb(self, device_info):
+ data = {"/dev/sda": {"size": 5368709119}}
+ device_info(devices=data)
+ disk = device.Device("/dev/sda")
+ assert not disk.available, 'too small device is available'
+
def test_accept_non_readonly_device(self, device_info):
- data = {"/dev/sda": {"ro": 0}}
+ data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
device_info(devices=data)
disk = device.Device("/dev/sda")
assert disk.available
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
"disable_kernel_queries",
"disable_lvm_queries")
- def test_is_member_lsblk(self):
+ def test_is_member_lsblk(self, patch_bluestore_label):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
assert disk.is_member is True