]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
update sources to v12.2.3
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / api / test_lvm.py
CommitLineData
d2e6a577
FG
1import pytest
2from ceph_volume import process, exceptions
3efd9988 3from ceph_volume.api import lvm as api
d2e6a577
FG
4
5
class TestParseTags(object):
    """Checks ``api.parse_tags`` against comma-separated tag strings."""

    def test_no_tags_means_empty_dict(self):
        # an empty tag string must produce an empty mapping
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        expected = {'ceph.osd_something': '1'}
        parsed = api.parse_tags('ceph.osd_something=1')
        assert parsed == expected

    def test_non_ceph_tags_are_skipped(self):
        # a tag without the ``ceph.`` prefix is not reported
        assert api.parse_tags('foo') == {}

    def test_mixed_non_ceph_tags(self):
        expected = {'ceph.bar': '1'}
        parsed = api.parse_tags('foo,ceph.bar=1')
        assert parsed == expected

    def test_multiple_csv_expands_in_dict(self):
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # look at each key on its own instead of comparing whole dicts, so
        # the check does not rely on any ordering of the mapping
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for tag, value in expected_pairs:
            assert parsed[tag] == value
30
31
class TestGetAPIVgs(object):
    """Runs ``api.get_api_vgs`` with ``api.process.call`` replaced by a stub."""

    def test_report_is_emtpy(self, monkeypatch):
        def fake_call(x):
            return '\n\n', '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' VolGroup00']

        def fake_call(x):
            return report, '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        report = [' VolGroup00 ;;;;;;9g']

        def fake_call(x):
            return report, '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        first_vg = api.get_api_vgs()[0]
        # seven fields are expected in the item, even though most are blank
        assert len(first_vg.keys()) == 7
        assert first_vg['vg_name'] == 'VolGroup00'
        assert first_vg['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']

        def fake_call(x):
            return report, '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        vgs = api.get_api_vgs()
        assert vgs[0]['vg_name'] == 'VolGroup00'
        assert vgs[1]['vg_name'] == 'ceph_vg'
d2e6a577
FG
57
58
class TestGetAPILvs(object):
    """Runs ``api.get_api_lvs`` with ``api.process.call`` replaced by a stub."""

    def test_report_is_emtpy(self, monkeypatch):
        def fake_call(x):
            return '', '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' ;/path;VolGroup00;root']

        def fake_call(x):
            return report, '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        lvs = api.get_api_lvs()
        assert lvs[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']

        def fake_call(x):
            return report, '', 0

        monkeypatch.setattr(api.process, 'call', fake_call)
        lvs = api.get_api_lvs()
        assert lvs[0]['lv_name'] == 'VolName'
        assert lvs[1]['lv_name'] == 'ceph_lv'
d2e6a577
FG
77
78
@pytest.fixture
def volumes(monkeypatch):
    """Return a purged ``api.Volumes`` instance that ``api.Volumes()`` also yields."""
    def silent_call(x):
        return '', '', 0

    monkeypatch.setattr(process, 'call', silent_call)
    lvs = api.Volumes()
    lvs._purge()
    # redirect later ``api.Volumes()`` calls to this very instance, so that
    # anything a test method appends to the fixture is what gets used
    monkeypatch.setattr(api, 'Volumes', lambda: lvs)
    return lvs
88
89
181888fb
FG
@pytest.fixture
def pvolumes(monkeypatch):
    """Return a purged ``api.PVolumes`` instance built with ``process.call`` stubbed."""
    def silent_call(x):
        return '', '', 0

    monkeypatch.setattr(process, 'call', silent_call)
    pvs = api.PVolumes()
    pvs._purge()
    return pvs
96
97
d2e6a577
FG
@pytest.fixture
def volume_groups(monkeypatch):
    """Return a purged ``api.VolumeGroups`` instance built with ``process.call`` stubbed."""
    def silent_call(x):
        return '', '', 0

    monkeypatch.setattr(process, 'call', silent_call)
    groups = api.VolumeGroups()
    groups._purge()
    return groups
104
105
class TestGetLV(object):
    """Lookups through ``api.get_lv`` by name and by uuid."""

    def test_nothing_is_passed_in(self):
        # with no filters given the result is None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == foo_lv

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_uuid='1111',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_uuid='1111')
        assert found == foo_lv
