]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_api.py
update sources to v12.2.1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_api.py
CommitLineData
d2e6a577
FG
1import pytest
2from ceph_volume import process, exceptions
3from ceph_volume.devices.lvm import api
4
5
class TestParseTags(object):
    """Checks ``api.parse_tags`` on comma-separated ``key=value`` strings."""

    def test_no_tags_means_empty_dict(self):
        """An empty tag string produces an empty dict."""
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        """A lone ``key=value`` pair becomes a one-item dict of strings."""
        parsed = api.parse_tags('ceph.osd_something=1')
        assert parsed == {'ceph.osd_something': '1'}

    def test_multiple_csv_expands_in_dict(self):
        """Every comma-separated pair ends up as its own key."""
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # look each key up on its own instead of comparing whole dicts, so
        # dict ordering never matters
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for tag_name, tag_value in expected_pairs:
            assert parsed[tag_name] == tag_value
22
23
class TestGetAPIVgs(object):
    """Runs ``api.get_api_vgs`` against a stubbed ``api.process.call``."""

    def test_report_is_emtpy(self, monkeypatch):
        """Output made only of newlines yields no volume groups."""
        def fake_call(x):
            return ('\n\n', '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        """A single reported name becomes a single ``vg_name`` dict."""
        lines = [' VolGroup00']

        def fake_call(x):
            return (lines, '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        """Empty ``;``-separated fields still count towards the 7 keys."""
        lines = [' VolGroup00 ;;;;;;9g']

        def fake_call(x):
            return (lines, '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        first_vg = api.get_api_vgs()[0]
        assert len(first_vg) == 7
        assert first_vg['vg_name'] == 'VolGroup00'
        assert first_vg['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        """Each report line maps to one entry, in the same order."""
        lines = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']

        def fake_call(x):
            return (lines, '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        vgs = api.get_api_vgs()
        assert vgs[0]['vg_name'] == 'VolGroup00'
        assert vgs[1]['vg_name'] == 'ceph_vg'
d2e6a577
FG
49
50
class TestGetAPILvs(object):
    """Runs ``api.get_api_lvs`` against a stubbed ``api.process.call``."""

    def test_report_is_emtpy(self, monkeypatch):
        """An empty string as output yields no logical volumes."""
        def fake_call(x):
            return ('', '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        """The third ``;``-separated field is exposed as ``lv_name``."""
        lines = [' ;/path;VolGroup00;root']

        def fake_call(x):
            return (lines, '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        lvs = api.get_api_lvs()
        assert lvs[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        """Each report line maps to one entry, in the same order."""
        lines = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']

        def fake_call(x):
            return (lines, '', 0)

        monkeypatch.setattr(api.process, 'call', fake_call)
        lvs = api.get_api_lvs()
        assert lvs[0]['lv_name'] == 'VolName'
        assert lvs[1]['lv_name'] == 'ceph_lv'
d2e6a577
FG
69
70
@pytest.fixture
def volumes(monkeypatch):
    """Provide an ``api.Volumes`` object on which ``_purge()`` has been called.

    ``process.call`` is replaced beforehand with a stub that returns empty
    output and a zero exit code.
    """
    def fake_call(x):
        return ('', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    lvs = api.Volumes()
    lvs._purge()
    return lvs
77
78
181888fb
FG
79@pytest.fixture
def pvolumes(monkeypatch):
    """Provide an ``api.PVolumes`` object on which ``_purge()`` has been called.

    ``process.call`` is replaced beforehand with a stub that returns empty
    output and a zero exit code.
    """
    def fake_call(x):
        return ('', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    pvs = api.PVolumes()
    pvs._purge()
    return pvs
85
86
d2e6a577
FG
@pytest.fixture
def volume_groups(monkeypatch):
    """Provide an ``api.VolumeGroups`` object on which ``_purge()`` has been called.

    ``process.call`` is replaced beforehand with a stub that returns empty
    output and a zero exit code.
    """
    def fake_call(x):
        return ('', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    groups = api.VolumeGroups()
    groups._purge()
    return groups
93
94
class TestGetLV(object):
    """Lookups through ``api.get_lv`` with ``api.Volumes`` swapped for a fixture."""

    def test_nothing_is_passed_in(self):
        # no filters given, so the lookup hands back None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        """The only stored volume is returned when looked up by name."""
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)

        def volumes_factory():
            return volumes

        monkeypatch.setattr(api, 'Volumes', volumes_factory)
        assert api.get_lv(lv_name='foo') == foo_lv

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        """The only stored volume is returned when looked up by uuid."""
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_uuid='1111',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)

        def volumes_factory():
            return volumes

        monkeypatch.setattr(api, 'Volumes', volumes_factory)
        assert api.get_lv(lv_uuid='1111') == foo_lv
114
115
class TestGetPV(object):
    """Lookups through ``api.get_pv`` with ``api.PVolumes`` swapped for a fixture."""

    def test_nothing_is_passed_in(self):
        # no filters given, so the lookup hands back None
        found = api.get_pv()
        assert found is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        """A uuid that no stored PV carries gives ``None``."""
        sda_pv = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda_pv)

        def pvolumes_factory():
            return pvolumes

        monkeypatch.setattr(api, 'PVolumes', pvolumes_factory)
        assert api.get_pv(pv_uuid='foo') is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        """The stored PV is returned when its uuid is requested."""
        sda_pv = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda_pv)

        def pvolumes_factory():
            return pvolumes

        monkeypatch.setattr(api, 'PVolumes', pvolumes_factory)
        assert api.get_pv(pv_uuid='0000') == sda_pv

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        """Same lookup, with the PV's tags given as a string."""
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
        )
        pvolumes.append(tagged_pv)

        def pvolumes_factory():
            return pvolumes

        monkeypatch.setattr(api, 'PVolumes', pvolumes_factory)
        assert api.get_pv(pv_uuid='1111') == tagged_pv
141
142
class TestPVolumes(object):
    """Tag filtering on the ``pvolumes`` fixture object."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        """One differing tag value is enough to drop the PV from the result."""
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa",
        )
        pvolumes.append(journal_pv)
        # the stored PV carries osd_id=1, the filter asks for osd_id=2
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '2'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        """The PV is kept when every requested tag matches."""
        journal_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=journal,ceph.osd_id=1",
        )
        pvolumes.append(journal_pv)
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '1'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == [journal_pv]
162
d2e6a577
FG
163
class TestGetVG(object):
    """Lookups through ``api.get_vg`` with ``api.VolumeGroups`` swapped for a fixture."""

    def test_nothing_is_passed_in(self):
        # no filters given, so the lookup hands back None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        """The only stored group is returned when looked up by name."""
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)

        def groups_factory():
            return volume_groups

        monkeypatch.setattr(api, 'VolumeGroups', groups_factory)
        assert api.get_vg(vg_name='foo') == foo_vg
175
176
class TestVolumes(object):
    """``get()`` and ``filter()`` behaviour of the ``volumes`` fixture object."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        """Two entries with the same name make ``get()`` raise."""
        # one Volume object is stored twice, giving two matches for 'foo'
        duplicate = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        for _ in range(2):
            volumes.append(duplicate)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_find_the_correct_one(self, volumes):
        """``get()`` picks the entry whose name matches."""
        first = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        assert volumes.get(lv_name='volume1') == first

    def test_filter_by_tag(self, volumes):
        """Only the volume carrying the requested tag value remains."""
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            lv_path='/dev/vg/lv',
            lv_tags='ceph.type=journal',
        )
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        """No volume remains unless all requested tags match."""
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            lv_path='/dev/vg/lv',
            lv_tags='ceph.osd_id=1,ceph.type=journal',
        )
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        # osd_id '2' is carried by neither volume
        wanted_tags = {'ceph.type': 'data', 'ceph.osd_id': '2'}
        volumes.filter(lv_tags=wanted_tags)
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        """Filtering on ``vg_name`` keeps the volume from that group only."""
        data_lv = api.Volume(
            lv_name='volume1',
            vg_name='ceph_vg',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            vg_name='system_vg',
            lv_tags='ceph.type=journal',
        )
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        """Filtering on ``lv_path`` keeps the volume at that path only."""
        first = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        """Filtering on ``lv_uuid`` keeps the volume with that uuid only."""
        with_uuid = api.Volume(
            lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        without_uuid = api.Volume(
            lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        for lv in (with_uuid, without_uuid):
            volumes.append(lv)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        """A uuid carried by no volume leaves nothing behind."""
        with_uuid = api.Volume(
            lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        without_uuid = api.Volume(
            lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        for lv in (with_uuid, without_uuid):
            volumes.append(lv)
        volumes.filter(lv_uuid='22222')
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        """Calling ``filter()`` with no criteria raises ``TypeError``."""
        with pytest.raises(TypeError):
            volumes.filter()
258
259
class TestVolumeGroups(object):
    """``get()`` and ``filter()`` behaviour of the ``volume_groups`` fixture object."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        found = volume_groups.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        found = volume_groups.get(vg_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volume_groups):
        """Two entries with the same name make ``get()`` raise."""
        # one VolumeGroup object is stored twice, giving two matches for 'foo'
        # NOTE(review): the lv_* keywords look carried over from the Volume
        # tests; kept as-is -- confirm whether VolumeGroup needs them
        duplicate = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        for _ in range(2):
            volume_groups.append(duplicate)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        """``get()`` picks the entry whose name matches."""
        first = api.VolumeGroup(vg_name='volume1', lv_tags='')
        second = api.VolumeGroup(vg_name='volume2', lv_tags='')
        for vg in (first, second):
            volume_groups.append(vg)
        assert volume_groups.get(vg_name='volume1') == first

    def test_filter_by_tag(self, volume_groups):
        """Only the group carrying the requested tag value remains."""
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags="ceph.group=dmcache")
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        for vg in (dmcache_vg, plain_vg):
            volume_groups.append(vg)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        """No group remains unless all requested tags match."""
        ssd_vg = api.VolumeGroup(
            vg_name='volume1',
            vg_path='/dev/vg/lv',
            vg_tags="ceph.group=dmcache,ceph.disk_type=ssd",
        )
        volume_groups.append(ssd_vg)
        # disk_type matches, but the stored group is 'dmcache', not 'data'
        wanted_tags = {'ceph.group': 'data', 'ceph.disk_type': 'ssd'}
        volume_groups.filter(vg_tags=wanted_tags)
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        """Filtering on ``vg_name`` keeps the group with that name only."""
        ceph_vg = api.VolumeGroup(
            vg_name='ceph_vg', vg_tags="ceph.type=data,ceph.fsid=000-aaa")
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        for vg in (ceph_vg, other_vg):
            volume_groups.append(vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        """Calling ``filter()`` with no criteria raises ``TypeError``."""
        with pytest.raises(TypeError):
            volume_groups.filter()
312
313
class TestCreateLV(object):
    """Checks the commands ``api.create_lv`` sends through ``process``.

    ``process.run`` and ``process.call`` are both replaced with the
    ``capture`` fixture, and each test inspects one entry of
    ``capture.calls``.
    """

    def setup_method(self):
        # pytest's own per-test hook: the nose-style name ``setup`` is
        # deprecated since pytest 7.2 and no longer run by pytest 8, which
        # would leave ``foo_volume`` unset for every test below
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def _create_foo_lv(self, monkeypatch, capture):
        """Stub out ``process`` and ``api.get_lv``, then call ``create_lv``."""
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        # the lookup is stubbed to hand back the volume built in setup_method
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')

    def test_uses_size(self, monkeypatch, capture):
        """The first captured command is ``lvcreate`` with the size as ``5G``."""
        self._create_foo_lv(monkeypatch, capture)
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        """The second captured command adds the ``ceph.type`` tag."""
        self._create_foo_lv(monkeypatch, capture)
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        """The third captured command adds the ``ceph.data_device`` tag."""
        self._create_foo_lv(monkeypatch, capture)
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        assert capture.calls[2]['args'][0] == data_tag