from mock.mock import patch, PropertyMock
from ceph_volume.util import disk
from ceph_volume.util.constants import ceph_disk_guids
-from ceph_volume.api import lvm as lvm_api
from ceph_volume import conf, configuration
return apply
-@pytest.fixture
-def volumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- volumes = lvm_api.Volumes()
- volumes._purge()
- return volumes
-
-
-@pytest.fixture
-def volume_groups(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- vgs = lvm_api.VolumeGroups()
- vgs._purge()
- return vgs
-
-def volume_groups_empty(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- vgs = lvm_api.VolumeGroups(populate=False)
- return vgs
-
-@pytest.fixture
-def stub_vgs(monkeypatch, volume_groups):
- def apply(vgs):
- monkeypatch.setattr(lvm_api, 'get_api_vgs', lambda: vgs)
- return apply
-
-
-# TODO: allow init-ing pvolumes to list we want
-@pytest.fixture
-def pvolumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- pvolumes = lvm_api.PVolumes()
- pvolumes._purge()
- return pvolumes
-
-@pytest.fixture
-def pvolumes_empty(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
- pvolumes = lvm_api.PVolumes(populate=False)
- return pvolumes
-
-
-
@pytest.fixture
def is_root(monkeypatch):
"""
monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: {})
-@pytest.fixture
-def disable_lvm_queries(monkeypatch):
- '''
- This speeds up calls to Device and Disk
- '''
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: None)
-
-
@pytest.fixture(params=[
'', 'ceph data', 'ceph journal', 'ceph block',
'ceph block.wal', 'ceph block.db', 'ceph lockbox'])
monkeypatch.setattr("ceph_volume.sys_info.devices", {})
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda: devices)
if not devices:
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_first_lv", lambda filters: lv)
else:
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
monkeypatch.setattr("ceph_volume.util.device.lvm.get_device_lvs",
lambda path: [lv])
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: lv)
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", lambda path: lsblk)
monkeypatch.setattr("ceph_volume.util.device.disk.blkid", lambda path: blkid)
monkeypatch.setattr("ceph_volume.util.disk.udevadm_property", lambda *a, **kw: udevadm)