]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
adfa30c3bbbec3aedb0299494d5422cb8e203971
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / api / test_lvm.py
1 import os
2 import pytest
3 from ceph_volume import process, exceptions
4 from ceph_volume.api import lvm as api
5
6
class TestParseTags(object):
    """Checks that ``api.parse_tags`` turns a CSV tag string into a dict."""

    def test_no_tags_means_empty_dict(self):
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        parsed = api.parse_tags('ceph.osd_something=1')
        assert parsed == {'ceph.osd_something': '1'}

    def test_non_ceph_tags_are_skipped(self):
        assert api.parse_tags('foo') == {}

    def test_mixed_non_ceph_tags(self):
        parsed = api.parse_tags('foo,ceph.bar=1')
        assert parsed == {'ceph.bar': '1'}

    def test_multiple_csv_expands_in_dict(self):
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # check every key on its own rather than comparing whole dicts, so
        # the un-ordered nature of the dict never matters
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for tag, value in expected_pairs:
            assert parsed[tag] == value
31
32
class TestGetAPIVgs(object):
    """Tests for ``api.get_api_vgs`` with ``process.call`` stubbed out.

    Each stub returns an ``(stdout, stderr, returncode)`` tuple. The stubs
    accept arbitrary keyword arguments (like ``mock_call`` in
    ``TestRemoveLV``) so they keep working if the API passes options to
    ``process.call``.
    """

    def test_report_is_emtpy(self, monkeypatch):
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: ('\n\n', '', 0))
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' VolGroup00']
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: (report, '', 0))
        assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        # only the first and last of the ';'-separated fields carry a value
        report = [' VolGroup00 ;;;;;;9g']
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: (report, '', 0))
        result = api.get_api_vgs()[0]
        assert len(result.keys()) == 7
        assert result['vg_name'] == 'VolGroup00'
        assert result['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: (report, '', 0))
        result = api.get_api_vgs()
        assert result[0]['vg_name'] == 'VolGroup00'
        assert result[1]['vg_name'] == 'ceph_vg'
58
59
class TestGetAPILvs(object):
    """Tests for ``api.get_api_lvs`` with ``process.call`` stubbed out.

    Each stub returns an ``(stdout, stderr, returncode)`` tuple. The stubs
    accept arbitrary keyword arguments (like ``mock_call`` in
    ``TestRemoveLV``) so they keep working if the API passes options to
    ``process.call``.
    """

    def test_report_is_emtpy(self, monkeypatch):
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: ('', '', 0))
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        # the third ';'-separated field is the one reported back as lv_name
        report = [' ;/path;VolGroup00;root']
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: (report, '', 0))
        result = api.get_api_lvs()
        assert result[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
        monkeypatch.setattr(api.process, 'call', lambda x, **kw: (report, '', 0))
        result = api.get_api_lvs()
        assert result[0]['lv_name'] == 'VolName'
        assert result[1]['lv_name'] == 'ceph_lv'
78
79
@pytest.fixture
def volumes(monkeypatch):
    """Return a purged ``api.Volumes`` collection for a test to populate.

    ``process.call`` is replaced with a stub returning empty output. The stub
    accepts any keyword arguments so it does not break if the API passes
    options along with the command.
    """
    monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
    volumes = api.Volumes()
    volumes._purge()
    # also patch api.Volumes so that when it is called, it will use the newly
    # created fixture, with whatever the test method wants to append to it
    monkeypatch.setattr(api, 'Volumes', lambda: volumes)
    return volumes
89
90
@pytest.fixture
def pvolumes(monkeypatch):
    """Return a purged ``api.PVolumes`` collection for a test to populate.

    ``process.call`` is replaced with a stub returning empty output. The stub
    accepts any keyword arguments so it does not break if the API passes
    options along with the command.
    """
    monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
    pvolumes = api.PVolumes()
    pvolumes._purge()
    return pvolumes
97
98
@pytest.fixture
def volume_groups(monkeypatch):
    """Return a purged ``api.VolumeGroups`` collection for a test to populate.

    ``process.call`` is replaced with a stub returning empty output. The stub
    accepts any keyword arguments so it does not break if the API passes
    options along with the command.
    """
    monkeypatch.setattr(process, 'call', lambda x, **kw: ('', '', 0))
    vgs = api.VolumeGroups()
    vgs._purge()
    return vgs
105
106
class TestGetLV(object):
    """Lookups of a single logical volume through ``api.get_lv``."""

    def test_nothing_is_passed_in(self):
        # without any filter the lookup hands back None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        foo_lv = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
        volumes.append(foo_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == foo_lv

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_uuid='1111',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_uuid='1111')
        assert found == foo_lv
126
127
class TestGetPV(object):
    """Lookups of a single physical volume through ``api.get_pv``."""

    def test_nothing_is_passed_in(self):
        # without any filter the lookup hands back None
        found = api.get_pv()
        assert found is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='foo')
        assert found is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='0000')
        assert found == sda

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
            vg_name="vg",
        )
        pvolumes.append(tagged_pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='1111')
        assert found == tagged_pv

    def test_vg_name_is_set(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
            vg_name="vg",
        )
        pvolumes.append(tagged_pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_name="/dev/vg/foo")
        assert found.vg_name == "vg"
