]>
Commit | Line | Data |
---|---|---|
d2e6a577 FG |
1 | import pytest |
2 | from ceph_volume import process, exceptions | |
3efd9988 | 3 | from ceph_volume.api import lvm as api |
d2e6a577 FG |
4 | |
5 | ||
class TestParseTags(object):
    """Exercise ``api.parse_tags`` with empty, single and comma-separated input."""

    def test_no_tags_means_empty_dict(self):
        # an empty tag string should come back as an empty dict
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        parsed = api.parse_tags('ceph.osd_something=1')
        expected = {'ceph.osd_something': '1'}
        assert parsed == expected

    def test_multiple_csv_expands_in_dict(self):
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # look each key up on its own so the test never depends on the
        # ordering of the returned dict
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for tag_name, tag_value in expected_pairs:
            assert parsed[tag_name] == tag_value
22 | ||
23 | ||
class TestGetAPIVgs(object):
    """Check what ``api.get_api_vgs`` returns for stubbed ``process.call`` output.

    Every test replaces ``api.process.call`` with a lambda returning a
    three-item tuple whose first item is the fake report.
    """

    def test_report_is_empty(self, monkeypatch):
        # the stubbed report holds nothing but newlines
        monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0))
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' VolGroup00']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        report = [' VolGroup00 ;;;;;;9g']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_vgs()[0]
        # the report line has seven ';'-separated fields, so seven keys are
        # expected even though most of the fields are empty
        assert len(result) == 7
        assert result['vg_name'] == 'VolGroup00'
        assert result['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_vgs()
        # one entry per report line, in the same order
        assert result[0]['vg_name'] == 'VolGroup00'
        assert result[1]['vg_name'] == 'ceph_vg'
d2e6a577 FG |
49 | |
50 | ||
class TestGetAPILvs(object):
    """Check what ``api.get_api_lvs`` returns for stubbed ``process.call`` output.

    Every test replaces ``api.process.call`` with a lambda returning a
    three-item tuple whose first item is the fake report.
    """

    def test_report_is_empty(self, monkeypatch):
        monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0))
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        report = [' ;/path;VolGroup00;root']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_lvs()
        # the third ';'-separated field of the report line is the one that
        # is expected to show up as lv_name
        assert result[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg']
        monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0))
        result = api.get_api_lvs()
        # one entry per report line, in the same order
        assert result[0]['lv_name'] == 'VolName'
        assert result[1]['lv_name'] == 'ceph_lv'
d2e6a577 FG |
69 | |
70 | ||
@pytest.fixture
def volumes(monkeypatch):
    """Return a purged ``api.Volumes`` and make ``api.Volumes()`` hand it back."""
    # process.call is swapped for a stub returning ('', '', 0) before the
    # collection is instantiated
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    lv_collection = api.Volumes()
    lv_collection._purge()
    # also patch api.Volumes so that when it is called, it will use the newly
    # created fixture, with whatever the test method wants to append to it
    monkeypatch.setattr(api, 'Volumes', lambda: lv_collection)
    return lv_collection
80 | ||
81 | ||
181888fb FG |
@pytest.fixture
def pvolumes(monkeypatch):
    """Return a purged ``api.PVolumes`` collection for tests to fill in."""
    # process.call is swapped for a stub returning ('', '', 0) before the
    # collection is instantiated
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    pv_collection = api.PVolumes()
    pv_collection._purge()
    return pv_collection
88 | ||
89 | ||
d2e6a577 FG |
@pytest.fixture
def volume_groups(monkeypatch):
    """Return a purged ``api.VolumeGroups`` collection for tests to fill in."""
    # process.call is swapped for a stub returning ('', '', 0) before the
    # collection is instantiated
    monkeypatch.setattr(process, 'call', lambda x: ('', '', 0))
    vg_collection = api.VolumeGroups()
    vg_collection._purge()
    return vg_collection
