]>
# Source: ceph.git (git.proxmox.com) - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_api.py
import pytest

from ceph_volume import process, exceptions
from ceph_volume.devices.lvm import api
class TestParseTags(object):
    """Checks that ``api.parse_tags`` maps a ``key=value`` CSV string to a dict."""

    def test_no_tags_means_empty_dict(self):
        result = api.parse_tags('')
        # Without this assertion the test passes whatever parse_tags returns;
        # the test name states the contract: no tags -> empty dict.
        assert result == {}

    def test_single_tag_gets_parsed(self):
        result = api.parse_tags('ceph.osd_something=1')
        assert result == {'ceph.osd_something': '1'}

    def test_multiple_csv_expands_in_dict(self):
        result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # assert them piecemeal to avoid the un-ordered dict nature
        assert result['ceph.osd_something'] == '1'
        assert result['ceph.foo'] == '2'
        assert result['ceph.fsid'] == '0000'
class TestGetAPIVgs(object):
    """Runs ``api.get_api_vgs`` with ``api.process.call`` replaced by canned output."""

    def test_report_is_emtpy(self, monkeypatch):
        # stdout holds nothing but newlines, so no volume group is expected.
        canned = ('\n\n', '', 0)
        monkeypatch.setattr(api.process, 'call', lambda x: canned)
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        stdout_lines = [' VolGroup00']
        monkeypatch.setattr(
            api.process, 'call', lambda x: (stdout_lines, '', 0))
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        # Seven ';'-separated fields: only the first and the last carry a value.
        stdout_lines = [' VolGroup00 ;;;;;;9g']
        monkeypatch.setattr(
            api.process, 'call', lambda x: (stdout_lines, '', 0))
        first_vg = api.get_api_vgs()[0]
        assert len(first_vg.keys()) == 7
        assert first_vg['vg_name'] == 'VolGroup00'
        assert first_vg['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        stdout_lines = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
        monkeypatch.setattr(
            api.process, 'call', lambda x: (stdout_lines, '', 0))
        vgs = api.get_api_vgs()
        # Entries are expected in the same order as the reported lines.
        assert vgs[0]['vg_name'] == 'VolGroup00'
        assert vgs[1]['vg_name'] == 'ceph_vg'
class TestGetAPILvs(object):
    """Runs ``api.get_api_lvs`` with ``api.process.call`` replaced by canned output."""

    def test_report_is_emtpy(self, monkeypatch):
        # Empty stdout: the report must come back as an empty list.
        canned = ('', '', 0)
        monkeypatch.setattr(api.process, 'call', lambda x: canned)
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        stdout_lines = [' ;/path;VolGroup00;root']
        monkeypatch.setattr(
            api.process, 'call', lambda x: (stdout_lines, '', 0))
        lvs = api.get_api_lvs()
        # The third ';'-separated field is the one surfaced as 'lv_name'.
        assert lvs[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        stdout_lines = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
        monkeypatch.setattr(
            api.process, 'call', lambda x: (stdout_lines, '', 0))
        lvs = api.get_api_lvs()
        assert lvs[0]['lv_name'] == 'VolName'
        assert lvs[1]['lv_name'] == 'ceph_lv'
@pytest.fixture
def volumes(monkeypatch):
    """Return an ``api.Volumes`` collection built with ``process.call`` stubbed.

    Tests request this by name as a pytest fixture and call ``append``,
    ``filter`` and ``get`` on the result, so the function must be registered
    as a fixture and must hand the collection back.
    """
    # Stub the command runner so building the collection never shells out;
    # the stub reports empty stdout/stderr and a zero exit status.
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    volumes = api.Volumes()
    return volumes
@pytest.fixture
def pvolumes(monkeypatch):
    """Return an ``api.PVolumes`` collection built with ``process.call`` stubbed.

    Tests request this by name as a pytest fixture and call ``append`` and
    ``filter`` on the result, so the function must be registered as a fixture
    and must hand the collection back.
    """
    # Stub the command runner so building the collection never shells out;
    # the stub reports empty stdout/stderr and a zero exit status.
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    pvolumes = api.PVolumes()
    return pvolumes
@pytest.fixture
def volume_groups(monkeypatch):
    """Return an ``api.VolumeGroups`` collection built with ``process.call`` stubbed.

    Tests request this by name as a pytest fixture and call ``append``,
    ``filter`` and ``get`` on the result, so the function must be registered
    as a fixture and must hand the collection back.
    """
    # Stub the command runner so building the collection never shells out;
    # the stub reports empty stdout/stderr and a zero exit status.
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    vgs = api.VolumeGroups()
    return vgs
class TestGetLV(object):
    """Lookups through ``api.get_lv`` with ``api.Volumes`` swapped for the fixture."""

    def test_nothing_is_passed_in(self):
        # No lookup criteria at all: the call is expected to yield None.
        assert api.get_lv() is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        # get_lv is made to see the prepared collection, not a fresh one.
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == foo_lv

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_uuid='1111',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_uuid='1111')
        assert found == foo_lv
class TestGetPV(object):
    """Lookups through ``api.get_pv`` with ``api.PVolumes`` swapped for the fixture."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        assert api.get_pv() is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        # 'foo' is not the uuid of the only known physical volume.
        found = api.get_pv(pv_uuid='foo')
        assert found is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='0000')
        assert found == sda

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
        )
        pvolumes.append(tagged_pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='1111')
        assert found == tagged_pv
class TestPVolumes(object):
    """Tag filtering on the ``api.PVolumes`` collection."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        tag_string = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo', pv_uuid='1111', pv_tags=tag_string)
        pvolumes.append(journal_pv)
        # The type matches, but the volume is tagged osd_id 1, not 2.
        wanted = {'ceph.type': 'journal', 'ceph.osd_id': '2'}
        pvolumes.filter(pv_tags=wanted)
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        tag_string = "ceph.type=journal,ceph.osd_id=1"
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo', pv_uuid='1111', pv_tags=tag_string)
        pvolumes.append(journal_pv)
        wanted = {'ceph.type': 'journal', 'ceph.osd_id': '1'}
        pvolumes.filter(pv_tags=wanted)
        assert pvolumes == [journal_pv]
class TestGetVG(object):
    """Lookups through ``api.get_vg`` with ``api.VolumeGroups`` swapped for the fixture."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        assert api.get_vg() is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        # get_vg is made to see the prepared collection, not a fresh one.
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
class TestVolumes(object):
    """Behaviour of ``get`` and ``filter`` on the ``api.Volumes`` collection.

    Every filter test registers both the ``osd`` and the ``journal`` volume
    before filtering, so the assertions check that the filter keeps or drops
    the right entry.
    """

    def test_volume_get_has_no_volumes(self, volumes):
        assert volumes.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        assert volumes.get(lv_name='ceph') is None

    def test_volume_has_multiple_matches(self, volumes):
        # The very same volume is registered twice to force an ambiguous match.
        volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_find_the_correct_one(self, volumes):
        volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        assert volumes.get(lv_name='volume1') == volume1

    def test_filter_by_tag(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        # Both volumes must be present, otherwise 'volume1' can never be the
        # single survivor of the filter.
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(
            lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        # note the different osd_id!
        volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
        # Neither volume carries both requested tags, so nothing may remain.
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='22222')
        # No registered volume has this uuid, so nothing may remain.
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        # Calling filter with no criteria at all is expected to be rejected.
        with pytest.raises(TypeError):
            volumes.filter()
class TestVolumeGroups(object):
    """Behaviour of ``get`` and ``filter`` on the ``api.VolumeGroups`` collection."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        found = volume_groups.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        found = volume_groups.get(vg_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # One object registered twice yields two hits for the same name.
        duplicated = api.VolumeGroup(
            vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume_groups.append(duplicated)
        volume_groups.append(duplicated)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        wanted = api.VolumeGroup(vg_name='volume1', lv_tags='')
        other = api.VolumeGroup(vg_name='volume2', lv_tags='')
        volume_groups.append(wanted)
        volume_groups.append(other)
        assert volume_groups.get(vg_name='volume1') == wanted

    def test_filter_by_tag(self, volume_groups):
        dmcache_vg = api.VolumeGroup(
            vg_name='volume1', vg_tags="ceph.group=dmcache")
        plain_vg = api.VolumeGroup(
            vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(dmcache_vg)
        volume_groups.append(plain_vg)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        tag_string = "ceph.group=dmcache,ceph.disk_type=ssd"
        ssd_vg = api.VolumeGroup(
            vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=tag_string)
        volume_groups.append(ssd_vg)
        # disk_type matches, but the group is tagged 'dmcache', not 'data'.
        wanted = {'ceph.group': 'data', 'ceph.disk_type': 'ssd'}
        volume_groups.filter(vg_tags=wanted)
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        ceph_vg = api.VolumeGroup(
            vg_name='ceph_vg', vg_tags="ceph.type=data,ceph.fsid=000-aaa")
        other_vg = api.VolumeGroup(
            vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(ceph_vg)
        volume_groups.append(other_vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # Filtering with no criteria must raise TypeError.
        with pytest.raises(TypeError):
            volume_groups.filter()
class TestCreateLV(object):
    """Checks the commands that ``api.create_lv`` hands to the ``process`` module.

    ``capture`` is a fixture defined outside this file; the tests use it as a
    stand-in for ``process.run`` / ``process.call`` and read the recorded
    invocations from ``capture.calls`` (presumably a call recorder -- confirm
    against the shared conftest).
    """

    def setup_method(self):
        # Built per test inside a setup hook: ``self`` does not exist at
        # class-body level, so this assignment cannot live directly in the class.
        self.foo_volume = api.Volume(
            lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def _create_data_lv(self, monkeypatch, capture):
        """Stub out command execution and LV lookup, then create the 'foo' LV."""
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        # Any LV lookup made while creating returns the canned volume.
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')

    def test_uses_size(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        # First recorded command: the lvcreate invocation itself.
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        # Second recorded command: tagging the LV with its type.
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        # Third recorded command: tagging the LV with its data device path.
        assert capture.calls[2]['args'][0] == data_tag