# git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
# a56582b3574e91b48bac539038fe845a146445ca
import pytest

from ceph_volume import process, exceptions
from ceph_volume.api import lvm as api
class TestParseTags(object):
    """Checks for ``api.parse_tags`` on empty, single, mixed and CSV tag strings."""

    def test_no_tags_means_empty_dict(self):
        result = api.parse_tags('')
        # NOTE(review): assertion reconstructed from the test name -- confirm.
        assert result == {}

    def test_single_tag_gets_parsed(self):
        result = api.parse_tags('ceph.osd_something=1')
        assert result == {'ceph.osd_something': '1'}

    def test_non_ceph_tags_are_skipped(self):
        result = api.parse_tags('foo')
        # NOTE(review): assertion reconstructed from the test name -- confirm.
        assert result == {}

    def test_mixed_non_ceph_tags(self):
        result = api.parse_tags('foo,ceph.bar=1')
        assert result == {'ceph.bar': '1'}

    def test_multiple_csv_expands_in_dict(self):
        result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # assert them piecemeal to avoid the un-ordered dict nature
        assert result['ceph.osd_something'] == '1'
        assert result['ceph.foo'] == '2'
        assert result['ceph.fsid'] == '0000'
class TestGetAPIVgs(object):
    """Checks for ``api.get_api_vgs`` with ``api.process.call`` stubbed out.

    Each stub returns a 3-tuple whose first item is the report the test wants
    ``get_api_vgs`` to see.
    """

    def test_report_is_emtpy(self, monkeypatch):
        monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0))
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' VolGroup00']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        report = [' VolGroup00 ;;;;;;9g']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_vgs()[0]
        # empty fields between the separators must still produce keys
        assert len(result.keys()) == 7
        assert result['vg_name'] == 'VolGroup00'
        assert result['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_vgs()
        assert result[0]['vg_name'] == 'VolGroup00'
        assert result[1]['vg_name'] == 'ceph_vg'
class TestGetAPILvs(object):
    """Checks for ``api.get_api_lvs`` with ``api.process.call`` stubbed out."""

    def test_report_is_emtpy(self, monkeypatch):
        monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0))
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' ;/path;VolGroup00;root']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_lvs()
        assert result[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_lvs()
        assert result[0]['lv_name'] == 'VolName'
        assert result[1]['lv_name'] == 'ceph_lv'
@pytest.fixture
def volumes(monkeypatch):
    """Provide an ``api.Volumes`` instance that tests can append to.

    ``process.call`` is stubbed to return empty output while the instance is
    constructed.
    """
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    volumes = api.Volumes()
    # also patch api.Volumes so that when it is called, it will use the newly
    # created fixture, with whatever the test method wants to append to it
    monkeypatch.setattr(api, 'Volumes', lambda: volumes)
    # NOTE(review): decorator and return reconstructed from how the tests use
    # this fixture (they call ``volumes.append``/``.get``/``.filter``) -- confirm
    # no other statements were lost here.
    return volumes
@pytest.fixture
def pvolumes(monkeypatch):
    """Provide an ``api.PVolumes`` instance that tests can append to.

    ``process.call`` is stubbed to return empty output while the instance is
    constructed.
    """
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    pvolumes = api.PVolumes()
    # NOTE(review): decorator and return reconstructed from how the tests use
    # this fixture -- confirm no other statements were lost here.
    return pvolumes
@pytest.fixture
def volume_groups(monkeypatch):
    """Provide an ``api.VolumeGroups`` instance that tests can append to.

    ``process.call`` is stubbed to return empty output while the instance is
    constructed.
    """
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    vgs = api.VolumeGroups()
    # NOTE(review): decorator and return reconstructed from how the tests use
    # this fixture -- confirm no other statements were lost here.
    return vgs
class TestGetLV(object):
    """Checks for ``api.get_lv`` lookups by name and by uuid."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        assert api.get_lv() is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
        volumes.append(FooVolume)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        assert api.get_lv(lv_name='foo') == FooVolume

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        FooVolume = api.Volume(
            lv_name='foo', lv_path='/dev/vg/foo',
            lv_uuid='1111', lv_tags="ceph.type=data")
        volumes.append(FooVolume)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        assert api.get_lv(lv_uuid='1111') == FooVolume
class TestGetPV(object):
    """Checks for ``api.get_pv`` lookups by uuid and by name.

    Every test patches ``api.PVolumes`` to return the fixture instance so the
    lookup sees the physical volume appended by the test.
    """

    def test_nothing_is_passed_in(self):
        # so we return a None
        assert api.get_pv() is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        assert api.get_pv(pv_uuid='foo') is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        assert api.get_pv(pv_uuid='0000') == FooPVolume

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        FooPVolume = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        assert api.get_pv(pv_uuid='1111') == FooPVolume

    def test_vg_name_is_set(self, pvolumes, monkeypatch):
        FooPVolume = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg")
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        pv = api.get_pv(pv_name="/dev/vg/foo")
        assert pv.vg_name == "vg"
class TestPVolumes(object):
    """Checks for in-place tag filtering on the ``pvolumes`` fixture."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
        FooPVolume = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111', pv_tags=pv_tags, vg_name='vg')
        pvolumes.append(FooPVolume)
        # osd_id differs from the tagged value, so nothing should remain
        pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        pv_tags = "ceph.type=journal,ceph.osd_id=1"
        FooPVolume = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111', pv_tags=pv_tags, vg_name="vg")
        pvolumes.append(FooPVolume)
        pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
        assert pvolumes == [FooPVolume]
class TestGetVG(object):
    """Checks for ``api.get_vg`` lookups by volume group name."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        assert api.get_vg() is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        FooVG = api.VolumeGroup(vg_name='foo')
        volume_groups.append(FooVG)
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        assert api.get_vg(vg_name='foo') == FooVG
class TestVolumes(object):
    """Checks for ``get``, ``filter`` and ``as_dict`` using the ``volumes`` fixture.

    NOTE(review): several statements were missing from this block (every
    ``volumes.append(osd)``, two trailing ``assert volumes == []`` and the body
    of ``test_filter_requires_params``). They are reconstructed from the
    surrounding assertions and from the parallel volume-group tests -- confirm
    against the upstream file.
    """

    def test_volume_get_has_no_volumes(self, volumes):
        assert volumes.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        assert volumes.get(lv_name='ceph') is None

    def test_volume_has_multiple_matches(self, volumes):
        # the same object appended twice yields two matches for one name
        volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_as_dict_infers_type_from_tags(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(osd)
        result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
        assert result['type'] == 'data'

    def test_as_dict_populates_path_from_lv_api(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(osd)
        result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
        assert result['path'] == '/dev/vg/lv'

    def test_find_the_correct_one(self, volumes):
        volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        assert volumes.get(lv_name='volume1') == volume1

    def test_filter_by_tag(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        # note the different osd_id!
        volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='22222')
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        with pytest.raises(TypeError):
            volumes.filter()
class TestVolumeGroups(object):
    """Checks for ``get`` and ``filter`` using the ``volume_groups`` fixture."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        assert volume_groups.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        assert volume_groups.get(vg_name='ceph') is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # the same object appended twice yields two matches for one name
        volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume_groups.append(volume1)
        volume_groups.append(volume2)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='')
        volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='')
        volume_groups.append(volume1)
        volume_groups.append(volume2)
        assert volume_groups.get(vg_name='volume1') == volume1

    def test_filter_by_tag(self, volume_groups):
        vg_tags = "ceph.group=dmcache"
        osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
        journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(osd)
        volume_groups.append(journal)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
        osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
        volume_groups.append(osd)
        # only one of the two requested tags matches, so nothing should remain
        volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
        journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(osd)
        volume_groups.append(journal)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        with pytest.raises(TypeError):
            volume_groups.filter()
