result = api.parse_tags('ceph.osd_something=1')
assert result == {'ceph.osd_something': '1'}
+ def test_non_ceph_tags_are_skipped(self):
+ result = api.parse_tags('foo')
+ assert result == {}
+
+ def test_mixed_non_ceph_tags(self):
+ result = api.parse_tags('foo,ceph.bar=1')
+ assert result == {'ceph.bar': '1'}
+
def test_multiple_csv_expands_in_dict(self):
result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
# assert them piecemeal to avoid the un-ordered dict nature
assert api.get_pv() is None
def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg")
pvolumes.append(FooPVolume)
monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
assert api.get_pv(pv_uuid='foo') is None
def test_single_pv_is_matched(self, pvolumes, monkeypatch):
- FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
+ FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
pvolumes.append(FooPVolume)
monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
assert api.get_pv(pv_uuid='0000') == FooPVolume
def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
FooPVolume = api.PVolume(
pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags="ceph.type=data")
+ pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
pvolumes.append(FooPVolume)
monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
assert api.get_pv(pv_uuid='1111') == FooPVolume
+ def test_vg_name_is_set(self, pvolumes, monkeypatch):
+ FooPVolume = api.PVolume(
+ pv_name='/dev/vg/foo',
+ pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
+ pvolumes.append(FooPVolume)
+ monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
+ pv = api.get_pv(pv_name="/dev/vg/foo")
+ assert pv.vg_name == "vg"
+
class TestPVolumes(object):
pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
FooPVolume = api.PVolume(
pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags=pv_tags)
+ pv_uuid='1111', pv_tags=pv_tags, vg_name='vg')
pvolumes.append(FooPVolume)
pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
assert pvolumes == []
pv_tags = "ceph.type=journal,ceph.osd_id=1"
FooPVolume = api.PVolume(
pv_name='/dev/vg/foo',
- pv_uuid='1111', pv_tags=pv_tags)
+ pv_uuid='1111', pv_tags=pv_tags, vg_name="vg")
pvolumes.append(FooPVolume)
pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
assert pvolumes == [FooPVolume]
monkeypatch.setattr(process, 'call', capture)
monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
- expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
+ expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
assert capture.calls[0]['args'][0] == expected
def test_calls_to_set_type_tag(self, monkeypatch, capture):
monkeypatch.setattr(process, 'call', capture)
monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
- ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
+ ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path']
assert capture.calls[1]['args'][0] == ceph_tag
def test_calls_to_set_data_tag(self, monkeypatch, capture):
monkeypatch.setattr(process, 'call', capture)
monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
- data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
+ data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
assert capture.calls[2]['args'][0] == data_tag