return volumes
-@pytest.fixture
-def pvolumes(monkeypatch):
- monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
- pvolumes = api.PVolumes()
- pvolumes._purge()
- return pvolumes
-
-
@pytest.fixture
def volume_groups(monkeypatch):
monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
assert api.get_pv(pv_uuid='0000') == FooPVolume
+ def test_multiple_pvs_is_matched_by_uuid(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, lv_uuid="0000000")
+ BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ pvolumes.append(FooPVolume)
+ pvolumes.append(BarPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ assert api.get_pv(pv_uuid='0000') == FooPVolume
+
+ def test_multiple_pvs_is_matched_by_name(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, lv_uuid="0000000")
+ BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ pvolumes.append(FooPVolume)
+ pvolumes.append(BarPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ assert api.get_pv(pv_name='/dev/sda') == FooPVolume
+
+ def test_multiple_pvs_is_matched_by_tags(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(vg_name="vg1", pv_name='/dev/sdc', pv_uuid="1000", pv_tags="ceph.foo=bar", lv_uuid="0000000")
+ BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags="ceph.foo=bar")
+ pvolumes.append(FooPVolume)
+ pvolumes.append(BarPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ with pytest.raises(exceptions.MultiplePVsError):
+ api.get_pv(pv_tags={"ceph.foo": "bar"})
+
def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
FooPVolume = api.PVolume(
pv_name='/dev/vg/foo',
volume_groups.filter()
+class TestVolumeGroupFree(object):
+
+ def test_no_g_in_output(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='')
+ with pytest.raises(RuntimeError):
+ vg.free
+
+ def test_g_without_size(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='g')
+ with pytest.raises(RuntimeError):
+ vg.free
+
+ def test_size_without_g(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='1')
+ with pytest.raises(RuntimeError):
+ vg.free
+
+ def test_error_message(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='F')
+ with pytest.raises(RuntimeError) as error:
+ vg.free
+ assert "Unable to convert vg size to integer: 'F'" in str(error)
+
+ def test_invalid_float(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free=' g')
+ with pytest.raises(RuntimeError) as error:
+ vg.free
+ assert "Unable to convert to integer: ' '" in str(error.value)
+
+ def test_integer_gets_produced(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='100g')
+ assert vg.free == 100
+
+ def test_integer_gets_produced_whitespace(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free=' 100g ')
+ assert vg.free == 100
+
+ def test_integer_gets_rounded_down(self):
+ vg = api.VolumeGroup(vg_name='nosize', vg_free='100.99g')
+ assert vg.free == 100
+
+
+class TestCreateLVs(object):
+
+ def test_creates_correct_lv_number_from_parts(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.api.lvm.create_lv', lambda *a, **kw: (a, kw))
+ vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_size='99999999g', vg_free_count='999'
+ )
+ lvs = api.create_lvs(vg, parts=4)
+ assert len(lvs) == 4
+
+ def test_suffixes_the_size_arg(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.api.lvm.create_lv', lambda *a, **kw: (a, kw))
+ vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_size='99999999g', vg_free_count='999'
+ )
+ lvs = api.create_lvs(vg, parts=4)
+ assert lvs[0][1]['extents'] == 249
+
+ def test_only_uses_free_size(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.api.lvm.create_lv', lambda *a, **kw: (a, kw))
+ vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_size='99999999g', vg_free_count='1000'
+ )
+ lvs = api.create_lvs(vg, parts=4)
+ assert lvs[0][1]['extents'] == 250
+
+ def test_null_tags_are_set_by_default(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.api.lvm.create_lv', lambda *a, **kw: (a, kw))
+ vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_size='99999999g', vg_free_count='999'
+ )
+ kwargs = api.create_lvs(vg, parts=4)[0][1]
+ assert list(kwargs['tags'].values()) == ['null', 'null', 'null', 'null']
+
+ def test_fallback_to_one_part(self, monkeypatch):
+ monkeypatch.setattr('ceph_volume.api.lvm.create_lv', lambda *a, **kw: (a, kw))
+ vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_size='99999999g', vg_free_count='999'
+ )
+ lvs = api.create_lvs(vg)
+ assert len(lvs) == 1
+
+
+class TestVolumeGroupSizing(object):
+
+ def setup(self):
+ self.vg = api.VolumeGroup(
+ vg_name='ceph', vg_free='1024g',
+ vg_free_count='261129'
+ )
+
+ def test_parts_and_size_errors(self):
+ with pytest.raises(ValueError) as error:
+ self.vg.sizing(parts=4, size=10)
+ assert "Cannot process sizing" in str(error)
+
+ def test_zero_parts_produces_100_percent(self):
+ result = self.vg.sizing(parts=0)
+ assert result['percentages'] == 100
+
+ def test_two_parts_produces_50_percent(self):
+ result = self.vg.sizing(parts=2)
+ assert result['percentages'] == 50
+
+ def test_two_parts_produces_half_size(self):
+ result = self.vg.sizing(parts=2)
+ assert result['sizes'] == 512
+
+ def test_half_size_produces_round_sizes(self):
+ result = self.vg.sizing(size=512)
+ assert result['sizes'] == 512
+ assert result['percentages'] == 50
+ assert result['parts'] == 2
+
+ def test_bit_more_than_half_size_allocates_full_size(self):
+ # 513 can't allocate more than 1, so it just fallsback to using the
+ # whole device
+ result = self.vg.sizing(size=513)
+ assert result['sizes'] == 1024
+ assert result['percentages'] == 100
+ assert result['parts'] == 1
+
+ def test_extents_are_halfed_rounded_down(self):
+ result = self.vg.sizing(size=512)
+ # the real extents would've given 130564.5
+ assert result['extents'] == 130564
+
+ def test_bit_less_size_rounds_down(self):
+ result = self.vg.sizing(size=129)
+ assert result['sizes'] == 146
+ assert result['percentages'] == 14
+ assert result['parts'] == 7
+
+ def test_unable_to_allocate_past_free_size(self):
+ with pytest.raises(exceptions.SizeAllocationError):
+ self.vg.sizing(size=2048)
+
+
class TestGetLVFromArgument(object):
def setup(self):
data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
assert capture.calls[2]['args'][0] == data_tag
+ def test_uses_uuid(self, monkeypatch, capture):
+ monkeypatch.setattr(process, 'run', capture)
+ monkeypatch.setattr(process, 'call', capture)
+ monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
+ api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}, uuid_name=True)
+ result = capture.calls[0]['args'][0][5]
+ assert result.startswith('foo-')
+ assert len(result) == 40
+
+
+class TestExtendVG(object):
+
+ def setup(self):
+ self.foo_volume = api.VolumeGroup(vg_name='foo', lv_tags='')
+
+ def test_uses_single_device_in_list(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.extend_vg(self.foo_volume, ['/dev/sda'])
+ expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
+ assert fake_run.calls[0]['args'][0] == expected
+
+ def test_uses_single_device(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.extend_vg(self.foo_volume, '/dev/sda')
+ expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
+ assert fake_run.calls[0]['args'][0] == expected
+
+ def test_uses_multiple_devices(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.extend_vg(self.foo_volume, ['/dev/sda', '/dev/sdb'])
+ expected = ['vgextend', '--force', '--yes', 'foo', '/dev/sda', '/dev/sdb']
+ assert fake_run.calls[0]['args'][0] == expected
+
+
+class TestCreateVG(object):
+
+ def setup(self):
+ self.foo_volume = api.VolumeGroup(vg_name='foo', lv_tags='')
+
+ def test_no_name(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.create_vg('/dev/sda')
+ result = fake_run.calls[0]['args'][0]
+ assert '/dev/sda' in result
+ assert result[-2].startswith('ceph-')
+
+ def test_devices_list(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.create_vg(['/dev/sda', '/dev/sdb'], name='ceph')
+ result = fake_run.calls[0]['args'][0]
+ expected = ['vgcreate', '--force', '--yes', 'ceph', '/dev/sda', '/dev/sdb']
+ assert result == expected
+
+ def test_name_prefix(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.create_vg('/dev/sda', name_prefix='master')
+ result = fake_run.calls[0]['args'][0]
+ assert '/dev/sda' in result
+ assert result[-2].startswith('master-')
+
+ def test_specific_name(self, monkeypatch, fake_run):
+ monkeypatch.setattr(api, 'get_vg', lambda **kw: True)
+ api.create_vg('/dev/sda', name='master')
+ result = fake_run.calls[0]['args'][0]
+ assert '/dev/sda' in result
+ assert result[-2] == 'master'
#
# The following tests are pretty gnarly. VDO detection is very convoluted and
'/sys/block': block_path})
result = api._vdo_parents(['dm-3'])
assert result == []
+
+
+class TestSplitNameParser(object):
+
+ def test_keys_are_parsed_without_prefix(self):
+ line = ["DM_VG_NAME='/dev/mapper/vg';DM_LV_NAME='lv';DM_LV_LAYER=''"]
+ result = api._splitname_parser(line)
+ assert result['VG_NAME'] == 'vg'
+ assert result['LV_NAME'] == 'lv'
+ assert result['LV_LAYER'] == ''
+
+ def test_vg_name_sans_mapper(self):
+ line = ["DM_VG_NAME='/dev/mapper/vg';DM_LV_NAME='lv';DM_LV_LAYER=''"]
+ result = api._splitname_parser(line)
+ assert '/dev/mapper' not in result['VG_NAME']
+
+
+class TestIsLV(object):
+
+ def test_is_not_an_lv(self, monkeypatch):
+ monkeypatch.setattr(api, 'dmsetup_splitname', lambda x: {})
+ assert api.is_lv('/dev/sda1', lvs=[]) is False
+
+ def test_lvs_not_found(self, monkeypatch, volumes):
+ CephVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
+ volumes.append(CephVolume)
+ splitname = {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
+ monkeypatch.setattr(api, 'dmsetup_splitname', lambda x: splitname)
+ assert api.is_lv('/dev/sda1', lvs=volumes) is False
+
+ def test_is_lv(self, monkeypatch, volumes):
+ CephVolume = api.Volume(
+ vg_name='ceph', lv_name='data',
+ lv_path='/dev/vg/foo', lv_tags="ceph.type=data"
+ )
+ volumes.append(CephVolume)
+ splitname = {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
+ monkeypatch.setattr(api, 'dmsetup_splitname', lambda x: splitname)
+ assert api.is_lv('/dev/sda1', lvs=volumes) is True