class TestGetLVFromArgument(object):
    """Checks for ``api.get_lv_from_argument`` with one known volume at ``/path/to/lv``."""

    def setup_method(self):
        # NOTE(review): the method header was missing from this block; it is
        # restored as pytest's ``setup_method`` hook -- confirm the upstream name.
        self.foo_volume = api.Volume(
            lv_name='foo', lv_path='/path/to/lv',
            vg_name='foo_group', lv_tags=''
        )

    def test_non_absolute_path_is_not_valid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('foo') is None

    def test_too_many_slashes_is_invalid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('path/to/lv') is None

    def test_absolute_path_is_not_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path') is None

    def test_absolute_path_is_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
class TestRemoveLV(object):
    """Checks for ``api.remove_lv`` with ``process.call`` replaced by a stub.

    NOTE(review): both ``mock_call`` bodies were missing from this block. The
    return tuples follow the 3-tuple shape used by the other ``process.call``
    stubs in this file, with a zero/non-zero last item chosen to match each
    test's assertion -- confirm against the upstream file.
    """

    def test_removes_lv(self, monkeypatch):
        def mock_call(cmd, **kw):
            return ('', '', 0)
        monkeypatch.setattr(process, 'call', mock_call)
        assert api.remove_lv("vg/lv")

    def test_fails_to_remove_lv(self, monkeypatch):
        def mock_call(cmd, **kw):
            return ('', '', 1)
        monkeypatch.setattr(process, 'call', mock_call)
        with pytest.raises(RuntimeError):
            api.remove_lv("vg/lv")
class TestCreateLV(object):
    """Checks the commands ``api.create_lv`` issues, in call order.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture (not defined in this file; presumably provided by a conftest), and
    ``api.get_lv`` is patched to return ``self.foo_volume``.
    """

    def setup_method(self):
        # NOTE(review): the method header was missing from this block; it is
        # restored as pytest's ``setup_method`` hook -- confirm the upstream name.
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def test_uses_size(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        # first captured call is the lvcreate itself
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path']
        # second captured call applies the requested tag
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        # third captured call tags the volume with its own path as data device
        assert capture.calls[2]['args'][0] == data_tag