return fake_call
+@pytest.fixture
+def fakedevice(factory):
+ def apply(**kw):
+ params = dict(
+ path='/dev/sda',
+ abspath='/dev/sda',
+ lv_api=None,
+ pvs_api=[],
+ disk_api={},
+ sys_api={},
+ exists=True,
+ is_lvm_member=True,
+ )
+ params.update(dict(kw))
+ return factory(**params)
+ return apply
+
+
@pytest.fixture
def stub_call(monkeypatch):
"""
return apply
+@pytest.fixture(autouse=True)
+def reset_cluster_name(request, monkeypatch):
+ """
+ The globally available ``ceph_volume.conf.cluster`` might get mangled in
+ tests, make sure that after evert test, it gets reset, preventing pollution
+ going into other tests later.
+ """
+ def fin():
+ conf.cluster = None
+ try:
+ os.environ.pop('CEPH_CONF')
+ except KeyError:
+ pass
+ request.addfinalizer(fin)
+
+
@pytest.fixture
def conf_ceph(monkeypatch):
"""
@pytest.fixture
def volumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x: ('', '', 0))
+ monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
volumes = lvm_api.Volumes()
volumes._purge()
return volumes
@pytest.fixture
def volume_groups(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x: ('', '', 0))
+ monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
vgs = lvm_api.VolumeGroups()
vgs._purge()
return vgs
+@pytest.fixture
+def stub_vgs(monkeypatch, volume_groups):
+ def apply(vgs):
+ monkeypatch.setattr(lvm_api, 'get_api_vgs', lambda: vgs)
+ return apply
+
+
@pytest.fixture
def pvolumes(monkeypatch):
- monkeypatch.setattr('ceph_volume.process.call', lambda x: ('', '', 0))
+ monkeypatch.setattr('ceph_volume.process.call', lambda x, **kw: ('', '', 0))
pvolumes = lvm_api.PVolumes()
pvolumes._purge()
return pvolumes
@pytest.fixture
def device_info(monkeypatch):
- def apply(devices=None, lsblk=None, lv=None):
+ def apply(devices=None, lsblk=None, lv=None, blkid=None):
devices = devices if devices else {}
lsblk = lsblk if lsblk else {}
+ blkid = blkid if blkid else {}
lv = Factory(**lv) if lv else None
monkeypatch.setattr("ceph_volume.sys_info.devices", {})
monkeypatch.setattr("ceph_volume.util.device.disk.get_devices", lambda: devices)
- monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ if not devices:
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: lv)
+ else:
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv_from_argument", lambda path: None)
+ monkeypatch.setattr("ceph_volume.util.device.lvm.get_lv", lambda vg_name, lv_uuid: lv)
monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", lambda path: lsblk)
+ monkeypatch.setattr("ceph_volume.util.device.disk.blkid", lambda path: blkid)
return apply