]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
adfa30c3bbbec3aedb0299494d5422cb8e203971
3 from ceph_volume
import process
, exceptions
4 from ceph_volume
.api
import lvm
as api
7 class TestParseTags(object):
9 def test_no_tags_means_empty_dict(self
):
10 result
= api
.parse_tags('')
13 def test_single_tag_gets_parsed(self
):
14 result
= api
.parse_tags('ceph.osd_something=1')
15 assert result
== {'ceph.osd_something': '1'}
17 def test_non_ceph_tags_are_skipped(self
):
18 result
= api
.parse_tags('foo')
21 def test_mixed_non_ceph_tags(self
):
22 result
= api
.parse_tags('foo,ceph.bar=1')
23 assert result
== {'ceph.bar': '1'}
25 def test_multiple_csv_expands_in_dict(self
):
26 result
= api
.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
27 # assert them piecemeal to avoid the un-ordered dict nature
28 assert result
['ceph.osd_something'] == '1'
29 assert result
['ceph.foo'] == '2'
30 assert result
['ceph.fsid'] == '0000'
33 class TestGetAPIVgs(object):
35 def test_report_is_emtpy(self
, monkeypatch
):
36 monkeypatch
.setattr(api
.process
, 'call', lambda x
: ('\n\n', '', 0))
37 assert api
.get_api_vgs() == []
39 def test_report_has_stuff(self
, monkeypatch
):
40 report
= [' VolGroup00']
41 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
42 assert api
.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
44 def test_report_has_stuff_with_empty_attrs(self
, monkeypatch
):
45 report
= [' VolGroup00 ;;;;;;9g']
46 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
47 result
= api
.get_api_vgs()[0]
48 assert len(result
.keys()) == 7
49 assert result
['vg_name'] == 'VolGroup00'
50 assert result
['vg_free'] == '9g'
52 def test_report_has_multiple_items(self
, monkeypatch
):
53 report
= [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
54 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
55 result
= api
.get_api_vgs()
56 assert result
[0]['vg_name'] == 'VolGroup00'
57 assert result
[1]['vg_name'] == 'ceph_vg'
60 class TestGetAPILvs(object):
62 def test_report_is_emtpy(self
, monkeypatch
):
63 monkeypatch
.setattr(api
.process
, 'call', lambda x
: ('', '', 0))
64 assert api
.get_api_lvs() == []
66 def test_report_has_stuff(self
, monkeypatch
):
67 report
= [' ;/path;VolGroup00;root']
68 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
69 result
= api
.get_api_lvs()
70 assert result
[0]['lv_name'] == 'VolGroup00'
72 def test_report_has_multiple_items(self
, monkeypatch
):
73 report
= [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
74 monkeypatch
.setattr(api
.process
, 'call', lambda x
: (report
, '', 0))
75 result
= api
.get_api_lvs()
76 assert result
[0]['lv_name'] == 'VolName'
77 assert result
[1]['lv_name'] == 'ceph_lv'
81 def volumes(monkeypatch
):
82 monkeypatch
.setattr(process
, 'call', lambda x
: ('', '', 0))
83 volumes
= api
.Volumes()
85 # also patch api.Volumes so that when it is called, it will use the newly
86 # created fixture, with whatever the test method wants to append to it
87 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
92 def pvolumes(monkeypatch
):
93 monkeypatch
.setattr(process
, 'call', lambda x
: ('', '', 0))
94 pvolumes
= api
.PVolumes()
100 def volume_groups(monkeypatch
):
101 monkeypatch
.setattr(process
, 'call', lambda x
: ('', '', 0))
102 vgs
= api
.VolumeGroups()
107 class TestGetLV(object):
109 def test_nothing_is_passed_in(self
):
110 # so we return a None
111 assert api
.get_lv() is None
113 def test_single_lv_is_matched(self
, volumes
, monkeypatch
):
114 FooVolume
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/foo', lv_tags
="ceph.type=data")
115 volumes
.append(FooVolume
)
116 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
117 assert api
.get_lv(lv_name
='foo') == FooVolume
119 def test_single_lv_is_matched_by_uuid(self
, volumes
, monkeypatch
):
120 FooVolume
= api
.Volume(
121 lv_name
='foo', lv_path
='/dev/vg/foo',
122 lv_uuid
='1111', lv_tags
="ceph.type=data")
123 volumes
.append(FooVolume
)
124 monkeypatch
.setattr(api
, 'Volumes', lambda: volumes
)
125 assert api
.get_lv(lv_uuid
='1111') == FooVolume
128 class TestGetPV(object):
130 def test_nothing_is_passed_in(self
):
131 # so we return a None
132 assert api
.get_pv() is None
134 def test_single_pv_is_not_matched(self
, pvolumes
, monkeypatch
):
135 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={}, vg_name
="vg")
136 pvolumes
.append(FooPVolume
)
137 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
138 assert api
.get_pv(pv_uuid
='foo') is None
140 def test_single_pv_is_matched(self
, pvolumes
, monkeypatch
):
141 FooPVolume
= api
.PVolume(vg_name
="vg", pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={})
142 pvolumes
.append(FooPVolume
)
143 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
144 assert api
.get_pv(pv_uuid
='0000') == FooPVolume
146 def test_single_pv_is_matched_by_uuid(self
, pvolumes
, monkeypatch
):
147 FooPVolume
= api
.PVolume(
148 pv_name
='/dev/vg/foo',
149 pv_uuid
='1111', pv_tags
="ceph.type=data", vg_name
="vg")
150 pvolumes
.append(FooPVolume
)
151 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
152 assert api
.get_pv(pv_uuid
='1111') == FooPVolume
154 def test_vg_name_is_set(self
, pvolumes
, monkeypatch
):
155 FooPVolume
= api
.PVolume(
156 pv_name
='/dev/vg/foo',
157 pv_uuid
='1111', pv_tags
="ceph.type=data", vg_name
="vg")
158 pvolumes
.append(FooPVolume
)
159 monkeypatch
.setattr(api
, 'PVolumes', lambda: pvolumes
)
160 pv
= api
.get_pv(pv_name
="/dev/vg/foo")
161 assert pv
.vg_name
== "vg"
164 class TestPVolumes(object):
166 def test_filter_by_tag_does_not_match_one(self
, pvolumes
, monkeypatch
):
167 pv_tags
= "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
168 FooPVolume
= api
.PVolume(
169 pv_name
='/dev/vg/foo',
170 pv_uuid
='1111', pv_tags
=pv_tags
, vg_name
='vg')
171 pvolumes
.append(FooPVolume
)
172 pvolumes
.filter(pv_tags
={'ceph.type': 'journal', 'ceph.osd_id': '2'})
173 assert pvolumes
== []
175 def test_filter_by_tags_matches(self
, pvolumes
, monkeypatch
):
176 pv_tags
= "ceph.type=journal,ceph.osd_id=1"
177 FooPVolume
= api
.PVolume(
178 pv_name
='/dev/vg/foo',
179 pv_uuid
='1111', pv_tags
=pv_tags
, vg_name
="vg")
180 pvolumes
.append(FooPVolume
)
181 pvolumes
.filter(pv_tags
={'ceph.type': 'journal', 'ceph.osd_id': '1'})
182 assert pvolumes
== [FooPVolume
]
185 class TestGetVG(object):
187 def test_nothing_is_passed_in(self
):
188 # so we return a None
189 assert api
.get_vg() is None
191 def test_single_vg_is_matched(self
, volume_groups
, monkeypatch
):
192 FooVG
= api
.VolumeGroup(vg_name
='foo')
193 volume_groups
.append(FooVG
)
194 monkeypatch
.setattr(api
, 'VolumeGroups', lambda: volume_groups
)
195 assert api
.get_vg(vg_name
='foo') == FooVG
198 class TestVolumes(object):
200 def test_volume_get_has_no_volumes(self
, volumes
):
201 assert volumes
.get() is None
203 def test_volume_get_filtered_has_no_volumes(self
, volumes
):
204 assert volumes
.get(lv_name
='ceph') is None
206 def test_volume_has_multiple_matches(self
, volumes
):
207 volume1
= volume2
= api
.Volume(lv_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
208 volumes
.append(volume1
)
209 volumes
.append(volume2
)
210 with pytest
.raises(exceptions
.MultipleLVsError
):
211 volumes
.get(lv_name
='foo')
213 def test_as_dict_infers_type_from_tags(self
, volumes
):
214 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
215 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
217 result
= volumes
.get(lv_tags
={'ceph.type': 'data'}).as_dict()
218 assert result
['type'] == 'data'
220 def test_as_dict_populates_path_from_lv_api(self
, volumes
):
221 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
222 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
224 result
= volumes
.get(lv_tags
={'ceph.type': 'data'}).as_dict()
225 assert result
['path'] == '/dev/vg/lv'
227 def test_find_the_correct_one(self
, volumes
):
228 volume1
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
='')
229 volume2
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='')
230 volumes
.append(volume1
)
231 volumes
.append(volume2
)
232 assert volumes
.get(lv_name
='volume1') == volume1
234 def test_filter_by_tag(self
, volumes
):
235 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
236 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
237 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='ceph.type=journal')
239 volumes
.append(journal
)
240 volumes
.filter(lv_tags
={'ceph.type': 'data'})
241 assert len(volumes
) == 1
242 assert volumes
[0].lv_name
== 'volume1'
244 def test_filter_by_tag_does_not_match_one(self
, volumes
):
245 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
246 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/vg/lv', lv_tags
=lv_tags
)
247 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/vg/lv', lv_tags
='ceph.osd_id=1,ceph.type=journal')
249 volumes
.append(journal
)
250 # note the different osd_id!
251 volumes
.filter(lv_tags
={'ceph.type': 'data', 'ceph.osd_id': '2'})
254 def test_filter_by_vg_name(self
, volumes
):
255 lv_tags
= "ceph.type=data,ceph.fsid=000-aaa"
256 osd
= api
.Volume(lv_name
='volume1', vg_name
='ceph_vg', lv_tags
=lv_tags
)
257 journal
= api
.Volume(lv_name
='volume2', vg_name
='system_vg', lv_tags
='ceph.type=journal')
259 volumes
.append(journal
)
260 volumes
.filter(vg_name
='ceph_vg')
261 assert len(volumes
) == 1
262 assert volumes
[0].lv_name
== 'volume1'
264 def test_filter_by_lv_path(self
, volumes
):
265 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_tags
='')
266 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_tags
='')
268 volumes
.append(journal
)
269 volumes
.filter(lv_path
='/dev/volume1')
270 assert len(volumes
) == 1
271 assert volumes
[0].lv_name
== 'volume1'
273 def test_filter_by_lv_uuid(self
, volumes
):
274 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_uuid
='1111', lv_tags
='')
275 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_uuid
='', lv_tags
='')
277 volumes
.append(journal
)
278 volumes
.filter(lv_uuid
='1111')
279 assert len(volumes
) == 1
280 assert volumes
[0].lv_name
== 'volume1'
282 def test_filter_by_lv_uuid_nothing_found(self
, volumes
):
283 osd
= api
.Volume(lv_name
='volume1', lv_path
='/dev/volume1', lv_uuid
='1111', lv_tags
='')
284 journal
= api
.Volume(lv_name
='volume2', lv_path
='/dev/volume2', lv_uuid
='', lv_tags
='')
286 volumes
.append(journal
)
287 volumes
.filter(lv_uuid
='22222')
290 def test_filter_requires_params(self
, volumes
):
291 with pytest
.raises(TypeError):
295 class TestVolumeGroups(object):
297 def test_volume_get_has_no_volume_groups(self
, volume_groups
):
298 assert volume_groups
.get() is None
300 def test_volume_get_filtered_has_no_volumes(self
, volume_groups
):
301 assert volume_groups
.get(vg_name
='ceph') is None
303 def test_volume_has_multiple_matches(self
, volume_groups
):
304 volume1
= volume2
= api
.VolumeGroup(vg_name
='foo', lv_path
='/dev/vg/lv', lv_tags
='')
305 volume_groups
.append(volume1
)
306 volume_groups
.append(volume2
)
307 with pytest
.raises(exceptions
.MultipleVGsError
):
308 volume_groups
.get(vg_name
='foo')
310 def test_find_the_correct_one(self
, volume_groups
):
311 volume1
= api
.VolumeGroup(vg_name
='volume1', lv_tags
='')
312 volume2
= api
.VolumeGroup(vg_name
='volume2', lv_tags
='')
313 volume_groups
.append(volume1
)
314 volume_groups
.append(volume2
)
315 assert volume_groups
.get(vg_name
='volume1') == volume1
317 def test_filter_by_tag(self
, volume_groups
):
318 vg_tags
= "ceph.group=dmcache"
319 osd
= api
.VolumeGroup(vg_name
='volume1', vg_tags
=vg_tags
)
320 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.group=plain')
321 volume_groups
.append(osd
)
322 volume_groups
.append(journal
)
323 volume_groups
.filter(vg_tags
={'ceph.group': 'dmcache'})
324 assert len(volume_groups
) == 1
325 assert volume_groups
[0].vg_name
== 'volume1'
327 def test_filter_by_tag_does_not_match_one(self
, volume_groups
):
328 vg_tags
= "ceph.group=dmcache,ceph.disk_type=ssd"
329 osd
= api
.VolumeGroup(vg_name
='volume1', vg_path
='/dev/vg/lv', vg_tags
=vg_tags
)
330 volume_groups
.append(osd
)
331 volume_groups
.filter(vg_tags
={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
332 assert volume_groups
== []
334 def test_filter_by_vg_name(self
, volume_groups
):
335 vg_tags
= "ceph.type=data,ceph.fsid=000-aaa"
336 osd
= api
.VolumeGroup(vg_name
='ceph_vg', vg_tags
=vg_tags
)
337 journal
= api
.VolumeGroup(vg_name
='volume2', vg_tags
='ceph.type=journal')
338 volume_groups
.append(osd
)
339 volume_groups
.append(journal
)
340 volume_groups
.filter(vg_name
='ceph_vg')
341 assert len(volume_groups
) == 1
342 assert volume_groups
[0].vg_name
== 'ceph_vg'
344 def test_filter_requires_params(self
, volume_groups
):
345 with pytest
.raises(TypeError):
346 volume_groups
.filter()
349 class TestGetLVFromArgument(object):
352 self
.foo_volume
= api
.Volume(
353 lv_name
='foo', lv_path
='/path/to/lv',
354 vg_name
='foo_group', lv_tags
=''
357 def test_non_absolute_path_is_not_valid(self
, volumes
):
358 volumes
.append(self
.foo_volume
)
359 assert api
.get_lv_from_argument('foo') is None
361 def test_too_many_slashes_is_invalid(self
, volumes
):
362 volumes
.append(self
.foo_volume
)
363 assert api
.get_lv_from_argument('path/to/lv') is None
365 def test_absolute_path_is_not_lv(self
, volumes
):
366 volumes
.append(self
.foo_volume
)
367 assert api
.get_lv_from_argument('/path') is None
369 def test_absolute_path_is_lv(self
, volumes
):
370 volumes
.append(self
.foo_volume
)
371 assert api
.get_lv_from_argument('/path/to/lv') == self
.foo_volume
374 class TestRemoveLV(object):
376 def test_removes_lv(self
, monkeypatch
):
377 def mock_call(cmd
, **kw
):
379 monkeypatch
.setattr(process
, 'call', mock_call
)
380 assert api
.remove_lv("vg/lv")
382 def test_fails_to_remove_lv(self
, monkeypatch
):
383 def mock_call(cmd
, **kw
):
385 monkeypatch
.setattr(process
, 'call', mock_call
)
386 with pytest
.raises(RuntimeError):
387 api
.remove_lv("vg/lv")
390 class TestCreateLV(object):
393 self
.foo_volume
= api
.Volume(lv_name
='foo', lv_path
='/path', vg_name
='foo_group', lv_tags
='')
395 def test_uses_size(self
, monkeypatch
, capture
):
396 monkeypatch
.setattr(process
, 'run', capture
)
397 monkeypatch
.setattr(process
, 'call', capture
)
398 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
399 api
.create_lv('foo', 'foo_group', size
='5G', tags
={'ceph.type': 'data'})
400 expected
= ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
401 assert capture
.calls
[0]['args'][0] == expected
403 def test_calls_to_set_type_tag(self
, monkeypatch
, capture
):
404 monkeypatch
.setattr(process
, 'run', capture
)
405 monkeypatch
.setattr(process
, 'call', capture
)
406 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
407 api
.create_lv('foo', 'foo_group', size
='5G', tags
={'ceph.type': 'data'})
408 ceph_tag
= ['lvchange', '--addtag', 'ceph.type=data', '/path']
409 assert capture
.calls
[1]['args'][0] == ceph_tag
411 def test_calls_to_set_data_tag(self
, monkeypatch
, capture
):
412 monkeypatch
.setattr(process
, 'run', capture
)
413 monkeypatch
.setattr(process
, 'call', capture
)
414 monkeypatch
.setattr(api
, 'get_lv', lambda *a
, **kw
: self
.foo_volume
)
415 api
.create_lv('foo', 'foo_group', size
='5G', tags
={'ceph.type': 'data'})
416 data_tag
= ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
417 assert capture
.calls
[2]['args'][0] == data_tag
421 # The following tests are pretty gnarly. VDO detection is very convoluted and
422 # involves correlating information from device mappers, realpaths, slaves of
423 # those mappers, and parents or related mappers. This makes it very hard to
424 # patch nicely or keep tests short and readable. These tests are trying to
425 # ensure correctness, the better approach will be to do some functional testing
431 def disable_kvdo_path(monkeypatch
):
432 monkeypatch
.setattr('os.path.isdir', lambda x
: False)
436 def enable_kvdo_path(monkeypatch
):
437 monkeypatch
.setattr('os.path.isdir', lambda x
: True)
440 # Stub for os.listdir
443 class ListDir(object):
445 def __init__(self
, paths
):
447 self
._normalize
_paths
()
448 self
.listdir
= os
.listdir
450 def _normalize_paths(self
):
451 for k
, v
in self
.paths
.items():
452 self
.paths
[k
.rstrip('/')] = v
.rstrip('/')
454 def add(self
, original
, fake
):
455 self
.paths
[original
.rstrip('/')] = fake
.rstrip('/')
457 def __call__(self
, path
):
458 return self
.listdir(self
.paths
[path
.rstrip('/')])
461 @pytest.fixture(scope
='function')
462 def listdir(monkeypatch
):
463 def apply(paths
=None, stub
=None):
465 stub
= ListDir(paths
)
467 for original
, fake
in paths
.items():
468 stub
.add(original
, fake
)
470 monkeypatch
.setattr('os.listdir', stub
)
474 @pytest.fixture(scope
='function')
475 def makedirs(tmpdir
):
476 def create(directory
):
477 path
= os
.path
.join(str(tmpdir
), directory
)
480 create
.base
= str(tmpdir
)
484 class TestIsVdo(object):
486 def test_no_vdo_dir(self
, disable_kvdo_path
):
487 assert api
._is
_vdo
('/path') is False
489 def test_exceptions_return_false(self
, monkeypatch
):
492 monkeypatch
.setattr('ceph_volume.api.lvm._is_vdo', throw
)
493 assert api
.is_vdo('/path') == '0'
495 def test_is_vdo_returns_a_string(self
, monkeypatch
):
496 monkeypatch
.setattr('ceph_volume.api.lvm._is_vdo', lambda x
: True)
497 assert api
.is_vdo('/path') == '1'
499 def test_kvdo_dir_no_devices(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
500 kvdo_path
= makedirs('sys/kvdo')
501 listdir(paths
={'/sys/kvdo': kvdo_path
})
502 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
: [])
503 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
: [])
504 assert api
._is
_vdo
('/dev/mapper/vdo0') is False
506 def test_vdo_slaves_found_and_matched(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
507 kvdo_path
= makedirs('sys/kvdo')
508 listdir(paths
={'/sys/kvdo': kvdo_path
})
509 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
: ['/dev/dm-3'])
510 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
: [])
511 assert api
._is
_vdo
('/dev/dm-3') is True
513 def test_vdo_parents_found_and_matched(self
, makedirs
, enable_kvdo_path
, listdir
, monkeypatch
):
514 kvdo_path
= makedirs('sys/kvdo')
515 listdir(paths
={'/sys/kvdo': kvdo_path
})
516 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x
: [])
517 monkeypatch
.setattr('ceph_volume.api.lvm._vdo_parents', lambda x
: ['/dev/dm-4'])
518 assert api
._is
_vdo
('/dev/dm-4') is True
521 class TestVdoSlaves(object):
523 def test_slaves_are_not_found(self
, makedirs
, listdir
, monkeypatch
):
524 slaves_path
= makedirs('sys/block/vdo0/slaves')
525 listdir(paths
={'/sys/block/vdo0/slaves': slaves_path
})
526 monkeypatch
.setattr('ceph_volume.api.lvm.os.path.exists', lambda x
: True)
527 result
= sorted(api
._vdo
_slaves
(['vdo0']))
528 assert '/dev/mapper/vdo0' in result
529 assert 'vdo0' in result
531 def test_slaves_are_found(self
, makedirs
, listdir
, monkeypatch
):
532 slaves_path
= makedirs('sys/block/vdo0/slaves')
533 makedirs('sys/block/vdo0/slaves/dm-4')
534 makedirs('dev/mapper/vdo0')
535 listdir(paths
={'/sys/block/vdo0/slaves': slaves_path
})
536 monkeypatch
.setattr('ceph_volume.api.lvm.os.path.exists', lambda x
: True)
537 result
= sorted(api
._vdo
_slaves
(['vdo0']))
538 assert '/dev/dm-4' in result
539 assert 'dm-4' in result
542 class TestVDOParents(object):
544 def test_parents_are_found(self
, makedirs
, listdir
):
545 block_path
= makedirs('sys/block')
546 slaves_path
= makedirs('sys/block/dm-4/slaves')
547 makedirs('sys/block/dm-4/slaves/dm-3')
549 '/sys/block/dm-4/slaves': slaves_path
,
550 '/sys/block': block_path
})
551 result
= api
._vdo
_parents
(['dm-3'])
552 assert '/dev/dm-4' in result
553 assert 'dm-4' in result
555 def test_parents_are_not_found(self
, makedirs
, listdir
):
556 block_path
= makedirs('sys/block')
557 slaves_path
= makedirs('sys/block/dm-4/slaves')
558 makedirs('sys/block/dm-4/slaves/dm-5')
560 '/sys/block/dm-4/slaves': slaves_path
,
561 '/sys/block': block_path
})
562 result
= api
._vdo
_parents
(['dm-3'])