96 | ||
97 | ||
class TestGetLV(object):
    """Look up logical volumes through ``api.get_lv``."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        data_lv = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
        volumes.append(data_lv)
        # api.Volumes() must hand back the collection populated above
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == data_lv

    def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch):
        data_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_uuid='1111',
            lv_tags="ceph.type=data",
        )
        volumes.append(data_lv)
        # api.Volumes() must hand back the collection populated above
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_uuid='1111')
        assert found == data_lv
117 | ||
118 | ||
class TestGetPV(object):
    """Look up physical volumes through ``api.get_pv``."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        found = api.get_pv()
        assert found is None

    def test_single_pv_is_not_matched(self, pvolumes, monkeypatch):
        sda_pv = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda_pv)
        # api.PVolumes() must hand back the collection populated above
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        # 'foo' is not the uuid of the only known PV
        found = api.get_pv(pv_uuid='foo')
        assert found is None

    def test_single_pv_is_matched(self, pvolumes, monkeypatch):
        sda_pv = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(sda_pv)
        # api.PVolumes() must hand back the collection populated above
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='0000')
        assert found == sda_pv

    def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch):
        tagged_pv = api.PVolume(
            pv_name='/dev/vg/foo',
            pv_uuid='1111',
            pv_tags="ceph.type=data",
        )
        pvolumes.append(tagged_pv)
        # api.PVolumes() must hand back the collection populated above
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        found = api.get_pv(pv_uuid='1111')
        assert found == tagged_pv
144 | ||
145 | ||
class TestPVolumes(object):
    """Tag-based filtering on the ``api.PVolumes`` collection."""

    def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch):
        pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa"
        journal_pv = api.PVolume(pv_name='/dev/vg/foo', pv_uuid='1111', pv_tags=pv_tags)
        pvolumes.append(journal_pv)
        # the type matches, but the PV is tagged osd_id=1 while we ask for 2
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '2'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == []

    def test_filter_by_tags_matches(self, pvolumes, monkeypatch):
        pv_tags = "ceph.type=journal,ceph.osd_id=1"
        journal_pv = api.PVolume(pv_name='/dev/vg/foo', pv_uuid='1111', pv_tags=pv_tags)
        pvolumes.append(journal_pv)
        # both requested tags are present on the PV
        wanted_tags = {'ceph.type': 'journal', 'ceph.osd_id': '1'}
        pvolumes.filter(pv_tags=wanted_tags)
        assert pvolumes == [journal_pv]