162
163
class TestPVolumes(object):
    """Tag filtering on the ``api.PVolumes`` collection."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa",
            vg_name='vg',
        )
        pvolumes.append(tagged_pv)
        # the osd_id asked for differs from the one on the volume
        wanted = {'ceph.type': 'journal', 'ceph.osd_id': '2'}
        pvolumes.filter(pv_tags=wanted)
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1",
            vg_name="vg",
        )
        pvolumes.append(tagged_pv)
        wanted = {'ceph.type': 'journal', 'ceph.osd_id': '1'}
        pvolumes.filter(pv_tags=wanted)
        assert pvolumes == [tagged_pv]
183
184
class TestGetVG(object):
    """Lookups of a single volume group through ``api.get_vg``."""

    def test_nothing_is_passed_in(self):
        # without any filter the lookup hands back None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
196
197
class TestVolumes(object):
    """Tests for ``get`` and ``filter`` on the ``api.Volumes`` collection.

    Every test receives the ``volumes`` fixture and appends the
    ``api.Volume`` objects it needs.
    """

    def test_volume_get_has_no_volumes(self, volumes):
        assert volumes.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        assert volumes.get(lv_name='ceph') is None

    def test_volume_has_multiple_matches(self, volumes):
        # Use two distinct objects with the same name; appending one object
        # twice would not really exercise "several volumes match".
        volume1 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_as_dict_infers_type_from_tags(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(osd)
        result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
        assert result['type'] == 'data'

    def test_as_dict_populates_path_from_lv_api(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(osd)
        result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
        assert result['path'] == '/dev/vg/lv'

    def test_find_the_correct_one(self, volumes):
        volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(volume1)
        volumes.append(volume2)
        assert volumes.get(lv_name='volume1') == volume1

    def test_filter_by_tag(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        # note the different osd_id!
        volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
        journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(osd)
        volumes.append(journal)
        volumes.filter(lv_uuid='22222')
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        # calling filter() without any criteria is expected to be an error
        with pytest.raises(TypeError):
            volumes.filter()
293
294
class TestVolumeGroups(object):
    """Tests for ``get`` and ``filter`` on the ``api.VolumeGroups`` collection.

    Every test receives the ``volume_groups`` fixture and appends the
    ``api.VolumeGroup`` objects it needs.
    """

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        assert volume_groups.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        assert volume_groups.get(vg_name='ceph') is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # Use two distinct objects with the same name; appending one object
        # twice would not really exercise "several groups match".
        volume1 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume_groups.append(volume1)
        volume_groups.append(volume2)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='')
        volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='')
        volume_groups.append(volume1)
        volume_groups.append(volume2)
        assert volume_groups.get(vg_name='volume1') == volume1

    def test_filter_by_tag(self, volume_groups):
        vg_tags = "ceph.group=dmcache"
        osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
        journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(osd)
        volume_groups.append(journal)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
        osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
        volume_groups.append(osd)
        # the disk_type matches but the group does not
        volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
        osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
        journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(osd)
        volume_groups.append(journal)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # calling filter() without any criteria is expected to be an error
        with pytest.raises(TypeError):
            volume_groups.filter()
347
348
class TestGetLVFromArgument(object):
    """Tests for ``api.get_lv_from_argument`` against a single known volume."""

    def setup_method(self):
        # pytest's native per-test hook; the nose-style ``setup`` name is
        # deprecated and no longer called by recent pytest releases, which
        # would leave ``self.foo_volume`` undefined.
        self.foo_volume = api.Volume(
            lv_name='foo', lv_path='/path/to/lv',
            vg_name='foo_group', lv_tags=''
        )

    def test_non_absolute_path_is_not_valid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('foo') is None

    def test_too_many_slashes_is_invalid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('path/to/lv') is None

    def test_absolute_path_is_not_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path') is None

    def test_absolute_path_is_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
372
373
class TestRemoveLV(object):
    """``api.remove_lv`` behaviour for zero and non-zero exit codes."""

    def test_removes_lv(self, monkeypatch):
        # a stubbed call reporting exit code 0
        succeed = lambda cmd, **kw: ('', '', 0)  # noqa: E731
        monkeypatch.setattr(process, 'call', succeed)
        outcome = api.remove_lv("vg/lv")
        assert outcome

    def test_fails_to_remove_lv(self, monkeypatch):
        # a stubbed call reporting exit code 1
        fail = lambda cmd, **kw: ('', '', 1)  # noqa: E731
        monkeypatch.setattr(process, 'call', fail)
        with pytest.raises(RuntimeError):
            api.remove_lv("vg/lv")
388
389
class TestCreateLV(object):
    """Tests for the commands ``api.create_lv`` issues.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture, and ``api.get_lv`` is patched to return ``self.foo_volume``.
    The assertions inspect ``capture.calls`` in order.
    """

    def setup_method(self):
        # pytest's native per-test hook; the nose-style ``setup`` name is
        # deprecated and no longer called by recent pytest releases, which
        # would leave ``self.foo_volume`` undefined.
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def test_uses_size(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        # first captured command: the lvcreate itself
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path']
        # second captured command: tagging the volume with its type
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        # third captured command: tagging the volume with its device path
        assert capture.calls[2]['args'][0] == data_tag
418
419
#
# The following tests are pretty gnarly. VDO detection is very convoluted and
# involves correlating information from device mappers, realpaths, slaves of
# those mappers, and parents or related mappers. This makes it very hard to
# patch nicely or to keep tests short and readable. These tests try to ensure
# correctness, but the better approach would be to do some functional testing
# with VDO.
#
428
429
@pytest.fixture
def disable_kvdo_path(monkeypatch):
    """Make ``os.path.isdir`` answer False for every path."""
    def never_a_dir(x):
        return False

    monkeypatch.setattr('os.path.isdir', never_a_dir)
433
434
@pytest.fixture
def enable_kvdo_path(monkeypatch):
    """Make ``os.path.isdir`` answer True for every path."""
    def always_a_dir(x):
        return True

    monkeypatch.setattr('os.path.isdir', always_a_dir)
438
439
440 # Stub for os.listdir
441
442
class ListDir(object):
    """Callable stand-in for ``os.listdir`` that redirects paths.

    ``paths`` maps the path the code under test asks for to the directory
    whose contents should be listed instead. Trailing slashes are ignored on
    both the requested and the substituted path. ``paths`` may be ``None``,
    which starts with an empty mapping.
    """

    def __init__(self, paths):
        # Work on a private copy so normalizing never rewrites the caller's
        # dict, and so ``None`` is accepted.
        self.paths = dict(paths or {})
        self._normalize_paths()
        # Grab the real implementation now: once this object is installed as
        # ``os.listdir``, looking it up later would recurse into ourselves.
        self.listdir = os.listdir

    def _normalize_paths(self):
        # Rebuild the mapping in one go. Inserting stripped keys while
        # iterating the same dict raises RuntimeError on Python 3 and would
        # leave the un-stripped keys behind.
        self.paths = {
            original.rstrip('/'): fake.rstrip('/')
            for original, fake in self.paths.items()
        }

    def add(self, original, fake):
        """Register (or replace) the redirection of ``original`` to ``fake``."""
        self.paths[original.rstrip('/')] = fake.rstrip('/')

    def __call__(self, path):
        """List the directory substituted for ``path``.

        Raises ``KeyError`` if ``path`` has no registered substitute.
        """
        return self.listdir(self.paths[path.rstrip('/')])
459
460
@pytest.fixture(scope='function')
def listdir(monkeypatch):
    """Return a function that installs a ``ListDir`` stub as ``os.listdir``.

    The returned ``apply(paths=None, stub=None)`` builds a ``ListDir`` from
    ``paths`` unless a ``stub`` is handed in, registers every entry of
    ``paths`` on it and patches ``os.listdir`` for the duration of the test.
    """
    def apply(paths=None, stub=None):
        if not stub:
            # ``paths`` defaults to None; give ListDir an empty mapping in
            # that case instead of letting it fail on ``None.items()``.
            stub = ListDir({} if paths is None else paths)
        if paths:
            for original, fake in paths.items():
                stub.add(original, fake)

        monkeypatch.setattr('os.listdir', stub)
    return apply
472
473
@pytest.fixture(scope='function')
def makedirs(tmpdir):
    """Return a helper that creates directories below the test's ``tmpdir``.

    The helper takes a relative directory, creates it (with parents) and
    returns the full path. Its ``base`` attribute holds the ``tmpdir`` path
    as a string.
    """
    def create(directory):
        target = os.path.join(str(tmpdir), directory)
        os.makedirs(target)
        return target

    create.base = str(tmpdir)
    return create
482
483
class TestIsVdo(object):
    """Tests for ``api.is_vdo`` and the ``api._is_vdo`` helper behind it."""

    def test_no_vdo_dir(self, disable_kvdo_path):
        assert api._is_vdo('/path') is False

    def test_exceptions_return_false(self, monkeypatch):
        # Accept whatever arguments is_vdo passes on, so the exception that
        # gets swallowed is the one raised here. A zero-argument stub only
        # passed because calling it with the path raised TypeError.
        def throw(*args, **kwargs):
            raise Exception()
        monkeypatch.setattr('ceph_volume.api.lvm._is_vdo', throw)
        assert api.is_vdo('/path') == '0'

    def test_is_vdo_returns_a_string(self, monkeypatch):
        monkeypatch.setattr('ceph_volume.api.lvm._is_vdo', lambda x: True)
        assert api.is_vdo('/path') == '1'

    def test_kvdo_dir_no_devices(self, makedirs, enable_kvdo_path, listdir, monkeypatch):
        kvdo_path = makedirs('sys/kvdo')
        listdir(paths={'/sys/kvdo': kvdo_path})
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x: [])
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_parents', lambda x: [])
        assert api._is_vdo('/dev/mapper/vdo0') is False

    def test_vdo_slaves_found_and_matched(self, makedirs, enable_kvdo_path, listdir, monkeypatch):
        kvdo_path = makedirs('sys/kvdo')
        listdir(paths={'/sys/kvdo': kvdo_path})
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x: ['/dev/dm-3'])
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_parents', lambda x: [])
        assert api._is_vdo('/dev/dm-3') is True

    def test_vdo_parents_found_and_matched(self, makedirs, enable_kvdo_path, listdir, monkeypatch):
        kvdo_path = makedirs('sys/kvdo')
        listdir(paths={'/sys/kvdo': kvdo_path})
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_slaves', lambda x: [])
        monkeypatch.setattr('ceph_volume.api.lvm._vdo_parents', lambda x: ['/dev/dm-4'])
        assert api._is_vdo('/dev/dm-4') is True
519
520
class TestVdoSlaves(object):
    """Tests for ``api._vdo_slaves`` against a fake ``/sys/block`` tree."""

    def test_slaves_are_not_found(self, makedirs, listdir, monkeypatch):
        # the slaves directory exists but stays empty
        fake_slaves = makedirs('sys/block/vdo0/slaves')
        listdir(paths={'/sys/block/vdo0/slaves': fake_slaves})
        monkeypatch.setattr('ceph_volume.api.lvm.os.path.exists', lambda x: True)
        devices = sorted(api._vdo_slaves(['vdo0']))
        for expected in ('/dev/mapper/vdo0', 'vdo0'):
            assert expected in devices

    def test_slaves_are_found(self, makedirs, listdir, monkeypatch):
        # dm-4 is present as an entry of the slaves directory
        fake_slaves = makedirs('sys/block/vdo0/slaves')
        makedirs('sys/block/vdo0/slaves/dm-4')
        makedirs('dev/mapper/vdo0')
        listdir(paths={'/sys/block/vdo0/slaves': fake_slaves})
        monkeypatch.setattr('ceph_volume.api.lvm.os.path.exists', lambda x: True)
        devices = sorted(api._vdo_slaves(['vdo0']))
        for expected in ('/dev/dm-4', 'dm-4'):
            assert expected in devices
540
541
class TestVDOParents(object):
    """Tests for ``api._vdo_parents`` against a fake ``/sys/block`` tree."""

    def test_parents_are_found(self, makedirs, listdir):
        # dm-4 lists dm-3 among its slaves
        fake_block = makedirs('sys/block')
        fake_slaves = makedirs('sys/block/dm-4/slaves')
        makedirs('sys/block/dm-4/slaves/dm-3')
        redirects = {
            '/sys/block/dm-4/slaves': fake_slaves,
            '/sys/block': fake_block,
        }
        listdir(paths=redirects)
        parents = api._vdo_parents(['dm-3'])
        for expected in ('/dev/dm-4', 'dm-4'):
            assert expected in parents

    def test_parents_are_not_found(self, makedirs, listdir):
        # dm-4 only lists dm-5, so dm-3 has no parent
        fake_block = makedirs('sys/block')
        fake_slaves = makedirs('sys/block/dm-4/slaves')
        makedirs('sys/block/dm-4/slaves/dm-5')
        redirects = {
            '/sys/block/dm-4/slaves': fake_slaves,
            '/sys/block': fake_block,
        }
        listdir(paths=redirects)
        parents = api._vdo_parents(['dm-3'])
        assert parents == []