]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
8525965a374fcca8c228a6163a8d2a6ac7df0e2f
3 from mock
.mock
import patch
4 from ceph_volume
import process
, exceptions
5 from ceph_volume
.api
import lvm
as api
8 class TestParseTags(object):
10 def test_no_tags_means_empty_dict(self
):
11 result
= api
.parse_tags('')
14 def test_single_tag_gets_parsed(self
):
15 result
= api
.parse_tags('ceph.osd_something=1')
16 assert result
== {'ceph.osd_something': '1'}
18 def test_non_ceph_tags_are_skipped(self
):
19 result
= api
.parse_tags('foo')
22 def test_mixed_non_ceph_tags(self
):
23 result
= api
.parse_tags('foo,ceph.bar=1')
24 assert result
== {'ceph.bar': '1'}
26 def test_multiple_csv_expands_in_dict(self
):
27 result
= api
.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
28 # assert them piecemeal to avoid the un-ordered dict nature
29 assert result
['ceph.osd_something'] == '1'
30 assert result
['ceph.foo'] == '2'
31 assert result
['ceph.fsid'] == '0000'
34 class TestGetAPIVgs(object):
36 def test_report_is_emtpy(self
, monkeypatch
):
37 monkeypatch
.setattr(api
.process
, 'call', lambda x
,**kw
: ('\n\n', '', 0))
38 assert api
.get_api_vgs() == []
40 def test_report_has_stuff(self
, monkeypatch
):
41 report
= [' VolGroup00']
42 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: (report
, '', 0))
43 assert api
.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
45 def test_report_has_stuff_with_empty_attrs(self
, monkeypatch
):
46 report
= [' VolGroup00 ;;;;;;4194304']
47 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: (report
, '', 0))
48 result
= api
.get_api_vgs()[0]
49 assert len(result
.keys()) == 7
50 assert result
['vg_name'] == 'VolGroup00'
51 assert result
['vg_extent_size'] == '4194304'
53 def test_report_has_multiple_items(self
, monkeypatch
):
54 report
= [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
55 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: (report
, '', 0))
56 result
= api
.get_api_vgs()
57 assert result
[0]['vg_name'] == 'VolGroup00'
58 assert result
[1]['vg_name'] == 'ceph_vg'
61 class TestGetAPILvs(object):
63 def test_report_is_emtpy(self
, monkeypatch
):
64 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: ('', '', 0))
65 assert api
.get_api_lvs() == []
67 def test_report_has_stuff(self
, monkeypatch
):
68 report
= [' ;/path;VolGroup00;root']
69 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: (report
, '', 0))
70 result
= api
.get_api_lvs()
71 assert result
[0]['lv_name'] == 'VolGroup00'
73 def test_report_has_multiple_items(self
, monkeypatch
):
74 report
= [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
75 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: (report
, '', 0))
76 result
= api
.get_api_lvs()
77 assert result
[0]['lv_name'] == 'VolName'
78 assert result
[1]['lv_name'] == 'ceph_lv'
82 def volumes(monkeypatch
):
83 monkeypatch
.setattr(process
, 'call', lambda x
, **kw
: ('', '', 0))
84 volumes
= api
.Volumes()
86 # also patch api.Volumes so that when it is called, it will use the newly
87 # created fixture, with whatever the test method wants to append to it
88 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
93 def volume_groups(monkeypatch
):
94 monkeypatch
.setattr(process
, 'call', lambda x
, **kw
: ('', '', 0))
95 vgs
= api
.VolumeGroups()
100 class TestGetLV(object):
102 def test_nothing_is_passed_in(self
):
103 # so we return a None
104 assert api
.get_lv() is None
106 def test_single_lv_is_matched(self
, volumes
, monkeypatch
):
107 FooVolume
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/foo', lv_tags
="ceph.type=data")
108 volumes
.append(FooVolume
)
109 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
110 assert api
.get_lv(lv_name
='foo') == FooVolume
112 def test_single_lv_is_matched_by_uuid(self
, volumes
, monkeypatch
):
113 FooVolume
= api
.Volume(
114 lv_name
='foo', lv_path
='/dev/vg/foo',
115 lv_uuid
='1111', lv_tags
="ceph.type=data")
116 volumes
.append(FooVolume
)
117 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
118 assert api
.get_lv(lv_uuid
='1111') == FooVolume
121 class TestGetPV(object):
123 def test_nothing_is_passed_in(self
):
124 # so we return a None
125 assert api
.get_pv() is None
127 def test_single_pv_is_not_matched(self
, pvolumes
, monkeypatch
):
128 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={}, vg_name
="vg")
129 pvolumes
.append(FooPVolume
)
130 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
131 assert api
.get_pv(pv_uuid
='foo') is None
133 def test_single_pv_is_matched(self
, pvolumes
, monkeypatch
):
134 FooPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={})
135 pvolumes
.append(FooPVolume
)
136 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
137 assert api
.get_pv(pv_uuid
='0000') == FooPVolume
139 def test_multiple_pvs_is_matched_by_uuid(self
, pvolumes
, monkeypatch
):
140 FooPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={}, lv_uuid
="0000000")
141 BarPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={})
142 pvolumes
.append(FooPVolume
)
143 pvolumes
.append(BarPVolume
)
144 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
145 assert api
.get_pv(pv_uuid
='0000') == FooPVolume
147 def test_multiple_pvs_is_matched_by_name(self
, pvolumes
, monkeypatch
):
148 FooPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={}, lv_uuid
="0000000")
149 BarPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={})
150 pvolumes
.append(FooPVolume
)
151 pvolumes
.append(BarPVolume
)
152 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
153 assert api
.get_pv(pv_name
='/dev/sda') == FooPVolume
155 def test_multiple_pvs_is_matched_by_tags(self
, pvolumes
, monkeypatch
):
156 FooPVolume
= api
.PVolume(vg_name
="vg1", pv_name
='/dev/sdc', pv_uuid
="1000", pv_tags
="ceph.foo=bar", lv_uuid
="0000000")
157 BarPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
="ceph.foo=bar")
158 pvolumes
.append(FooPVolume
)
159 pvolumes
.append(BarPVolume
)
160 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
161 with pytest
.raises(exceptions
.MultiplePVsError
):
162 api
.get_pv(pv_tags
={"ceph.foo": "bar"})
164 def test_single_pv_is_matched_by_uuid(self
, pvolumes
, monkeypatch
):
165 FooPVolume
= api
.PVolume(
166 pv_name
='/dev/vg/foo',
167 pv_uuid
='1111', pv_tags
="ceph.type=data", vg_name
="vg")
168 pvolumes
.append(FooPVolume
)
169 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
170 assert api
.get_pv(pv_uuid
='1111') == FooPVolume
172 def test_vg_name_is_set(self
, pvolumes
, monkeypatch
):
173 FooPVolume
= api
.PVolume(
174 pv_name
='/dev/vg/foo',
175 pv_uuid
='1111', pv_tags
="ceph.type=data", vg_name
="vg")
176 pvolumes
.append(FooPVolume
)
177 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
178 pv
= api
.get_pv(pv_name
="/dev/vg/foo")
179 assert pv
.vg_name
== "vg"
182 class TestPVolumes(object):
184 def test_filter_by_tag_does_not_match_one(self
, pvolumes
, monkeypatch
):
185 pv_tags
= "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
186 FooPVolume
= api
.PVolume(
187 pv_name
='/dev/vg/foo',
188 pv_uuid
='1111', pv_tags
=pv_tags
, vg_name
='vg')
189 pvolumes
.append(FooPVolume
)
190 assert pvolumes
.filter(pv_tags
={'ceph.type': 'journal',
191 'ceph.osd_id': '2'}) == []
193 def test_filter_by_tags_matches(self
, pvolumes
, monkeypatch
):
194 pv_tags
= "ceph.type=journal,ceph.osd_id=1"
195 FooPVolume
= api
.PVolume(
196 pv_name
='/dev/vg/foo',
197 pv_uuid
='1111', pv_tags
=pv_tags
, vg_name
="vg")
198 pvolumes
.append(FooPVolume
)
199 assert pvolumes
.filter(pv_tags
={'ceph.type': 'journal',
200 'ceph.osd_id': '1'}) == [FooPVolume
]
203 class TestGetVG(object):
205 def test_nothing_is_passed_in(self
):
206 # so we return a None
207 assert api
.get_vg() is None
209 def test_single_vg_is_matched(self
, volume_groups
, monkeypatch
):
210 FooVG
= api
.VolumeGroup(vg_name
='foo')
211 volume_groups
.append(FooVG
)
212 monkeypatch
.setattr(api
, 'VolumeGroups', lambda: volume_groups
)
213 assert api
.get_vg(vg_name
='foo') == FooVG
216 class TestVolume(object):
218 def test_is_ceph_device(self
):
219 lv_tags
= "ceph.type=data,ceph.osd_id=0"
220 osd
= api
.Volume(lv_name
='osd/volume', lv_tags
=lv_tags
)
221 assert api
.is_ceph_device(osd
)
223 @pytest.mark
.parametrize('dev',[
225 api
.VolumeGroup(vg_name
='foo'),
226 api
.Volume(lv_name
='vg/no_osd', lv_tags
=''),
227 api
.Volume(lv_name
='vg/no_osd', lv_tags
='ceph.osd_id=null'),
230 def test_is_not_ceph_device(self
, dev
):
231 assert not api
.is_ceph_device(dev
)
233 def test_no_empty_lv_name(self
):
234 with pytest
.raises(ValueError):
235 api
.Volume(lv_name
='', lv_tags
='')
238 class TestVolumes(object):
240 def test_volume_get_has_no_volumes(self
, volumes
):
241 assert volumes
.get() is None
243 def test_volume_get_filtered_has_no_volumes(self
, volumes
):
244 assert volumes
.get(lv_name
='ceph') is None
246 def test_volume_has_multiple_matches(self
, volumes
):
247 volume1
= volume2
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
248 volumes
.append(volume1
)
249 volumes
.append(volume2
)
250 with pytest
.raises(exceptions
.MultipleLVsError
):
251 volumes
.get(lv_name
='foo')
253 def test_as_dict_infers_type_from_tags(self
, volumes
):
254 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
255 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
257 result
= volumes
.get(lv_tags
={'ceph.type': 'data'}).as_dict()
258 assert result
['type'] == 'data'
260 def test_as_dict_populates_path_from_lv_api(self
, volumes
):
261 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
262 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
264 result
= volumes
.get(lv_tags
={'ceph.type': 'data'}).as_dict()
265 assert result
['path'] == '/dev/vg/lv'
267 def test_find_the_correct_one(self
, volumes
):
268 volume1
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
='')
269 volume2
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='')
270 volumes
.append(volume1
)
271 volumes
.append(volume2
)
272 assert volumes
.get(lv_name
='volume1') == volume1
274 def test_filter_by_tag(self
, volumes
):
275 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
276 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
277 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='ceph.type=journal')
279 volumes
.append(journal
)
280 volumes
.filter(lv_tags
={'ceph.type': 'data'})
281 assert len(volumes
) == 1
282 assert volumes
[0].lv_name
== 'volume1'
284 def test_filter_by_tag_does_not_match_one(self
, volumes
):
285 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
286 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
287 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='ceph.osd_id=1,ceph.type=journal')
289 volumes
.append(journal
)
290 # note the different osd_id!
291 volumes
.filter(lv_tags
={'ceph.type': 'data', 'ceph.osd_id': '2'})
294 def test_filter_by_vg_name(self
, volumes
):
295 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
296 osd
= api
.Volume(lv_name
='volume1', vg_name
='ceph_vg', lv_tags
=lv_tags
)
297 journal
= api
.Volume(lv_name
='volume2', vg_name
='system_vg', lv_tags
='ceph.type=journal')
299 volumes
.append(journal
)
300 volumes
.filter(vg_name
='ceph_vg')
301 assert len(volumes
) == 1
302 assert volumes
[0].lv_name
== 'volume1'
304 def test_filter_by_lv_path(self
, volumes
):
305 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_tags
='')
306 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_tags
='')
308 volumes
.append(journal
)
309 volumes
.filter(lv_path
='/dev/volume1')
310 assert len(volumes
) == 1
311 assert volumes
[0].lv_name
== 'volume1'
313 def test_filter_by_lv_uuid(self
, volumes
):
314 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_uuid
='1111', lv_tags
='')
315 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_uuid
='', lv_tags
='')
317 volumes
.append(journal
)
318 volumes
.filter(lv_uuid
='1111')
319 assert len(volumes
) == 1
320 assert volumes
[0].lv_name
== 'volume1'
322 def test_filter_by_lv_uuid_nothing_found(self
, volumes
):
323 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_uuid
='1111', lv_tags
='')
324 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_uuid
='', lv_tags
='')
326 volumes
.append(journal
)
327 volumes
.filter(lv_uuid
='22222')
330 def test_filter_requires_params(self
, volumes
):
331 with pytest
.raises(TypeError):
335 class TestVolumeGroup(object):
337 def test_volume_group_no_empty_name(self
):
338 with pytest
.raises(ValueError):
339 api
.VolumeGroup(vg_name
='')
342 class TestVolumeGroups(object):
344 def test_volume_get_has_no_volume_groups(self
, volume_groups
):
345 assert volume_groups
.get() is None
347 def test_volume_get_filtered_has_no_volumes(self
, volume_groups
):
348 assert volume_groups
.get(vg_name
='ceph') is None
350 def test_volume_has_multiple_matches(self
, volume_groups
):
351 volume1
= volume2
= api
.VolumeGroup(vg_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
352 volume_groups
.append(volume1
)
353 volume_groups
.append(volume2
)
354 with pytest
.raises(exceptions
.MultipleVGsError
):
355 volume_groups
.get(vg_name
='foo')
357 def test_find_the_correct_one(self
, volume_groups
):
358 volume1
= api
.VolumeGroup(vg_name
='volume1', lv_tags
='')
359 volume2
= api
.VolumeGroup(vg_name
='volume2', lv_tags
='')
360 volume_groups
.append(volume1
)
361 volume_groups
.append(volume2
)
362 assert volume_groups
.get(vg_name
='volume1') == volume1
364 def test_filter_by_tag(self
, volume_groups
):
365 vg_tags
= "ceph.group=dmcache"
366 osd
= api
.VolumeGroup(vg_name
='volume1', vg_tags
=vg_tags
)
367 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.group=plain')
368 volume_groups
.append(osd
)
369 volume_groups
.append(journal
)
370 volume_groups
= volume_groups
.filter(vg_tags
={'ceph.group': 'dmcache'})
371 assert len(volume_groups
) == 1
372 assert volume_groups
[0].vg_name
== 'volume1'
374 def test_filter_by_tag_does_not_match_one(self
, volume_groups
):
375 vg_tags
= "ceph.group=dmcache,ceph.disk_type=ssd"
376 osd
= api
.VolumeGroup(vg_name
='volume1', vg_path
='/dev/vg/lv', vg_tags
=vg_tags
)
377 volume_groups
.append(osd
)
378 volume_groups
= volume_groups
.filter(vg_tags
={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
379 assert volume_groups
== []
381 def test_filter_by_vg_name(self
, volume_groups
):
382 vg_tags
= "ceph.type=data,ceph.fsid=000-aaa"
383 osd
= api
.VolumeGroup(vg_name
='ceph_vg', vg_tags
=vg_tags
)
384 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.type=journal')
385 volume_groups
.append(osd
)
386 volume_groups
.append(journal
)
387 volume_groups
= volume_groups
.filter(vg_name
='ceph_vg')
388 assert len(volume_groups
) == 1
389 assert volume_groups
[0].vg_name
== 'ceph_vg'
391 def test_filter_requires_params(self
, volume_groups
):
392 with pytest
.raises(TypeError):
393 volume_groups
= volume_groups
.filter()
396 class TestVolumeGroupFree(object):
398 def test_integer_gets_produced(self
):
399 vg
= api
.VolumeGroup(vg_name
='nosize', vg_free_count
=100, vg_extent_size
=4194304)
400 assert vg
.free
== 100 * 4194304
403 class TestCreateLVs(object):
406 self
.vg
= api
.VolumeGroup(vg_name
='ceph',
407 vg_extent_size
=1073741824,
408 vg_extent_count
=99999999,
411 def test_creates_correct_lv_number_from_parts(self
, monkeypatch
):
412 monkeypatch
.setattr('ceph_volume.api.lvm.create_lv', lambda *a
, **kw
: (a
, kw
))
413 lvs
= api
.create_lvs(self
.vg
, parts
=4)
416 def test_suffixes_the_size_arg(self
, monkeypatch
):
417 monkeypatch
.setattr('ceph_volume.api.lvm.create_lv', lambda *a
, **kw
: (a
, kw
))
418 lvs
= api
.create_lvs(self
.vg
, parts
=4)
419 assert lvs
[0][1]['extents'] == 249
421 def test_only_uses_free_size(self
, monkeypatch
):
422 monkeypatch
.setattr('ceph_volume.api.lvm.create_lv', lambda *a
, **kw
: (a
, kw
))
423 vg
= api
.VolumeGroup(vg_name
='ceph',
424 vg_extent_size
=1073741824,
425 vg_extent_count
=99999999,
427 lvs
= api
.create_lvs(vg
, parts
=4)
428 assert lvs
[0][1]['extents'] == 250
430 def test_null_tags_are_set_by_default(self
, monkeypatch
):
431 monkeypatch
.setattr('ceph_volume.api.lvm.create_lv', lambda *a
, **kw
: (a
, kw
))
432 kwargs
= api
.create_lvs(self
.vg
, parts
=4)[0][1]
433 assert list(kwargs
['tags'].values()) == ['null', 'null', 'null', 'null']
435 def test_fallback_to_one_part(self
, monkeypatch
):
436 monkeypatch
.setattr('ceph_volume.api.lvm.create_lv', lambda *a
, **kw
: (a
, kw
))
437 lvs
= api
.create_lvs(self
.vg
)
441 class TestVolumeGroupSizing(object):
444 self
.vg
= api
.VolumeGroup(vg_name
='ceph',
445 vg_extent_size
=1073741824,
448 def test_parts_and_size_errors(self
):
449 with pytest
.raises(ValueError) as error
:
450 self
.vg
.sizing(parts
=4, size
=10)
451 assert "Cannot process sizing" in str(error
.value
)
453 def test_zero_parts_produces_100_percent(self
):
454 result
= self
.vg
.sizing(parts
=0)
455 assert result
['percentages'] == 100
457 def test_two_parts_produces_50_percent(self
):
458 result
= self
.vg
.sizing(parts
=2)
459 assert result
['percentages'] == 50
461 def test_two_parts_produces_half_size(self
):
462 result
= self
.vg
.sizing(parts
=2)
463 assert result
['sizes'] == 512
465 def test_half_size_produces_round_sizes(self
):
466 result
= self
.vg
.sizing(size
=512)
467 assert result
['sizes'] == 512
468 assert result
['percentages'] == 50
469 assert result
['parts'] == 2
471 def test_bit_more_than_half_size_allocates_full_size(self
):
472 # 513 can't allocate more than 1, so it just fallsback to using the
474 result
= self
.vg
.sizing(size
=513)
475 assert result
['sizes'] == 1024
476 assert result
['percentages'] == 100
477 assert result
['parts'] == 1
479 def test_extents_are_halfed_rounded_down(self
):
480 result
= self
.vg
.sizing(size
=512)
481 assert result
['extents'] == 512
483 def test_bit_less_size_rounds_down(self
):
484 result
= self
.vg
.sizing(size
=129)
485 assert result
['sizes'] == 146
486 assert result
['percentages'] == 14
487 assert result
['parts'] == 7
489 def test_unable_to_allocate_past_free_size(self
):
490 with pytest
.raises(exceptions
.SizeAllocationError
):
491 self
.vg
.sizing(size
=2048)
494 class TestGetLVFromArgument(object):
497 self
.foo_volume
= api
.Volume(
498 lv_name
='foo', lv_path
='/path/to/lv',
499 vg_name
='foo_group', lv_tags
=''
502 def test_non_absolute_path_is_not_valid(self
, volumes
):
503 volumes
.append(self
.foo_volume
)
504 assert api
.get_lv_from_argument('foo') is None
506 def test_too_many_slashes_is_invalid(self
, volumes
):
507 volumes
.append(self
.foo_volume
)
508 assert api
.get_lv_from_argument('path/to/lv') is None
510 def test_absolute_path_is_not_lv(self
, volumes
):
511 volumes
.append(self
.foo_volume
)
512 assert api
.get_lv_from_argument('/path') is None
514 def test_absolute_path_is_lv(self
, volumes
):
515 volumes
.append(self
.foo_volume
)
516 assert api
.get_lv_from_argument('/path/to/lv') == self
.foo_volume
519 class TestRemoveLV(object):
521 def test_removes_lv(self
, monkeypatch
):
522 def mock_call(cmd
, **kw
):
524 monkeypatch
.setattr(process
, 'call', mock_call
)
525 assert api
.remove_lv("vg/lv")
527 def test_removes_lv_object(self
, fake_call
):
528 foo_volume
= api
.Volume(lv_name
='foo', lv_path
='/path', vg_name
='foo_group', lv_tags
='')
529 api
.remove_lv(foo_volume
)
530 # last argument from the list passed to process.call
531 assert fake_call
.calls
[0]['args'][0][-1] == '/path'
533 def test_fails_to_remove_lv(self
, monkeypatch
):
534 def mock_call(cmd
, **kw
):
536 monkeypatch
.setattr(process
, 'call', mock_call
)
537 with pytest
.raises(RuntimeError):
538 api
.remove_lv("vg/lv")
541 class TestCreateLV(object):
544 self
.foo_volume
= api
.Volume(lv_name
='foo', lv_path
='/path', vg_name
='foo_group', lv_tags
='')
545 self
.foo_group
= api
.VolumeGroup(vg_name
='foo_group',
546 vg_extent_size
=4194304,
549 @patch('ceph_volume.api.lvm.process.run')
550 @patch('ceph_volume.api.lvm.process.call')
551 @patch('ceph_volume.api.lvm.get_lv')
552 def test_uses_size(self
, m_get_lv
, m_call
, m_run
, monkeypatch
):
553 m_get_lv
.return_value
= self
.foo_volume
554 api
.create_lv('foo', 0, vg
=self
.foo_group
, size
=5368709120, tags
={'ceph.type': 'data'})
555 expected
= ['lvcreate', '--yes', '-l', '1280', '-n', 'foo-0', 'foo_group']
556 m_run
.assert_called_with(expected
)
558 @patch('ceph_volume.api.lvm.process.run')
559 @patch('ceph_volume.api.lvm.process.call')
560 @patch('ceph_volume.api.lvm.get_lv')
561 def test_uses_extents(self
, m_get_lv
, m_call
, m_run
, monkeypatch
):
562 m_get_lv
.return_value
= self
.foo_volume
563 api
.create_lv('foo', 0, vg
=self
.foo_group
, extents
='50', tags
={'ceph.type': 'data'})
564 expected
= ['lvcreate', '--yes', '-l', '50', '-n', 'foo-0', 'foo_group']
565 m_run
.assert_called_with(expected
)
567 @pytest.mark
.parametrize("test_input,expected",
570 @patch('ceph_volume.api.lvm.process.run')
571 @patch('ceph_volume.api.lvm.process.call')
572 @patch('ceph_volume.api.lvm.get_lv')
573 def test_uses_slots(self
, m_get_lv
, m_call
, m_run
, monkeypatch
, test_input
, expected
):
574 m_get_lv
.return_value
= self
.foo_volume
575 api
.create_lv('foo', 0, vg
=self
.foo_group
, slots
=test_input
, tags
={'ceph.type': 'data'})
576 expected
= ['lvcreate', '--yes', '-l', str(expected
), '-n', 'foo-0', 'foo_group']
577 m_run
.assert_called_with(expected
)
579 @patch('ceph_volume.api.lvm.process.run')
580 @patch('ceph_volume.api.lvm.process.call')
581 @patch('ceph_volume.api.lvm.get_lv')
582 def test_uses_all(self
, m_get_lv
, m_call
, m_run
, monkeypatch
):
583 m_get_lv
.return_value
= self
.foo_volume
584 api
.create_lv('foo', 0, vg
=self
.foo_group
, tags
={'ceph.type': 'data'})
585 expected
= ['lvcreate', '--yes', '-l', '100%FREE', '-n', 'foo-0', 'foo_group']
586 m_run
.assert_called_with(expected
)
588 @patch('ceph_volume.api.lvm.process.run')
589 @patch('ceph_volume.api.lvm.process.call')
590 @patch('ceph_volume.api.lvm.Volume.set_tags')
591 @patch('ceph_volume.api.lvm.get_lv')
592 def test_calls_to_set_tags_default(self
, m_get_lv
, m_set_tags
, m_call
, m_run
, monkeypatch
):
593 m_get_lv
.return_value
= self
.foo_volume
594 api
.create_lv('foo', 0, vg
=self
.foo_group
)
596 "ceph.osd_id": "null",
598 "ceph.cluster_fsid": "null",
599 "ceph.osd_fsid": "null",
601 m_set_tags
.assert_called_with(tags
)
603 @patch('ceph_volume.api.lvm.process.run')
604 @patch('ceph_volume.api.lvm.process.call')
605 @patch('ceph_volume.api.lvm.Volume.set_tags')
606 @patch('ceph_volume.api.lvm.get_lv')
607 def test_calls_to_set_tags_arg(self
, m_get_lv
, m_set_tags
, m_call
, m_run
, monkeypatch
):
608 m_get_lv
.return_value
= self
.foo_volume
609 api
.create_lv('foo', 0, vg
=self
.foo_group
, tags
={'ceph.type': 'data'})
612 "ceph.data_device": "/path"
614 m_set_tags
.assert_called_with(tags
)
616 @patch('ceph_volume.api.lvm.process.run')
617 @patch('ceph_volume.api.lvm.process.call')
618 @patch('ceph_volume.api.lvm.get_device_vgs')
619 @patch('ceph_volume.api.lvm.create_vg')
620 @patch('ceph_volume.api.lvm.get_lv')
621 def test_create_vg(self
, m_get_lv
, m_create_vg
, m_get_device_vgs
, m_call
,
623 m_get_lv
.return_value
= self
.foo_volume
624 m_get_device_vgs
.return_value
= []
625 api
.create_lv('foo', 0, device
='dev/foo', size
='5G', tags
={'ceph.type': 'data'})
626 m_create_vg
.assert_called_with('dev/foo', name_prefix
='ceph')
629 class TestTags(object):
632 self
.foo_volume_clean
= api
.Volume(lv_name
='foo_clean', lv_path
='/pathclean',
635 self
.foo_volume
= api
.Volume(lv_name
='foo', lv_path
='/path',
637 lv_tags
='ceph.foo0=bar0,ceph.foo1=bar1,ceph.foo2=bar2')
639 def test_set_tag(self
, monkeypatch
, capture
):
640 monkeypatch
.setattr(process
, 'run', capture
)
641 monkeypatch
.setattr(process
, 'call', capture
)
642 self
.foo_volume_clean
.set_tag('foo', 'bar')
643 expected
= ['lvchange', '--addtag', 'foo=bar', '/pathclean']
644 assert capture
.calls
[0]['args'][0] == expected
645 assert self
.foo_volume_clean
.tags
== {'foo': 'bar'}
647 def test_set_clear_tag(self
, monkeypatch
, capture
):
648 monkeypatch
.setattr(process
, 'run', capture
)
649 monkeypatch
.setattr(process
, 'call', capture
)
650 self
.foo_volume_clean
.set_tag('foo', 'bar')
651 assert self
.foo_volume_clean
.tags
== {'foo': 'bar'}
652 self
.foo_volume_clean
.clear_tag('foo')
653 expected
= ['lvchange', '--deltag', 'foo=bar', '/pathclean']
654 assert self
.foo_volume_clean
.tags
== {}
655 assert capture
.calls
[1]['args'][0] == expected
657 def test_set_tags(self
, monkeypatch
, capture
):
658 monkeypatch
.setattr(process
, 'run', capture
)
659 monkeypatch
.setattr(process
, 'call', capture
)
660 tags
= {'ceph.foo0': 'bar0', 'ceph.foo1': 'bar1', 'ceph.foo2': 'bar2'}
661 assert self
.foo_volume
.tags
== tags
663 tags
= {'ceph.foo0': 'bar0', 'ceph.foo1': 'baz1', 'ceph.foo2': 'baz2'}
664 self
.foo_volume
.set_tags(tags
)
665 assert self
.foo_volume
.tags
== tags
667 self
.foo_volume
.set_tag('ceph.foo1', 'other1')
668 tags
['ceph.foo1'] = 'other1'
669 assert self
.foo_volume
.tags
== tags
672 ['lvchange', '--deltag', 'ceph.foo0=bar0', '/path'],
673 ['lvchange', '--addtag', 'ceph.foo0=bar0', '/path'],
674 ['lvchange', '--deltag', 'ceph.foo1=bar1', '/path'],
675 ['lvchange', '--addtag', 'ceph.foo1=baz1', '/path'],
676 ['lvchange', '--deltag', 'ceph.foo2=bar2', '/path'],
677 ['lvchange', '--addtag', 'ceph.foo2=baz2', '/path'],
678 ['lvchange', '--deltag', 'ceph.foo1=baz1', '/path'],
679 ['lvchange', '--addtag', 'ceph.foo1=other1', '/path'],
681 # The order isn't guaranted
682 for call
in capture
.calls
:
683 assert call
['args'][0] in expected
684 assert len(capture
.calls
) == len(expected
)
686 def test_clear_tags(self
, monkeypatch
, capture
):
687 monkeypatch
.setattr(process
, 'run', capture
)
688 monkeypatch
.setattr(process
, 'call', capture
)
689 tags
= {'ceph.foo0': 'bar0', 'ceph.foo1': 'bar1', 'ceph.foo2': 'bar2'}
691 self
.foo_volume_clean
.set_tags(tags
)
692 assert self
.foo_volume_clean
.tags
== tags
693 self
.foo_volume_clean
.clear_tags()
694 assert self
.foo_volume_clean
.tags
== {}
697 ['lvchange', '--addtag', 'ceph.foo0=bar0', '/pathclean'],
698 ['lvchange', '--addtag', 'ceph.foo1=bar1', '/pathclean'],
699 ['lvchange', '--addtag', 'ceph.foo2=bar2', '/pathclean'],
700 ['lvchange', '--deltag', 'ceph.foo0=bar0', '/pathclean'],
701 ['lvchange', '--deltag', 'ceph.foo1=bar1', '/pathclean'],
702 ['lvchange', '--deltag', 'ceph.foo2=bar2', '/pathclean'],
704 # The order isn't guaranted
705 for call
in capture
.calls
:
706 assert call
['args'][0] in expected
707 assert len(capture
.calls
) == len(expected
)
710 class TestExtendVG(object):
713 self
.foo_volume
= api
.VolumeGroup(vg_name
='foo', lv_tags
='')
715 def test_uses_single_device_in_list(self
, monkeypatch
, fake_run
):
716 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
717 api
.extend_vg(self
.foo_volume
, ['/dev/sda'])
718 expected
= ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
719 assert fake_run
.calls
[0]['args'][0] == expected
721 def test_uses_single_device(self
, monkeypatch
, fake_run
):
722 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
723 api
.extend_vg(self
.foo_volume
, '/dev/sda')
724 expected
= ['vgextend', '--force', '--yes', 'foo', '/dev/sda']
725 assert fake_run
.calls
[0]['args'][0] == expected
727 def test_uses_multiple_devices(self
, monkeypatch
, fake_run
):
728 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
729 api
.extend_vg(self
.foo_volume
, ['/dev/sda', '/dev/sdb'])
730 expected
= ['vgextend', '--force', '--yes', 'foo', '/dev/sda', '/dev/sdb']
731 assert fake_run
.calls
[0]['args'][0] == expected
734 class TestReduceVG(object):
737 self
.foo_volume
= api
.VolumeGroup(vg_name
='foo', lv_tags
='')
739 def test_uses_single_device_in_list(self
, monkeypatch
, fake_run
):
740 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
741 api
.reduce_vg(self
.foo_volume
, ['/dev/sda'])
742 expected
= ['vgreduce', '--force', '--yes', 'foo', '/dev/sda']
743 assert fake_run
.calls
[0]['args'][0] == expected
745 def test_uses_single_device(self
, monkeypatch
, fake_run
):
746 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
747 api
.reduce_vg(self
.foo_volume
, '/dev/sda')
748 expected
= ['vgreduce', '--force', '--yes', 'foo', '/dev/sda']
749 assert fake_run
.calls
[0]['args'][0] == expected
751 def test_uses_multiple_devices(self
, monkeypatch
, fake_run
):
752 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
753 api
.reduce_vg(self
.foo_volume
, ['/dev/sda', '/dev/sdb'])
754 expected
= ['vgreduce', '--force', '--yes', 'foo', '/dev/sda', '/dev/sdb']
755 assert fake_run
.calls
[0]['args'][0] == expected
758 class TestCreateVG(object):
761 self
.foo_volume
= api
.VolumeGroup(vg_name
='foo', lv_tags
='')
763 def test_no_name(self
, monkeypatch
, fake_run
):
764 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
765 api
.create_vg('/dev/sda')
766 result
= fake_run
.calls
[0]['args'][0]
767 assert '/dev/sda' in result
768 assert result
[-2].startswith('ceph-')
770 def test_devices_list(self
, monkeypatch
, fake_run
):
771 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
772 api
.create_vg(['/dev/sda', '/dev/sdb'], name
='ceph')
773 result
= fake_run
.calls
[0]['args'][0]
774 expected
= ['vgcreate', '--force', '--yes', 'ceph', '/dev/sda', '/dev/sdb']
775 assert result
== expected
777 def test_name_prefix(self
, monkeypatch
, fake_run
):
778 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
779 api
.create_vg('/dev/sda', name_prefix
='master')
780 result
= fake_run
.calls
[0]['args'][0]
781 assert '/dev/sda' in result
782 assert result
[-2].startswith('master-')
784 def test_specific_name(self
, monkeypatch
, fake_run
):
785 monkeypatch
.setattr(api
, 'get_vg', lambda **kw
: True)
786 api
.create_vg('/dev/sda', name
='master')
787 result
= fake_run
.calls
[0]['args'][0]
788 assert '/dev/sda' in result
789 assert result
[-2] == 'master'
792 # The following tests are pretty gnarly. VDO detection is very convoluted and
793 # involves correlating information from device mappers, realpaths, slaves of
794 # those mappers, and parents or related mappers. This makes it very hard to
795 # patch nicely or keep tests short and readable. These tests are trying to
796 # ensure correctness, the better approach will be to do some functional testing
802 def disable_kvdo_path(monkeypatch
):
803 monkeypatch
.setattr('os.path.isdir', lambda x
, **kw
: False)
807 def enable_kvdo_path(monkeypatch
):
808 monkeypatch
.setattr('os.path.isdir', lambda x
, **kw
: True)
811 # Stub for os.listdir
814 class ListDir(object):
816 def __init__(self
, paths
):
818 self
._normalize
_paths
()
819 self
.listdir
= os
.listdir
821 def _normalize_paths(self
):
822 for k
, v
in self
.paths
.items():
823 self
.paths
[k
.rstrip('/')] = v
.rstrip('/')
825 def add(self
, original
, fake
):
826 self
.paths
[original
.rstrip('/')] = fake
.rstrip('/')
828 def __call__(self
, path
):
829 return self
.listdir(self
.paths
[path
.rstrip('/')])
832 @pytest.fixture(scope
='function')
833 def listdir(monkeypatch
):
834 def apply(paths
=None, stub
=None):
836 stub
= ListDir(paths
)
838 for original
, fake
in paths
.items():
839 stub
.add(original
, fake
)
841 monkeypatch
.setattr('os.listdir', stub
)
845 @pytest.fixture(scope
='function')
846 def makedirs(tmpdir
):
847 def create(directory
):
848 path
= os
.path
.join(str(tmpdir
), directory
)
851 create
.base
= str(tmpdir
)
855 class TestIsVdo(object):
857 def test_no_vdo_dir(self
, disable_kvdo_path
):
858 assert api
._is
_vdo
('/path') is False
860 def test_exceptions_return_false(self
, monkeypatch
):
863 monkeypatch
.setattr('ceph_volume.api.lvm._is_vdo', throw
)
864 assert api
.is_vdo('/path') == '0'
866 def test_is_vdo_returns_a_string(self
, monkeypatch
):
867 monkeypatch
.setattr('ceph_volume.api.lvm._is_vdo', lambda x
, **kw
: True)
868 assert api
.is_vdo('/path') == '1'
870 def test_kvdo_dir_no_devices(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
871 kvdo_path
= makedirs('sys/kvdo')
872 listdir(paths
={'/sys/kvdo': kvdo_path
})
873 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
, **kw
: [])
874 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
, **kw
: [])
875 assert api
._is
_vdo
('/dev/mapper/vdo0') is False
877 def test_vdo_slaves_found_and_matched(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
878 kvdo_path
= makedirs('sys/kvdo')
879 listdir(paths
={'/sys/kvdo': kvdo_path
})
880 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
, **kw
: ['/dev/dm-3'])
881 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
, **kw
: [])
882 assert api
._is
_vdo
('/dev/dm-3') is True
884 def test_vdo_parents_found_and_matched(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
885 kvdo_path
= makedirs('sys/kvdo')
886 listdir(paths
={'/sys/kvdo': kvdo_path
})
887 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
, **kw
: [])
888 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
, **kw
: ['/dev/dm-4'])
889 assert api
._is
_vdo
('/dev/dm-4') is True
892 class TestVdoSlaves(object):
894 def test_slaves_are_not_found(self
, makedirs
, listdir
, monkeypatch
):
895 slaves_path
= makedirs('sys/block/vdo0/slaves')
896 listdir(paths
={'/sys/block/vdo0/slaves': slaves_path
})
897 monkeypatch
.setattr('ceph_volume.api.lvm.os.path.exists', lambda x
, **kw
: True)
898 result
= sorted(api
._vdo
_slaves
(['vdo0']))
899 assert '/dev/mapper/vdo0' in result
900 assert 'vdo0' in result
902 def test_slaves_are_found(self
, makedirs
, listdir
, monkeypatch
):
903 slaves_path
= makedirs('sys/block/vdo0/slaves')
904 makedirs('sys/block/vdo0/slaves/dm-4')
905 makedirs('dev/mapper/vdo0')
906 listdir(paths
={'/sys/block/vdo0/slaves': slaves_path
})
907 monkeypatch
.setattr('ceph_volume.api.lvm.os.path.exists', lambda x
, **kw
: True)
908 result
= sorted(api
._vdo
_slaves
(['vdo0']))
909 assert '/dev/dm-4' in result
910 assert 'dm-4' in result
913 class TestVDOParents(object):
915 def test_parents_are_found(self
, makedirs
, listdir
):
916 block_path
= makedirs('sys/block')
917 slaves_path
= makedirs('sys/block/dm-4/slaves')
918 makedirs('sys/block/dm-4/slaves/dm-3')
920 '/sys/block/dm-4/slaves': slaves_path
,
921 '/sys/block': block_path
})
922 result
= api
._vdo
_parents
(['dm-3'])
923 assert '/dev/dm-4' in result
924 assert 'dm-4' in result
926 def test_parents_are_not_found(self
, makedirs
, listdir
):
927 block_path
= makedirs('sys/block')
928 slaves_path
= makedirs('sys/block/dm-4/slaves')
929 makedirs('sys/block/dm-4/slaves/dm-5')
931 '/sys/block/dm-4/slaves': slaves_path
,
932 '/sys/block': block_path
})
933 result
= api
._vdo
_parents
(['dm-3'])
937 class TestSplitNameParser(object):
939 def test_keys_are_parsed_without_prefix(self
):
940 line
= ["DM_VG_NAME='/dev/mapper/vg';DM_LV_NAME='lv';DM_LV_LAYER=''"]
941 result
= api
._splitname
_parser
(line
)
942 assert result
['VG_NAME'] == 'vg'
943 assert result
['LV_NAME'] == 'lv'
944 assert result
['LV_LAYER'] == ''
946 def test_vg_name_sans_mapper(self
):
947 line
= ["DM_VG_NAME='/dev/mapper/vg';DM_LV_NAME='lv';DM_LV_LAYER=''"]
948 result
= api
._splitname
_parser
(line
)
949 assert '/dev/mapper' not in result
['VG_NAME']
952 class TestIsLV(object):
954 def test_is_not_an_lv(self
, monkeypatch
):
955 monkeypatch
.setattr(api
.process
, 'call', lambda x
, **kw
: ('', '', 0))
956 monkeypatch
.setattr(api
, 'dmsetup_splitname', lambda x
, **kw
: {})
957 assert api
.is_lv('/dev/sda1', lvs
=[]) is False
959 def test_lvs_not_found(self
, monkeypatch
, volumes
):
960 CephVolume
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/foo', lv_tags
="ceph.type=data")
961 volumes
.append(CephVolume
)
962 splitname
= {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
963 monkeypatch
.setattr(api
, 'dmsetup_splitname', lambda x
, **kw
: splitname
)
964 assert api
.is_lv('/dev/sda1', lvs
=volumes
) is False
966 def test_is_lv(self
, monkeypatch
, volumes
):
967 CephVolume
= api
.Volume(
968 vg_name
='ceph', lv_name
='data',
969 lv_path
='/dev/vg/foo', lv_tags
="ceph.type=data"
971 volumes
.append(CephVolume
)
972 splitname
= {'LV_NAME': 'data', 'VG_NAME': 'ceph'}
973 monkeypatch
.setattr(api
, 'dmsetup_splitname', lambda x
, **kw
: splitname
)
974 assert api
.is_lv('/dev/sda1', lvs
=volumes
) is True
976 class TestGetDeviceVgs(object):
978 @patch('ceph_volume.process.call')
979 @patch('ceph_volume.api.lvm._output_parser')
980 def test_get_device_vgs_with_empty_pv(self
, patched_output_parser
, pcall
):
981 patched_output_parser
.return_value
= [{'vg_name': ''}]
982 pcall
.return_value
= ('', '', '')
983 vgs
= api
.get_device_vgs('/dev/foo')
986 class TestGetDeviceLvs(object):
988 @patch('ceph_volume.process.call')
989 @patch('ceph_volume.api.lvm._output_parser')
990 def test_get_device_lvs_with_empty_vg(self
, patched_output_parser
, pcall
):
991 patched_output_parser
.return_value
= [{'lv_name': ''}]
992 pcall
.return_value
= ('', '', '')
993 vgs
= api
.get_device_lvs('/dev/foo')