165 | ||
d2e6a577 FG |
166 | |
class TestGetVG(object):
    """Look up volume groups through ``api.get_vg``."""

    def test_nothing_is_passed_in(self):
        # so we return a None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        # api.VolumeGroups() must hand back the collection populated above
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
178 | ||
179 | ||
class TestVolumes(object):
    """Behaviour of ``get``, ``filter`` and ``as_dict`` on the ``api.Volumes`` collection."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        # the very same Volume object is appended twice on purpose
        first = second = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(first)
        volumes.append(second)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_as_dict_infers_type_from_tags(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(data_lv)
        matched = volumes.get(lv_tags={'ceph.type': 'data'})
        report = matched.as_dict()
        assert report['type'] == 'data'

    def test_as_dict_populates_path_from_lv_api(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        volumes.append(data_lv)
        matched = volumes.get(lv_tags={'ceph.type': 'data'})
        report = matched.as_dict()
        assert report['path'] == '/dev/vg/lv'

    def test_find_the_correct_one(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        found = volumes.get(lv_name='volume1')
        assert found == first

    def test_filter_by_tag(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal_lv = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        # filter() narrows the collection in place
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags)
        journal_lv = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal')
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        # note the different osd_id!
        wanted_tags = {'ceph.type': 'data', 'ceph.osd_id': '2'}
        volumes.filter(lv_tags=wanted_tags)
        assert volumes == []

    def test_filter_by_vg_name(self, volumes):
        lv_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags)
        journal_lv = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        volumes.filter(lv_uuid='1111')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_uuid_nothing_found(self, volumes):
        first = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='')
        second = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='')
        for lv in (first, second):
            volumes.append(lv)
        # neither volume carries this uuid
        volumes.filter(lv_uuid='22222')
        assert volumes == []

    def test_filter_requires_params(self, volumes):
        # calling filter() with no criteria at all is expected to be rejected
        with pytest.raises(TypeError):
            volumes.filter()
275 | ||
276 | ||
class TestVolumeGroups(object):
    """Behaviour of ``get`` and ``filter`` on the ``api.VolumeGroups`` collection."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        found = volume_groups.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        found = volume_groups.get(vg_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # the very same VolumeGroup object is appended twice on purpose
        first = second = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volume_groups.append(first)
        volume_groups.append(second)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        first = api.VolumeGroup(vg_name='volume1', lv_tags='')
        second = api.VolumeGroup(vg_name='volume2', lv_tags='')
        for vg in (first, second):
            volume_groups.append(vg)
        found = volume_groups.get(vg_name='volume1')
        assert found == first

    def test_filter_by_tag(self, volume_groups):
        vg_tags = "ceph.group=dmcache"
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        for vg in (dmcache_vg, plain_vg):
            volume_groups.append(vg)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        # filter() narrows the collection in place
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_tag_does_not_match_one(self, volume_groups):
        vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd"
        ssd_vg = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags)
        volume_groups.append(ssd_vg)
        # disk_type matches, but the group is 'dmcache' while we ask for 'data'
        wanted_tags = {'ceph.group': 'data', 'ceph.disk_type': 'ssd'}
        volume_groups.filter(vg_tags=wanted_tags)
        assert volume_groups == []

    def test_filter_by_vg_name(self, volume_groups):
        vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
        ceph_vg = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        for vg in (ceph_vg, other_vg):
            volume_groups.append(vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # calling filter() with no criteria at all is expected to be rejected
        with pytest.raises(TypeError):
            volume_groups.filter()
329 | ||
330 | ||
3efd9988 FG |
class TestGetLVFromArgument(object):
    """Resolve a command-line style argument to an LV via ``api.get_lv_from_argument``.

    Every test appends the same canned volume (``lv_path='/path/to/lv'``) to
    the ``volumes`` fixture and checks which argument strings resolve to it.
    """

    def setup_method(self):
        # pytest's native per-test hook; the nose-style ``setup`` name is
        # deprecated and is no longer invoked by recent pytest releases,
        # which would leave ``self.foo_volume`` undefined
        self.foo_volume = api.Volume(
            lv_name='foo', lv_path='/path/to/lv',
            vg_name='foo_group', lv_tags=''
        )

    def test_non_absolute_path_is_not_valid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('foo') is None

    def test_too_many_slashes_is_invalid(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('path/to/lv') is None

    def test_absolute_path_is_not_lv(self, volumes):
        volumes.append(self.foo_volume)
        # absolute, but not the lv_path of the canned volume
        assert api.get_lv_from_argument('/path') is None

    def test_absolute_path_is_lv(self, volumes):
        volumes.append(self.foo_volume)
        assert api.get_lv_from_argument('/path/to/lv') == self.foo_volume
354 | ||
355 | ||
class TestRemoveLV(object):
    """``api.remove_lv`` outcome for different results of the stubbed ``process.call``."""

    def test_removes_lv(self, monkeypatch):
        # the stub's third tuple item is 0 here
        succeeding_call = lambda cmd, **kw: ('', '', 0)  # noqa: E731
        monkeypatch.setattr(process, 'call', succeeding_call)
        assert api.remove_lv("vg/lv")

    def test_fails_to_remove_lv(self, monkeypatch):
        # the stub's third tuple item is 1 here, which must surface as an error
        failing_call = lambda cmd, **kw: ('', '', 1)  # noqa: E731
        monkeypatch.setattr(process, 'call', failing_call)
        with pytest.raises(RuntimeError):
            api.remove_lv("vg/lv")
370 | ||
371 | ||
d2e6a577 FG |
class TestCreateLV(object):
    """Verify the commands ``api.create_lv`` issues, in order.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture, so every command ends up in ``capture.calls``; ``api.get_lv`` is
    stubbed to return a canned volume whose ``lv_path`` is ``/path``.
    """

    def setup_method(self):
        # pytest's native per-test hook; the nose-style ``setup`` name is
        # deprecated and is no longer invoked by recent pytest releases,
        # which would leave ``self.foo_volume`` undefined
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def _create_data_lv(self, monkeypatch, capture):
        # Shared arrange/act step: record all process calls and create the LV.
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size='5G', tags={'ceph.type': 'data'})

    def test_uses_size(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # first recorded command: the lvcreate itself, carrying the size
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # second recorded command: tagging the LV with the requested type
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # third recorded command: tagging the LV with its own device path
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        assert capture.calls[2]['args'][0] == data_tag