]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/api/test_lvm.py
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / api / test_lvm.py
CommitLineData
d2e6a577
FG
1import pytest
2from ceph_volume import process, exceptions
3efd9988 3from ceph_volume.api import lvm as api
d2e6a577
FG
4
5
6class TestParseTags(object):
7
8 def test_no_tags_means_empty_dict(self):
9 result = api.parse_tags('')
10 assert result == {}
11
12 def test_single_tag_gets_parsed(self):
13 result = api.parse_tags('ceph.osd_something=1')
14 assert result == {'ceph.osd_something': '1'}
15
16 def test_multiple_csv_expands_in_dict(self):
17 result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
18 # assert them piecemeal to avoid the un-ordered dict nature
19 assert result['ceph.osd_something'] == '1'
20 assert result['ceph.foo'] == '2'
21 assert result['ceph.fsid'] == '0000'
22
23
24class TestGetAPIVgs(object):
25
26 def test_report_is_emtpy(self, monkeypatch):
b5b8bbf5 27 monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0))
d2e6a577
FG
28 assert api.get_api_vgs() == []
29
30 def test_report_has_stuff(self, monkeypatch):
b5b8bbf5 31 report = [' VolGroup00']
d2e6a577
FG
32 monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
33 assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]
34
b5b8bbf5
FG
35 def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
36 report = [' VolGroup00 ;;;;;;9g']
d2e6a577 37 monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
b5b8bbf5
FG
38 result = api.get_api_vgs()[0]
39 assert len(result.keys()) == 7
40 assert result['vg_name'] == 'VolGroup00'
41 assert result['vg_free'] == '9g'
d2e6a577 42
b5b8bbf5
FG
43 def test_report_has_multiple_items(self, monkeypatch):
44 report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
d2e6a577 45 monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
b5b8bbf5
FG
46 result = api.get_api_vgs()
47 assert result[0]['vg_name'] == 'VolGroup00'
48 assert result[1]['vg_name'] == 'ceph_vg'
d2e6a577
FG
49
50
51class TestGetAPILvs(object):
52
53 def test_report_is_emtpy(self, monkeypatch):
b5b8bbf5 54 monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0))
d2e6a577
FG
55 assert api.get_api_lvs() == []
56
57 def test_report_has_stuff(self, monkeypatch):
b5b8bbf5 58 report = [' ;/path;VolGroup00;root']
d2e6a577 59 monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
b5b8bbf5
FG
60 result = api.get_api_lvs()
61 assert result[0]['lv_name'] == 'VolGroup00'
d2e6a577
FG
62
63 def test_report_has_multiple_items(self, monkeypatch):
b5b8bbf5 64 report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
d2e6a577 65 monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
b5b8bbf5
FG
66 result = api.get_api_lvs()
67 assert result[0]['lv_name'] == 'VolName'
68 assert result[1]['lv_name'] == 'ceph_lv'
d2e6a577
FG
69
70
71@pytest.fixture
72def volumes(monkeypatch):
b5b8bbf5 73 monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
d2e6a577
FG
74 volumes = api.Volumes()
75 volumes._purge()
3efd9988
FG
76 # also patch api.Volumes so that when it is called, it will use the newly
77 # created fixture, with whatever the test method wants to append to it
78 monkeypatch.setattr(api, 'Volumes', lambda: volumes)
d2e6a577
FG
79 return volumes
80
81
181888fb
FG
82@pytest.fixture
83def pvolumes(monkeypatch):
84 monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
85 pvolumes = api.PVolumes()
86 pvolumes._purge()
87 return pvolumes
88
89
d2e6a577
FG
90@pytest.fixture
91def volume_groups(monkeypatch):
b5b8bbf5 92 monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
d2e6a577
FG
93 vgs = api.VolumeGroups()
94 vgs._purge()
95 return vgs
96
97
98class TestGetLV(object):
99
100 def test_nothing_is_passed_in(self):
101 # so we return a None
102 assert api.get_lv() is None
103
104 def test_single_lv_is_matched(self, volumes, monkeypatch):
105 FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
106 volumes.append(FooVolume)
107 monkeypatch.setattr(api, 'Volumes', lambda: volumes)
108 assert api.get_lv(lv_name='foo') == FooVolume
109
181888fb
FG
110 def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
111 FooVolume = api.Volume(
112 lv_name='foo', lv_path='/dev/vg/foo',
113 lv_uuid='1111', lv_tags="ceph.type=data")
114 volumes.append(FooVolume)
115 monkeypatch.setattr(api, 'Volumes', lambda: volumes)
116 assert api.get_lv(lv_uuid='1111') == FooVolume
117
118
119class TestGetPV(object):
120
121 def test_nothing_is_passed_in(self):
122 # so we return a None
123 assert api.get_pv() is None
124
125 def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
126 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
127 pvolumes.append(FooPVolume)
128 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
129 assert api.get_pv(pv_uuid='foo') is None
130
131 def test_single_pv_is_matched(self, pvolumes, monkeypatch):
132 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
133 pvolumes.append(FooPVolume)
134 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
135 assert api.get_pv(pv_uuid='0000') == FooPVolume
136
137 def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
138 FooPVolume = api.PVolume(
139 pv_name='/dev/vg/foo',
140 pv_uuid='1111', pv_tags="ceph.type=data")
141 pvolumes.append(FooPVolume)
142 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
143 assert api.get_pv(pv_uuid='1111') == FooPVolume
144
145
146class TestPVolumes(object):
147
148 def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
149 pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
150 FooPVolume = api.PVolume(
151 pv_name='/dev/vg/foo',
152 pv_uuid='1111', pv_tags=pv_tags)
153 pvolumes.append(FooPVolume)
154 pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'})
155 assert pvolumes == []
156
157 def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
158 pv_tags = "ceph.type=journal,ceph.osd_id=1"
159 FooPVolume = api.PVolume(
160 pv_name='/dev/vg/foo',
161 pv_uuid='1111', pv_tags=pv_tags)
162 pvolumes.append(FooPVolume)
163 pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'})
164 assert pvolumes == [FooPVolume]
165
d2e6a577
FG
166
167class TestGetVG(object):
168
169 def test_nothing_is_passed_in(self):
170 # so we return a None
171 assert api.get_vg() is None
172
173 def test_single_vg_is_matched(self, volume_groups, monkeypatch):
174 FooVG = api.VolumeGroup(vg_name='foo')
175 volume_groups.append(FooVG)
176 monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
177 assert api.get_vg(vg_name='foo') == FooVG
178
179
180class TestVolumes(object):
181
182 def test_volume_get_has_no_volumes(self, volumes):
183 assert volumes.get() is None
184
185 def test_volume_get_filtered_has_no_volumes(self, volumes):
186 assert volumes.get(lv_name='ceph') is None
187
188 def test_volume_has_multiple_matches(self, volumes):
189 volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
190 volumes.append(volume1)
191 volumes.append(volume2)
192 with pytest.raises(exceptions.MultipleLVsError):
193 volumes.get(lv_name='foo')
194
3efd9988
FG
195 def test_as_dict_infers_type_from_tags(self, volumes):
196 lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
197 osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
198 volumes.append(osd)
199 result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
200 assert result['type'] == 'data'
201
202 def test_as_dict_populates_path_from_lv_api(self, volumes):
203 lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
204 osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
205 volumes.append(osd)
206 result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict()
207 assert result['path'] == '/dev/vg/lv'
208
d2e6a577
FG
209 def test_find_the_correct_one(self, volumes):
210 volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
211 volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
212 volumes.append(volume1)
213 volumes.append(volume2)
214 assert volumes.get(lv_name='volume1') == volume1
215
216 def test_filter_by_tag(self, volumes):
217 lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
218 osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
219 journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
220 volumes.append(osd)
221 volumes.append(journal)
222 volumes.filter(lv_tags={'ceph.type': 'data'})
223 assert len(volumes) == 1
224 assert volumes[0].lv_name == 'volume1'
225
181888fb
FG
226 def test_filter_by_tag_does_not_match_one(self, volumes):
227 lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
228 osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
229 journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
230 volumes.append(osd)
231 volumes.append(journal)
232 # note the different osd_id!
233 volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'})
234 assert volumes == []
235
d2e6a577
FG
236 def test_filter_by_vg_name(self, volumes):
237 lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
238 osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
239 journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
240 volumes.append(osd)
241 volumes.append(journal)
242 volumes.filter(vg_name='ceph_vg')
243 assert len(volumes) == 1
244 assert volumes[0].lv_name == 'volume1'
245
246 def test_filter_by_lv_path(self, volumes):
247 osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
248 journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
249 volumes.append(osd)
250 volumes.append(journal)
251 volumes.filter(lv_path='/dev/volume1')
252 assert len(volumes) == 1
253 assert volumes[0].lv_name == 'volume1'
254
181888fb
FG
255 def test_filter_by_lv_uuid(self, volumes):
256 osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
257 journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
258 volumes.append(osd)
259 volumes.append(journal)
260 volumes.filter(lv_uuid='1111')
261 assert len(volumes) == 1
262 assert volumes[0].lv_name == 'volume1'
263
264 def test_filter_by_lv_uuid_nothing_found(self, volumes):
265 osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
266 journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
267 volumes.append(osd)
268 volumes.append(journal)
269 volumes.filter(lv_uuid='22222')
270 assert volumes == []
271
d2e6a577
FG
272 def test_filter_requires_params(self, volumes):
273 with pytest.raises(TypeError):
274 volumes.filter()
275
276
277class TestVolumeGroups(object):
278
279 def test_volume_get_has_no_volume_groups(self, volume_groups):
280 assert volume_groups.get() is None
281
282 def test_volume_get_filtered_has_no_volumes(self, volume_groups):
283 assert volume_groups.get(vg_name='ceph') is None
284
285 def test_volume_has_multiple_matches(self, volume_groups):
286 volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
287 volume_groups.append(volume1)
288 volume_groups.append(volume2)
289 with pytest.raises(exceptions.MultipleVGsError):
290 volume_groups.get(vg_name='foo')
291
292 def test_find_the_correct_one(self, volume_groups):
293 volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='')
294 volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='')
295 volume_groups.append(volume1)
296 volume_groups.append(volume2)
297 assert volume_groups.get(vg_name='volume1') == volume1
298
299 def test_filter_by_tag(self, volume_groups):
300 vg_tags = "ceph.group=dmcache"
301 osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
302 journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
303 volume_groups.append(osd)
304 volume_groups.append(journal)
305 volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
306 assert len(volume_groups) == 1
307 assert volume_groups[0].vg_name == 'volume1'
308
181888fb
FG
309 def test_filter_by_tag_does_not_match_one(self, volume_groups):
310 vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
311 osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
312 volume_groups.append(osd)
313 volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'})
314 assert volume_groups == []
315
d2e6a577
FG
316 def test_filter_by_vg_name(self, volume_groups):
317 vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
318 osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
319 journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
320 volume_groups.append(osd)
321 volume_groups.append(journal)
322 volume_groups.filter(vg_name='ceph_vg')
323 assert len(volume_groups) == 1
324 assert volume_groups[0].vg_name == 'ceph_vg'
325
326 def test_filter_requires_params(self, volume_groups):
327 with pytest.raises(TypeError):
328 volume_groups.filter()
329
330
3efd9988
FG
331class TestGetLVFromArgument(object):
332
333 def setup(self):
334 self.foo_volume = api.Volume(
335 lv_name='foo', lv_path='/path/to/lv',
336 vg_name='foo_group', lv_tags=''
337 )
338
339 def test_non_absolute_path_is_not_valid(self, volumes):
340 volumes.append(self.foo_volume)
341 assert api.get_lv_from_argument('foo') is None
342
343 def test_too_many_slashes_is_invalid(self, volumes):
344 volumes.append(self.foo_volume)
345 assert api.get_lv_from_argument('path/to/lv') is None
346
347 def test_absolute_path_is_not_lv(self, volumes):
348 volumes.append(self.foo_volume)
349 assert api.get_lv_from_argument('/path') is None
350
351 def test_absolute_path_is_lv(self, volumes):
352 volumes.append(self.foo_volume)
353 assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
354
355
356class TestRemoveLV(object):
357
358 def test_removes_lv(self, monkeypatch):
359 def mock_call(cmd, **kw):
360 return ('', '', 0)
361 monkeypatch.setattr(process, 'call', mock_call)
362 assert api.remove_lv("vg/lv")
363
364 def test_fails_to_remove_lv(self, monkeypatch):
365 def mock_call(cmd, **kw):
366 return ('', '', 1)
367 monkeypatch.setattr(process, 'call', mock_call)
368 with pytest.raises(RuntimeError):
369 api.remove_lv("vg/lv")
370
371
d2e6a577
FG
372class TestCreateLV(object):
373
374 def setup(self):
375 self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')
376
377 def test_uses_size(self, monkeypatch, capture):
378 monkeypatch.setattr(process, 'run', capture)
379 monkeypatch.setattr(process, 'call', capture)
380 monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
3efd9988 381 api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
d2e6a577
FG
382 expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
383 assert capture.calls[0]['args'][0] == expected
384
385 def test_calls_to_set_type_tag(self, monkeypatch, capture):
386 monkeypatch.setattr(process, 'run', capture)
387 monkeypatch.setattr(process, 'call', capture)
388 monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
3efd9988 389 api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
d2e6a577
FG
390 ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
391 assert capture.calls[1]['args'][0] == ceph_tag
392
393 def test_calls_to_set_data_tag(self, monkeypatch, capture):
394 monkeypatch.setattr(process, 'run', capture)
395 monkeypatch.setattr(process, 'call', capture)
396 monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
3efd9988 397 api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})
d2e6a577
FG
398 data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
399 assert capture.calls[2]['args'][0] == data_tag