import pytest
+from copy import deepcopy
from ceph_volume.util import device
from ceph_volume.api import lvm as api
class TestDevice(object):
- def test_sys_api(self, device_info):
+ def test_sys_api(self, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
data = {"/dev/sda": {"foo": "bar"}}
device_info(devices=data)
disk = device.Device("/dev/sda")
assert disk.sys_api
assert "foo" in disk.sys_api
- def test_lvm_size(self, device_info):
+ def test_lvm_size(self, monkeypatch, device_info):
+ volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
+ lv_tags={}, lv_path='/dev/VolGroup/lv')
+ volumes = []
+ volumes.append(volume)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
# 5GB in size
data = {"/dev/sda": {"size": "5368709120"}}
device_info(devices=data)
disk = device.Device("vg/lv")
assert disk.is_lv
- def test_vgs_is_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
- BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ def test_vgs_is_empty(self, device_info, monkeypatch):
+ BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
+ pv_tags={})
+ pvolumes = []
pvolumes.append(BarPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
lsblk = {"TYPE": "disk"}
device_info(lsblk=lsblk)
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
+
disk = device.Device("/dev/nvme0n1")
assert disk.vgs == []
disk = device.Device("/dev/nvme0n1")
assert len(disk.vgs) == 1
- def test_device_is_device(self, device_info, pvolumes):
+ def test_device_is_device(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
- def test_device_is_rotational(self, device_info, pvolumes):
+ def test_device_is_rotational(self, device_info):
data = {"/dev/sda": {"rotational": "1"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_device_is_not_rotational(self, device_info, pvolumes):
+ def test_device_is_not_rotational(self, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
- def test_device_is_rotational_lsblk(self, device_info, pvolumes):
+ def test_device_is_rotational_lsblk(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "ROTA": "1"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
+ def test_device_is_not_rotational_lsblk(self, device_info):
data = {"/dev/sda": {"rotational": "0"}}
lsblk = {"TYPE": "device", "ROTA": "0"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.rotational
- def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
+ def test_device_is_rotational_defaults_true(self, device_info):
# rotational will default true if no info from sys_api or lsblk is found
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "device", "foo": "bar"}
disk = device.Device("/dev/sda")
assert disk.rotational
- def test_disk_is_device(self, device_info, pvolumes):
+ def test_disk_is_device(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "disk"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_device is True
- def test_is_partition(self, device_info, pvolumes):
+ def test_is_partition(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_partition
- def test_is_not_lvm_memeber(self, device_info, pvolumes):
+ def test_is_not_acceptable_device(self, device_info):
+ data = {"/dev/dm-0": {"foo": "bar"}}
+ lsblk = {"TYPE": "mpath"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/dm-0")
+ assert not disk.is_device
+
+ def test_is_not_lvm_memeber(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
disk = device.Device("/dev/sda")
assert not disk.is_lvm_member
- def test_is_lvm_memeber(self, device_info, pvolumes):
+ def test_is_lvm_memeber(self, device_info):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
device_info(devices=data, lsblk=lsblk)
assert not disk.is_mapper
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
assert disk.is_ceph_disk_member
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert disk.is_ceph_disk_member
assert "Used by ceph-disk" in disk.rejected_reasons
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
assert "Has BlueStore device label" in disk.rejected_reasons
@pytest.mark.usefixtures("device_info_not_ceph_disk_member",
- "disable_lvm_queries",
"disable_kernel_queries")
def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
disk = device.Device("/dev/sda")
assert not disk.available_raw
@pytest.mark.parametrize("ceph_type", ["data", "block"])
- def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
- pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
+ def test_used_by_ceph(self, device_info,
+ monkeypatch, ceph_type):
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
- lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
+ lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
+ pvolumes.append(FooPVolume)
+ lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
+ "lv_uuid": "0000", "lv_tags":
+ "ceph.osd_id=0,ceph.type="+ceph_type}
+ volumes = []
+ lv = api.Volume(**lv_data)
+ volumes.append(lv)
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
+ monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
+ deepcopy(volumes))
+
device_info(devices=data, lsblk=lsblk, lv=lv_data)
vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
vg_extent_size=1073741824)
disk = device.Device("/dev/sda")
assert disk.used_by_ceph
- def test_not_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_not_used_by_ceph(self, device_info, monkeypatch):
FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
+ pvolumes = []
pvolumes.append(FooPVolume)
- monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
data = {"/dev/sda": {"foo": "bar"}}
lsblk = {"TYPE": "part"}
lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
+ monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
+
device_info(devices=data, lsblk=lsblk, lv=lv_data)
disk = device.Device("/dev/sda")
assert not disk.used_by_ceph
class TestDeviceEncryption(object):
- def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
+ def test_partition_is_not_encrypted_lsblk(self, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
- def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
+ def test_partition_is_encrypted_lsblk(self, device_info):
lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
- def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
+ def test_partition_is_not_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'part'}
blkid = {'TYPE': 'ceph data'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is False
- def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
+ def test_partition_is_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'part'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk = device.Device("/dev/sda")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
status = {'type': 'LUKS1'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
status = {'type': 'LUKS2'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
status = {'type': 'PLAIN'}
monkeypatch.setattr(device, 'encryption_status', lambda x: status)
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is True
- def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
+ def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
monkeypatch.setattr(device, 'encryption_status', lambda x: {})
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
disk = device.Device("/dev/mapper/uuid")
assert disk.is_encrypted is False
- def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
+ def test_lv_is_encrypted_blkid(self, device_info):
lsblk = {'TYPE': 'lvm'}
blkid = {'TYPE': 'crypto_LUKS'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = {}
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_blkid(self, factory, device_info):
lsblk = {'TYPE': 'lvm'}
blkid = {'TYPE': 'xfs'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
- def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
+ def test_lv_is_encrypted_lsblk(self, device_info):
lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = {}
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=None)
assert disk.is_encrypted is False
- def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
+ def test_lv_is_encrypted_lvm_api(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
disk.lv_api = factory(encrypted=True)
assert disk.is_encrypted is True
- def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
+ def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
blkid = {'TYPE': 'mapper'}
device_info(lsblk=lsblk, blkid=blkid)
assert disk.partlabel == 'ceph data'
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ""})
assert disk.is_member is True
@pytest.mark.usefixtures("lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_is_member_lsblk(self, patch_bluestore_label):
disk = device.CephDiskDevice(device.Device("/dev/sda"))
ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
@pytest.mark.usefixtures("blkid_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
lambda path: {'PARTLABEL': ''})
@pytest.mark.usefixtures("blkid_ceph_disk_member",
"lsblk_ceph_disk_member",
- "disable_kernel_queries",
- "disable_lvm_queries")
+ "disable_kernel_queries")
def test_type_lsblk(self, device_info, ceph_partlabel):
disk = device.CephDiskDevice(device.Device("/dev/sda"))