]>
Commit | Line | Data |
---|---|---|
1 | import pytest | |
2 | from ceph_volume import process, exceptions | |
3 | from ceph_volume.api import lvm as api | |
4 | ||
5 | ||
6 | class TestParseTags(object): | |
7 | ||
8 | def test_no_tags_means_empty_dict(self): | |
9 | result = api.parse_tags('') | |
10 | assert result == {} | |
11 | ||
12 | def test_single_tag_gets_parsed(self): | |
13 | result = api.parse_tags('ceph.osd_something=1') | |
14 | assert result == {'ceph.osd_something': '1'} | |
15 | ||
16 | def test_non_ceph_tags_are_skipped(self): | |
17 | result = api.parse_tags('foo') | |
18 | assert result == {} | |
19 | ||
20 | def test_mixed_non_ceph_tags(self): | |
21 | result = api.parse_tags('foo,ceph.bar=1') | |
22 | assert result == {'ceph.bar': '1'} | |
23 | ||
24 | def test_multiple_csv_expands_in_dict(self): | |
25 | result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000') | |
26 | # assert them piecemeal to avoid the un-ordered dict nature | |
27 | assert result['ceph.osd_something'] == '1' | |
28 | assert result['ceph.foo'] == '2' | |
29 | assert result['ceph.fsid'] == '0000' | |
30 | ||
31 | ||
32 | class TestGetAPIVgs(object): | |
33 | ||
34 | def test_report_is_emtpy(self, monkeypatch): | |
35 | monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0)) | |
36 | assert api.get_api_vgs() == [] | |
37 | ||
38 | def test_report_has_stuff(self, monkeypatch): | |
39 | report = [' VolGroup00'] | |
40 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) | |
41 | assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}] | |
42 | ||
43 | def test_report_has_stuff_with_empty_attrs(self, monkeypatch): | |
44 | report = [' VolGroup00 ;;;;;;9g'] | |
45 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) | |
46 | result = api.get_api_vgs()[0] | |
47 | assert len(result.keys()) == 7 | |
48 | assert result['vg_name'] == 'VolGroup00' | |
49 | assert result['vg_free'] == '9g' | |
50 | ||
51 | def test_report_has_multiple_items(self, monkeypatch): | |
52 | report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;'] | |
53 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) | |
54 | result = api.get_api_vgs() | |
55 | assert result[0]['vg_name'] == 'VolGroup00' | |
56 | assert result[1]['vg_name'] == 'ceph_vg' | |
57 | ||
58 | ||
59 | class TestGetAPILvs(object): | |
60 | ||
61 | def test_report_is_emtpy(self, monkeypatch): | |
62 | monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0)) | |
63 | assert api.get_api_lvs() == [] | |
64 | ||
65 | def test_report_has_stuff(self, monkeypatch): | |
66 | report = [' ;/path;VolGroup00;root'] | |
67 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) | |
68 | result = api.get_api_lvs() | |
69 | assert result[0]['lv_name'] == 'VolGroup00' | |
70 | ||
71 | def test_report_has_multiple_items(self, monkeypatch): | |
72 | report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg'] | |
73 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) | |
74 | result = api.get_api_lvs() | |
75 | assert result[0]['lv_name'] == 'VolName' | |
76 | assert result[1]['lv_name'] == 'ceph_lv' | |
77 | ||
78 | ||
79 | @pytest.fixture | |
80 | def volumes(monkeypatch): | |
81 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) | |
82 | volumes = api.Volumes() | |
83 | volumes._purge() | |
84 | # also patch api.Volumes so that when it is called, it will use the newly | |
85 | # created fixture, with whatever the test method wants to append to it | |
86 | monkeypatch.setattr(api, 'Volumes', lambda: volumes) | |
87 | return volumes | |
88 | ||
89 | ||
90 | @pytest.fixture | |
91 | def pvolumes(monkeypatch): | |
92 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) | |
93 | pvolumes = api.PVolumes() | |
94 | pvolumes._purge() | |
95 | return pvolumes | |
96 | ||
97 | ||
98 | @pytest.fixture | |
99 | def volume_groups(monkeypatch): | |
100 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) | |
101 | vgs = api.VolumeGroups() | |
102 | vgs._purge() | |
103 | return vgs | |
104 | ||
105 | ||
106 | class TestGetLV(object): | |
107 | ||
108 | def test_nothing_is_passed_in(self): | |
109 | # so we return a None | |
110 | assert api.get_lv() is None | |
111 | ||
112 | def test_single_lv_is_matched(self, volumes, monkeypatch): | |
113 | FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data") | |
114 | volumes.append(FooVolume) | |
115 | monkeypatch.setattr(api, 'Volumes', lambda: volumes) | |
116 | assert api.get_lv(lv_name='foo') == FooVolume | |
117 | ||
118 | def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch): | |
119 | FooVolume = api.Volume( | |
120 | lv_name='foo', lv_path='/dev/vg/foo', | |
121 | lv_uuid='1111', lv_tags="ceph.type=data") | |
122 | volumes.append(FooVolume) | |
123 | monkeypatch.setattr(api, 'Volumes', lambda: volumes) | |
124 | assert api.get_lv(lv_uuid='1111') == FooVolume | |
125 | ||
126 | ||
127 | class TestGetPV(object): | |
128 | ||
129 | def test_nothing_is_passed_in(self): | |
130 | # so we return a None | |
131 | assert api.get_pv() is None | |
132 | ||
133 | def test_single_pv_is_not_matched(self, pvolumes, monkeypatch): | |
134 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, vg_name="vg") | |
135 | pvolumes.append(FooPVolume) | |
136 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
137 | assert api.get_pv(pv_uuid='foo') is None | |
138 | ||
139 | def test_single_pv_is_matched(self, pvolumes, monkeypatch): | |
140 | FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) | |
141 | pvolumes.append(FooPVolume) | |
142 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
143 | assert api.get_pv(pv_uuid='0000') == FooPVolume | |
144 | ||
145 | def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch): | |
146 | FooPVolume = api.PVolume( | |
147 | pv_name='/dev/vg/foo', | |
148 | pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg") | |
149 | pvolumes.append(FooPVolume) | |
150 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
151 | assert api.get_pv(pv_uuid='1111') == FooPVolume | |
152 | ||
153 | def test_vg_name_is_set(self, pvolumes, monkeypatch): | |
154 | FooPVolume = api.PVolume( | |
155 | pv_name='/dev/vg/foo', | |
156 | pv_uuid='1111', pv_tags="ceph.type=data", vg_name="vg") | |
157 | pvolumes.append(FooPVolume) | |
158 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
159 | pv = api.get_pv(pv_name="/dev/vg/foo") | |
160 | assert pv.vg_name == "vg" | |
161 | ||
162 | ||
163 | class TestPVolumes(object): | |
164 | ||
165 | def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch): | |
166 | pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa" | |
167 | FooPVolume = api.PVolume( | |
168 | pv_name='/dev/vg/foo', | |
169 | pv_uuid='1111', pv_tags=pv_tags, vg_name='vg') | |
170 | pvolumes.append(FooPVolume) | |
171 | pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'}) | |
172 | assert pvolumes == [] | |
173 | ||
174 | def test_filter_by_tags_matches(self, pvolumes, monkeypatch): | |
175 | pv_tags = "ceph.type=journal,ceph.osd_id=1" | |
176 | FooPVolume = api.PVolume( | |
177 | pv_name='/dev/vg/foo', | |
178 | pv_uuid='1111', pv_tags=pv_tags, vg_name="vg") | |
179 | pvolumes.append(FooPVolume) | |
180 | pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}) | |
181 | assert pvolumes == [FooPVolume] | |
182 | ||
183 | ||
184 | class TestGetVG(object): | |
185 | ||
186 | def test_nothing_is_passed_in(self): | |
187 | # so we return a None | |
188 | assert api.get_vg() is None | |
189 | ||
190 | def test_single_vg_is_matched(self, volume_groups, monkeypatch): | |
191 | FooVG = api.VolumeGroup(vg_name='foo') | |
192 | volume_groups.append(FooVG) | |
193 | monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups) | |
194 | assert api.get_vg(vg_name='foo') == FooVG | |
195 | ||
196 | ||
197 | class TestVolumes(object): | |
198 | ||
199 | def test_volume_get_has_no_volumes(self, volumes): | |
200 | assert volumes.get() is None | |
201 | ||
202 | def test_volume_get_filtered_has_no_volumes(self, volumes): | |
203 | assert volumes.get(lv_name='ceph') is None | |
204 | ||
205 | def test_volume_has_multiple_matches(self, volumes): | |
206 | volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='') | |
207 | volumes.append(volume1) | |
208 | volumes.append(volume2) | |
209 | with pytest.raises(exceptions.MultipleLVsError): | |
210 | volumes.get(lv_name='foo') | |
211 | ||
212 | def test_as_dict_infers_type_from_tags(self, volumes): | |
213 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
214 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
215 | volumes.append(osd) | |
216 | result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict() | |
217 | assert result['type'] == 'data' | |
218 | ||
219 | def test_as_dict_populates_path_from_lv_api(self, volumes): | |
220 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
221 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
222 | volumes.append(osd) | |
223 | result = volumes.get(lv_tags={'ceph.type': 'data'}).as_dict() | |
224 | assert result['path'] == '/dev/vg/lv' | |
225 | ||
226 | def test_find_the_correct_one(self, volumes): | |
227 | volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='') | |
228 | volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='') | |
229 | volumes.append(volume1) | |
230 | volumes.append(volume2) | |
231 | assert volumes.get(lv_name='volume1') == volume1 | |
232 | ||
233 | def test_filter_by_tag(self, volumes): | |
234 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
235 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
236 | journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal') | |
237 | volumes.append(osd) | |
238 | volumes.append(journal) | |
239 | volumes.filter(lv_tags={'ceph.type': 'data'}) | |
240 | assert len(volumes) == 1 | |
241 | assert volumes[0].lv_name == 'volume1' | |
242 | ||
243 | def test_filter_by_tag_does_not_match_one(self, volumes): | |
244 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
245 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
246 | journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal') | |
247 | volumes.append(osd) | |
248 | volumes.append(journal) | |
249 | # note the different osd_id! | |
250 | volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'}) | |
251 | assert volumes == [] | |
252 | ||
253 | def test_filter_by_vg_name(self, volumes): | |
254 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
255 | osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags) | |
256 | journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal') | |
257 | volumes.append(osd) | |
258 | volumes.append(journal) | |
259 | volumes.filter(vg_name='ceph_vg') | |
260 | assert len(volumes) == 1 | |
261 | assert volumes[0].lv_name == 'volume1' | |
262 | ||
263 | def test_filter_by_lv_path(self, volumes): | |
264 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='') | |
265 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='') | |
266 | volumes.append(osd) | |
267 | volumes.append(journal) | |
268 | volumes.filter(lv_path='/dev/volume1') | |
269 | assert len(volumes) == 1 | |
270 | assert volumes[0].lv_name == 'volume1' | |
271 | ||
272 | def test_filter_by_lv_uuid(self, volumes): | |
273 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') | |
274 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') | |
275 | volumes.append(osd) | |
276 | volumes.append(journal) | |
277 | volumes.filter(lv_uuid='1111') | |
278 | assert len(volumes) == 1 | |
279 | assert volumes[0].lv_name == 'volume1' | |
280 | ||
281 | def test_filter_by_lv_uuid_nothing_found(self, volumes): | |
282 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') | |
283 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') | |
284 | volumes.append(osd) | |
285 | volumes.append(journal) | |
286 | volumes.filter(lv_uuid='22222') | |
287 | assert volumes == [] | |
288 | ||
289 | def test_filter_requires_params(self, volumes): | |
290 | with pytest.raises(TypeError): | |
291 | volumes.filter() | |
292 | ||
293 | ||
294 | class TestVolumeGroups(object): | |
295 | ||
296 | def test_volume_get_has_no_volume_groups(self, volume_groups): | |
297 | assert volume_groups.get() is None | |
298 | ||
299 | def test_volume_get_filtered_has_no_volumes(self, volume_groups): | |
300 | assert volume_groups.get(vg_name='ceph') is None | |
301 | ||
302 | def test_volume_has_multiple_matches(self, volume_groups): | |
303 | volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='') | |
304 | volume_groups.append(volume1) | |
305 | volume_groups.append(volume2) | |
306 | with pytest.raises(exceptions.MultipleVGsError): | |
307 | volume_groups.get(vg_name='foo') | |
308 | ||
309 | def test_find_the_correct_one(self, volume_groups): | |
310 | volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='') | |
311 | volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='') | |
312 | volume_groups.append(volume1) | |
313 | volume_groups.append(volume2) | |
314 | assert volume_groups.get(vg_name='volume1') == volume1 | |
315 | ||
316 | def test_filter_by_tag(self, volume_groups): | |
317 | vg_tags = "ceph.group=dmcache" | |
318 | osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags) | |
319 | journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain') | |
320 | volume_groups.append(osd) | |
321 | volume_groups.append(journal) | |
322 | volume_groups.filter(vg_tags={'ceph.group': 'dmcache'}) | |
323 | assert len(volume_groups) == 1 | |
324 | assert volume_groups[0].vg_name == 'volume1' | |
325 | ||
326 | def test_filter_by_tag_does_not_match_one(self, volume_groups): | |
327 | vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd" | |
328 | osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags) | |
329 | volume_groups.append(osd) | |
330 | volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'}) | |
331 | assert volume_groups == [] | |
332 | ||
333 | def test_filter_by_vg_name(self, volume_groups): | |
334 | vg_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
335 | osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags) | |
336 | journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal') | |
337 | volume_groups.append(osd) | |
338 | volume_groups.append(journal) | |
339 | volume_groups.filter(vg_name='ceph_vg') | |
340 | assert len(volume_groups) == 1 | |
341 | assert volume_groups[0].vg_name == 'ceph_vg' | |
342 | ||
343 | def test_filter_requires_params(self, volume_groups): | |
344 | with pytest.raises(TypeError): | |
345 | volume_groups.filter() | |
346 | ||
347 | ||
348 | class TestGetLVFromArgument(object): | |
349 | ||
350 | def setup(self): | |
351 | self.foo_volume = api.Volume( | |
352 | lv_name='foo', lv_path='/path/to/lv', | |
353 | vg_name='foo_group', lv_tags='' | |
354 | ) | |
355 | ||
356 | def test_non_absolute_path_is_not_valid(self, volumes): | |
357 | volumes.append(self.foo_volume) | |
358 | assert api.get_lv_from_argument('foo') is None | |
359 | ||
360 | def test_too_many_slashes_is_invalid(self, volumes): | |
361 | volumes.append(self.foo_volume) | |
362 | assert api.get_lv_from_argument('path/to/lv') is None | |
363 | ||
364 | def test_absolute_path_is_not_lv(self, volumes): | |
365 | volumes.append(self.foo_volume) | |
366 | assert api.get_lv_from_argument('/path') is None | |
367 | ||
368 | def test_absolute_path_is_lv(self, volumes): | |
369 | volumes.append(self.foo_volume) | |
370 | assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume | |
371 | ||
372 | ||
373 | class TestRemoveLV(object): | |
374 | ||
375 | def test_removes_lv(self, monkeypatch): | |
376 | def mock_call(cmd, **kw): | |
377 | return ('', '', 0) | |
378 | monkeypatch.setattr(process, 'call', mock_call) | |
379 | assert api.remove_lv("vg/lv") | |
380 | ||
381 | def test_fails_to_remove_lv(self, monkeypatch): | |
382 | def mock_call(cmd, **kw): | |
383 | return ('', '', 1) | |
384 | monkeypatch.setattr(process, 'call', mock_call) | |
385 | with pytest.raises(RuntimeError): | |
386 | api.remove_lv("vg/lv") | |
387 | ||
388 | ||
389 | class TestCreateLV(object): | |
390 | ||
391 | def setup(self): | |
392 | self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='') | |
393 | ||
394 | def test_uses_size(self, monkeypatch, capture): | |
395 | monkeypatch.setattr(process, 'run', capture) | |
396 | monkeypatch.setattr(process, 'call', capture) | |
397 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
398 | api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) | |
399 | expected = ['lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group'] | |
400 | assert capture.calls[0]['args'][0] == expected | |
401 | ||
402 | def test_calls_to_set_type_tag(self, monkeypatch, capture): | |
403 | monkeypatch.setattr(process, 'run', capture) | |
404 | monkeypatch.setattr(process, 'call', capture) | |
405 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
406 | api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) | |
407 | ceph_tag = ['lvchange', '--addtag', 'ceph.type=data', '/path'] | |
408 | assert capture.calls[1]['args'][0] == ceph_tag | |
409 | ||
410 | def test_calls_to_set_data_tag(self, monkeypatch, capture): | |
411 | monkeypatch.setattr(process, 'run', capture) | |
412 | monkeypatch.setattr(process, 'call', capture) | |
413 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
414 | api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'}) | |
415 | data_tag = ['lvchange', '--addtag', 'ceph.data_device=/path', '/path'] | |
416 | assert capture.calls[2]['args'][0] == data_tag |