]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_api.py
2 from ceph_volume
import process
, exceptions
3 from ceph_volume
.devices
.lvm
import api
6 class TestParseTags(object):
8 def test_no_tags_means_empty_dict(self
):
9 result
= api
.parse_tags('')
12 def test_single_tag_gets_parsed(self
):
13 result
= api
.parse_tags('ceph.osd_something=1')
14 assert result
== {'ceph.osd_something': '1'}
16 def test_multiple_csv_expands_in_dict(self
):
17 result
= api
.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
18 # assert them piecemeal to avoid the un-ordered dict nature
19 assert result
['ceph.osd_something'] == '1'
20 assert result
['ceph.foo'] == '2'
21 assert result
['ceph.fsid'] == '0000'
24 class TestGetAPIVgs(object):
26 def test_report_is_emtpy(self
, monkeypatch
):
27 monkeypatch
.setattr(api
.process
, 'call', lambda x
: ('{}', '', 0))
28 assert api
.get_api_vgs() == []
30 def test_report_has_stuff(self
, monkeypatch
):
31 report
= '{"report":[{"vg":[{"vg_name":"VolGroup00"}]}]}'
32 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
33 assert api
.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
35 def test_report_has_multiple_items(self
, monkeypatch
):
36 report
= '{"report":[{"vg":[{"vg_name":"VolGroup00"},{"vg_name":"ceph_vg"}]}]}'
37 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
38 assert api
.get_api_vgs() == [{'vg_name': 'VolGroup00'}, {'vg_name': 'ceph_vg'}]
40 def test_does_not_get_poluted_with_non_vg_items(self
, monkeypatch
):
41 report
= '{"report":[{"vg":[{"vg_name":"VolGroup00"}],"lv":[{"lv":"1"}]}]}'
42 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
43 assert api
.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
46 class TestGetAPILvs(object):
48 def test_report_is_emtpy(self
, monkeypatch
):
49 monkeypatch
.setattr(api
.process
, 'call', lambda x
: ('{}', '', 0))
50 assert api
.get_api_lvs() == []
52 def test_report_has_stuff(self
, monkeypatch
):
53 report
= '{"report":[{"lv":[{"lv_name":"VolGroup00"}]}]}'
54 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
55 assert api
.get_api_lvs() == [{'lv_name': 'VolGroup00'}]
57 def test_report_has_multiple_items(self
, monkeypatch
):
58 report
= '{"report":[{"lv":[{"lv_name":"VolName"},{"lv_name":"ceph_lv"}]}]}'
59 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
60 assert api
.get_api_lvs() == [{'lv_name': 'VolName'}, {'lv_name': 'ceph_lv'}]
62 def test_does_not_get_poluted_with_non_lv_items(self
, monkeypatch
):
63 report
= '{"report":[{"lv":[{"lv_name":"VolName"}],"vg":[{"vg":"1"}]}]}'
64 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
65 assert api
.get_api_lvs() == [{'lv_name': 'VolName'}]
69 def volumes(monkeypatch
):
70 monkeypatch
.setattr(process
, 'call', lambda x
: ('{}', '', 0))
71 volumes
= api
.Volumes()
77 def volume_groups(monkeypatch
):
78 monkeypatch
.setattr(process
, 'call', lambda x
: ('{}', '', 0))
79 vgs
= api
.VolumeGroups()
84 class TestGetLV(object):
86 def test_nothing_is_passed_in(self
):
88 assert api
.get_lv() is None
90 def test_single_lv_is_matched(self
, volumes
, monkeypatch
):
91 FooVolume
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/foo', lv_tags
="ceph.type=data")
92 volumes
.append(FooVolume
)
93 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
94 assert api
.get_lv(lv_name
='foo') == FooVolume
97 class TestGetVG(object):
99 def test_nothing_is_passed_in(self
):
100 # so we return a None
101 assert api
.get_vg() is None
103 def test_single_vg_is_matched(self
, volume_groups
, monkeypatch
):
104 FooVG
= api
.VolumeGroup(vg_name
='foo')
105 volume_groups
.append(FooVG
)
106 monkeypatch
.setattr(api
, 'VolumeGroups', lambda: volume_groups
)
107 assert api
.get_vg(vg_name
='foo') == FooVG
110 class TestVolumes(object):
112 def test_volume_get_has_no_volumes(self
, volumes
):
113 assert volumes
.get() is None
115 def test_volume_get_filtered_has_no_volumes(self
, volumes
):
116 assert volumes
.get(lv_name
='ceph') is None
118 def test_volume_has_multiple_matches(self
, volumes
):
119 volume1
= volume2
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
120 volumes
.append(volume1
)
121 volumes
.append(volume2
)
122 with pytest
.raises(exceptions
.MultipleLVsError
):
123 volumes
.get(lv_name
='foo')
125 def test_find_the_correct_one(self
, volumes
):
126 volume1
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
='')
127 volume2
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='')
128 volumes
.append(volume1
)
129 volumes
.append(volume2
)
130 assert volumes
.get(lv_name
='volume1') == volume1
132 def test_filter_by_tag(self
, volumes
):
133 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
134 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
135 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='ceph.type=journal')
137 volumes
.append(journal
)
138 volumes
.filter(lv_tags
={'ceph.type': 'data'})
139 assert len(volumes
) == 1
140 assert volumes
[0].lv_name
== 'volume1'
142 def test_filter_by_vg_name(self
, volumes
):
143 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
144 osd
= api
.Volume(lv_name
='volume1', vg_name
='ceph_vg', lv_tags
=lv_tags
)
145 journal
= api
.Volume(lv_name
='volume2', vg_name
='system_vg', lv_tags
='ceph.type=journal')
147 volumes
.append(journal
)
148 volumes
.filter(vg_name
='ceph_vg')
149 assert len(volumes
) == 1
150 assert volumes
[0].lv_name
== 'volume1'
152 def test_filter_by_lv_path(self
, volumes
):
153 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_tags
='')
154 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_tags
='')
156 volumes
.append(journal
)
157 volumes
.filter(lv_path
='/dev/volume1')
158 assert len(volumes
) == 1
159 assert volumes
[0].lv_name
== 'volume1'
161 def test_filter_requires_params(self
, volumes
):
162 with pytest
.raises(TypeError):
166 class TestVolumeGroups(object):
168 def test_volume_get_has_no_volume_groups(self
, volume_groups
):
169 assert volume_groups
.get() is None
171 def test_volume_get_filtered_has_no_volumes(self
, volume_groups
):
172 assert volume_groups
.get(vg_name
='ceph') is None
174 def test_volume_has_multiple_matches(self
, volume_groups
):
175 volume1
= volume2
= api
.VolumeGroup(vg_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
176 volume_groups
.append(volume1
)
177 volume_groups
.append(volume2
)
178 with pytest
.raises(exceptions
.MultipleVGsError
):
179 volume_groups
.get(vg_name
='foo')
181 def test_find_the_correct_one(self
, volume_groups
):
182 volume1
= api
.VolumeGroup(vg_name
='volume1', lv_tags
='')
183 volume2
= api
.VolumeGroup(vg_name
='volume2', lv_tags
='')
184 volume_groups
.append(volume1
)
185 volume_groups
.append(volume2
)
186 assert volume_groups
.get(vg_name
='volume1') == volume1
188 def test_filter_by_tag(self
, volume_groups
):
189 vg_tags
= "ceph.group=dmcache"
190 osd
= api
.VolumeGroup(vg_name
='volume1', vg_tags
=vg_tags
)
191 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.group=plain')
192 volume_groups
.append(osd
)
193 volume_groups
.append(journal
)
194 volume_groups
.filter(vg_tags
={'ceph.group': 'dmcache'})
195 assert len(volume_groups
) == 1
196 assert volume_groups
[0].vg_name
== 'volume1'
198 def test_filter_by_vg_name(self
, volume_groups
):
199 vg_tags
= "ceph.type=data,ceph.fsid=000-aaa"
200 osd
= api
.VolumeGroup(vg_name
='ceph_vg', vg_tags
=vg_tags
)
201 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.type=journal')
202 volume_groups
.append(osd
)
203 volume_groups
.append(journal
)
204 volume_groups
.filter(vg_name
='ceph_vg')
205 assert len(volume_groups
) == 1
206 assert volume_groups
[0].vg_name
== 'ceph_vg'
208 def test_filter_requires_params(self
, volume_groups
):
209 with pytest
.raises(TypeError):
210 volume_groups
.filter()
213 class TestCreateLV(object):
216 self
.foo_volume
= api
.Volume(lv_name
='foo', lv_path
='/path', vg_name
='foo_group', lv_tags
='')
218 def test_uses_size(self
, monkeypatch
, capture
):
219 monkeypatch
.setattr(process
, 'run', capture
)
220 monkeypatch
.setattr(process
, 'call', capture
)
221 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
222 api
.create_lv('foo', 'foo_group', size
=5, type='data')
223 expected
= ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
224 assert capture
.calls
[0]['args'][0] == expected
226 def test_calls_to_set_type_tag(self
, monkeypatch
, capture
):
227 monkeypatch
.setattr(process
, 'run', capture
)
228 monkeypatch
.setattr(process
, 'call', capture
)
229 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
230 api
.create_lv('foo', 'foo_group', size
=5, type='data')
231 ceph_tag
= ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
232 assert capture
.calls
[1]['args'][0] == ceph_tag
234 def test_calls_to_set_data_tag(self
, monkeypatch
, capture
):
235 monkeypatch
.setattr(process
, 'run', capture
)
236 monkeypatch
.setattr(process
, 'call', capture
)
237 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
238 api
.create_lv('foo', 'foo_group', size
=5, type='data')
239 data_tag
= ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
240 assert capture
.calls
[2]['args'][0] == data_tag