pv_name='/dev/vg/foo',
pv_uuid='1111', pv_tags=pv_tags, vg_name='vg')
pvolumes.append(FooPVolume)
- pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
- assert pvolumes == []
+ assert pvolumes.filter(pv_tags={'ceph.type': 'journal',
+ 'ceph.osd_id': '2'}) == []
def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
pv_tags = "ceph.type=journal,ceph.osd_id=1"
pv_name='/dev/vg/foo',
pv_uuid='1111', pv_tags=pv_tags, vg_name="vg")
pvolumes.append(FooPVolume)
- pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
- assert pvolumes == [FooPVolume]
+ assert pvolumes.filter(pv_tags={'ceph.type': 'journal',
+ 'ceph.osd_id': '1'}) == [FooPVolume]
class TestGetVG(object):
journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
volume_groups.append(osd)
volume_groups.append(journal)
- volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
+ volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
assert len(volume_groups) == 1
assert volume_groups[0].vg_name == 'volume1'
vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
volume_groups.append(osd)
- volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
+ volume_groups = volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
assert volume_groups == []
def test_filter_by_vg_name(self, volume_groups):
journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
volume_groups.append(osd)
volume_groups.append(journal)
- volume_groups.filter(vg_name='ceph_vg')
+ volume_groups = volume_groups.filter(vg_name='ceph_vg')
assert len(volume_groups) == 1
assert volume_groups[0].vg_name == 'ceph_vg'
def test_filter_requires_params(self, volume_groups):
with pytest.raises(TypeError):
- volume_groups.filter()
+ volume_groups = volume_groups.filter()
class TestVolumeGroupFree(object):
class TestIsLV(object):
def test_is_not_an_lv(self, monkeypatch):
+ monkeypatch.setattr(api.process, 'call', lambda x, **kw: ('', '', 0))
monkeypatch.setattr(api, 'dmsetup_splitname', lambda x, **kw: {})
assert api.is_lv('/dev/sda1', lvs=[]) is False