125
126
class TestGetPV(object):
    """Lookups through ``api.get_pv`` by uuid and by name."""

    def test_nothing_is_passed_in(self):
        # with no filters given the result is None
        found = api.get_pv()
        assert found is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        # the uuid asked for differs from the only stored one
        found = api.get_pv(pv_uuid='foo')
        assert found is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        sda = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='0000')
        assert found == sda

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
            vg_name="vg",
        )
        pvolumes.append(tagged_pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='1111')
        assert found == tagged_pv

    def test_vg_name_is_set(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
            vg_name="vg",
        )
        pvolumes.append(tagged_pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_name="/dev/vg/foo")
        assert found.vg_name == "vg"
161
181888fb
FG
162
class TestPVolumes(object):
    """Tag-based filtering on the ``pvolumes`` fixture."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa",
            vg_name='vg',
        )
        pvolumes.append(journal_pv)
        # the osd_id requested here is not the one on the stored PV
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '2'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1",
            vg_name="vg",
        )
        pvolumes.append(journal_pv)
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '1'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == [journal_pv]
182
d2e6a577
FG
183
class TestGetVG(object):
    """Lookups through ``api.get_vg``."""

    def test_nothing_is_passed_in(self):
        # with no filters given the result is None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
195
196
class TestVolumes(object):
    """``get``, ``filter`` and ``as_dict`` behaviour using the ``volumes`` fixture."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        # the very same Volume object is stored twice on purpose
        duplicate = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(duplicate)
        volumes.append(duplicate)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_as_dict_infers_type_from_tags(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        volumes.append(data_lv)
        matched = volumes.get(lv_tags={'ceph.type': 'data'})
        as_dict = matched.as_dict()
        assert as_dict['type'] == 'data'

    def test_as_dict_populates_path_from_lv_api(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        volumes.append(data_lv)
        matched = volumes.get(lv_tags={'ceph.type': 'data'})
        as_dict = matched.as_dict()
        assert as_dict['path'] == '/dev/vg/lv'

    def test_find_the_correct_one(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(first)
        volumes.append(second)
        found = volumes.get(lv_name='volume1')
        assert found == first

    def test_filter_by_tag(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            lv_path='/dev/vg/lv',
            lv_tags='ceph.type=journal',
        )
        volumes.append(data_lv)
        volumes.append(journal_lv)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            lv_path='/dev/vg/lv',
            lv_tags='ceph.osd_id=1,ceph.type=journal',
        )
        volumes.append(data_lv)
        volumes.append(journal_lv)
        # the osd_id asked for is different from the one that was tagged
        wanted_tags = {'ceph.type': 'data', 'ceph.osd_id': '2'}
        volumes.filter(lv_tags=wanted_tags)
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            vg_name='ceph_vg',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            vg_name='system_vg',
            lv_tags='ceph.type=journal',
        )
        volumes.append(data_lv)
        volumes.append(journal_lv)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(first)
        volumes.append(second)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        with_uuid = api.Volume(
            lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        blank_uuid = api.Volume(
            lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(with_uuid)
        volumes.append(blank_uuid)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        with_uuid = api.Volume(
            lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        blank_uuid = api.Volume(
            lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        volumes.append(with_uuid)
        volumes.append(blank_uuid)
        volumes.filter(lv_uuid='22222')
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        # calling filter with no criteria is expected to raise TypeError
        with pytest.raises(TypeError):
            volumes.filter()
292
293
class TestVolumeGroups(object):
    """``get`` and ``filter`` behaviour using the ``volume_groups`` fixture."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        found = volume_groups.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        found = volume_groups.get(vg_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # the very same VolumeGroup object is stored twice on purpose
        duplicate = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume_groups.append(duplicate)
        volume_groups.append(duplicate)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        first = api.VolumeGroup(vg_name='volume1', lv_tags='')
        second = api.VolumeGroup(vg_name='volume2', lv_tags='')
        volume_groups.append(first)
        volume_groups.append(second)
        found = volume_groups.get(vg_name='volume1')
        assert found == first

    def test_filter_by_tag(self, volume_groups):
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags="ceph.group=dmcache")
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(dmcache_vg)
        volume_groups.append(plain_vg)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        ssd_vg = api.VolumeGroup(
            vg_name='volume1',
            vg_path='/dev/vg/lv',
            vg_tags="ceph.group=dmcache,ceph.disk_type=ssd",
        )
        volume_groups.append(ssd_vg)
        # only one of the two requested tags agrees with the stored group
        wanted_tags = {'ceph.group': 'data', 'ceph.disk_type': 'ssd'}
        volume_groups.filter(vg_tags=wanted_tags)
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        ceph_vg = api.VolumeGroup(
            vg_name='ceph_vg',
            vg_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(ceph_vg)
        volume_groups.append(other_vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # calling filter with no criteria is expected to raise TypeError
        with pytest.raises(TypeError):
            volume_groups.filter()
346
347
3efd9988
FG
class TestGetLVFromArgument(object):
    """Checks which argument strings ``api.get_lv_from_argument`` resolves to an LV."""

    def setup_method(self):
        """Build the volume shared by every test in this class.

        ``setup_method`` is pytest's own per-test hook. The previous
        nose-style ``setup`` name is deprecated in pytest and is no longer
        invoked by pytest 8, which would leave ``self.foo_volume`` unset.
        """
        self.foo_volume = api.Volume(
            lv_name='foo', lv_path='/path/to/lv',
            vg_name='foo_group', lv_tags=''
        )

    def test_non_absolute_path_is_not_valid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('foo') is None

    def test_too_many_slashes_is_invalid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('path/to/lv') is None

    def test_absolute_path_is_not_lv(self, volumes):
        volumes.append(self.foo_volume)
        # absolute, but not the lv_path of the stored volume
        assert api.get_lv_from_argument('/path') is None

    def test_absolute_path_is_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
371
372
class TestRemoveLV(object):
    """``api.remove_lv`` with ``process.call`` stubbed to succeed or to fail."""

    def test_removes_lv(self, monkeypatch):
        def succeeding_call(cmd, **kw):
            # third item is the exit status: 0
            return ('', '', 0)

        monkeypatch.setattr(process, 'call', succeeding_call)
        removed = api.remove_lv("vg/lv")
        assert removed

    def test_fails_to_remove_lv(self, monkeypatch):
        def failing_call(cmd, **kw):
            # third item is the exit status: 1
            return ('', '', 1)

        monkeypatch.setattr(process, 'call', failing_call)
        with pytest.raises(RuntimeError):
            api.remove_lv("vg/lv")
387
388
d2e6a577
FG
class TestCreateLV(object):
    """Checks the commands ``api.create_lv`` issues, recorded by the ``capture`` fixture."""

    def setup_method(self):
        """Build the volume that the patched ``api.get_lv`` hands back.

        ``setup_method`` is pytest's own per-test hook. The previous
        nose-style ``setup`` name is deprecated in pytest and is no longer
        invoked by pytest 8, which would leave ``self.foo_volume`` unset.
        """
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def test_uses_size(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        # the first recorded call is the lvcreate command itself
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path']
        # the second recorded call applies the tag passed in via ``tags``
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
        data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        # the third recorded call adds the data_device tag for the lv path
        assert capture.calls[2]['args'][0] == data_tag