]>
Commit | Line | Data |
---|---|---|
d2e6a577 FG |
1 | import pytest |
2 | from ceph_volume import process, exceptions | |
3 | from ceph_volume.devices.lvm import api | |
4 | ||
5 | ||
6 | class TestParseTags(object): | |
7 | ||
8 | def test_no_tags_means_empty_dict(self): | |
9 | result = api.parse_tags('') | |
10 | assert result == {} | |
11 | ||
12 | def test_single_tag_gets_parsed(self): | |
13 | result = api.parse_tags('ceph.osd_something=1') | |
14 | assert result == {'ceph.osd_something': '1'} | |
15 | ||
16 | def test_multiple_csv_expands_in_dict(self): | |
17 | result = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000') | |
18 | # assert them piecemeal to avoid the un-ordered dict nature | |
19 | assert result['ceph.osd_something'] == '1' | |
20 | assert result['ceph.foo'] == '2' | |
21 | assert result['ceph.fsid'] == '0000' | |
22 | ||
23 | ||
24 | class TestGetAPIVgs(object): | |
25 | ||
26 | def test_report_is_emtpy(self, monkeypatch): | |
b5b8bbf5 | 27 | monkeypatch.setattr(api.process, 'call', lambda x: ('\n\n', '', 0)) |
d2e6a577 FG |
28 | assert api.get_api_vgs() == [] |
29 | ||
30 | def test_report_has_stuff(self, monkeypatch): | |
b5b8bbf5 | 31 | report = [' VolGroup00'] |
d2e6a577 FG |
32 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) |
33 | assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}] | |
34 | ||
b5b8bbf5 FG |
35 | def test_report_has_stuff_with_empty_attrs(self, monkeypatch): |
36 | report = [' VolGroup00 ;;;;;;9g'] | |
d2e6a577 | 37 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) |
b5b8bbf5 FG |
38 | result = api.get_api_vgs()[0] |
39 | assert len(result.keys()) == 7 | |
40 | assert result['vg_name'] == 'VolGroup00' | |
41 | assert result['vg_free'] == '9g' | |
d2e6a577 | 42 | |
b5b8bbf5 FG |
43 | def test_report_has_multiple_items(self, monkeypatch): |
44 | report = [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;'] | |
d2e6a577 | 45 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) |
b5b8bbf5 FG |
46 | result = api.get_api_vgs() |
47 | assert result[0]['vg_name'] == 'VolGroup00' | |
48 | assert result[1]['vg_name'] == 'ceph_vg' | |
d2e6a577 FG |
49 | |
50 | ||
51 | class TestGetAPILvs(object): | |
52 | ||
53 | def test_report_is_emtpy(self, monkeypatch): | |
b5b8bbf5 | 54 | monkeypatch.setattr(api.process, 'call', lambda x: ('', '', 0)) |
d2e6a577 FG |
55 | assert api.get_api_lvs() == [] |
56 | ||
57 | def test_report_has_stuff(self, monkeypatch): | |
b5b8bbf5 | 58 | report = [' ;/path;VolGroup00;root'] |
d2e6a577 | 59 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) |
b5b8bbf5 FG |
60 | result = api.get_api_lvs() |
61 | assert result[0]['lv_name'] == 'VolGroup00' | |
d2e6a577 FG |
62 | |
63 | def test_report_has_multiple_items(self, monkeypatch): | |
b5b8bbf5 | 64 | report = [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg'] |
d2e6a577 | 65 | monkeypatch.setattr(api.process, 'call', lambda x: (report, '', 0)) |
b5b8bbf5 FG |
66 | result = api.get_api_lvs() |
67 | assert result[0]['lv_name'] == 'VolName' | |
68 | assert result[1]['lv_name'] == 'ceph_lv' | |
d2e6a577 FG |
69 | |
70 | ||
71 | @pytest.fixture | |
72 | def volumes(monkeypatch): | |
b5b8bbf5 | 73 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) |
d2e6a577 FG |
74 | volumes = api.Volumes() |
75 | volumes._purge() | |
76 | return volumes | |
77 | ||
78 | ||
181888fb FG |
79 | @pytest.fixture |
80 | def pvolumes(monkeypatch): | |
81 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) | |
82 | pvolumes = api.PVolumes() | |
83 | pvolumes._purge() | |
84 | return pvolumes | |
85 | ||
86 | ||
d2e6a577 FG |
87 | @pytest.fixture |
88 | def volume_groups(monkeypatch): | |
b5b8bbf5 | 89 | monkeypatch.setattr(process, 'call', lambda x: ('', '', 0)) |
d2e6a577 FG |
90 | vgs = api.VolumeGroups() |
91 | vgs._purge() | |
92 | return vgs | |
93 | ||
94 | ||
95 | class TestGetLV(object): | |
96 | ||
97 | def test_nothing_is_passed_in(self): | |
98 | # so we return a None | |
99 | assert api.get_lv() is None | |
100 | ||
101 | def test_single_lv_is_matched(self, volumes, monkeypatch): | |
102 | FooVolume = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data") | |
103 | volumes.append(FooVolume) | |
104 | monkeypatch.setattr(api, 'Volumes', lambda: volumes) | |
105 | assert api.get_lv(lv_name='foo') == FooVolume | |
106 | ||
181888fb FG |
107 | def test_single_lv_is_matched_by_uuid(self, volumes, monkeypatch): |
108 | FooVolume = api.Volume( | |
109 | lv_name='foo', lv_path='/dev/vg/foo', | |
110 | lv_uuid='1111', lv_tags="ceph.type=data") | |
111 | volumes.append(FooVolume) | |
112 | monkeypatch.setattr(api, 'Volumes', lambda: volumes) | |
113 | assert api.get_lv(lv_uuid='1111') == FooVolume | |
114 | ||
115 | ||
116 | class TestGetPV(object): | |
117 | ||
118 | def test_nothing_is_passed_in(self): | |
119 | # so we return a None | |
120 | assert api.get_pv() is None | |
121 | ||
122 | def test_single_pv_is_not_matched(self, pvolumes, monkeypatch): | |
123 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) | |
124 | pvolumes.append(FooPVolume) | |
125 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
126 | assert api.get_pv(pv_uuid='foo') is None | |
127 | ||
128 | def test_single_pv_is_matched(self, pvolumes, monkeypatch): | |
129 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) | |
130 | pvolumes.append(FooPVolume) | |
131 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
132 | assert api.get_pv(pv_uuid='0000') == FooPVolume | |
133 | ||
134 | def test_single_pv_is_matched_by_uuid(self, pvolumes, monkeypatch): | |
135 | FooPVolume = api.PVolume( | |
136 | pv_name='/dev/vg/foo', | |
137 | pv_uuid='1111', pv_tags="ceph.type=data") | |
138 | pvolumes.append(FooPVolume) | |
139 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
140 | assert api.get_pv(pv_uuid='1111') == FooPVolume | |
141 | ||
142 | ||
143 | class TestPVolumes(object): | |
144 | ||
145 | def test_filter_by_tag_does_not_match_one(self, pvolumes, monkeypatch): | |
146 | pv_tags = "ceph.type=journal,ceph.osd_id=1,ceph.fsid=000-aaa" | |
147 | FooPVolume = api.PVolume( | |
148 | pv_name='/dev/vg/foo', | |
149 | pv_uuid='1111', pv_tags=pv_tags) | |
150 | pvolumes.append(FooPVolume) | |
151 | pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '2'}) | |
152 | assert pvolumes == [] | |
153 | ||
154 | def test_filter_by_tags_matches(self, pvolumes, monkeypatch): | |
155 | pv_tags = "ceph.type=journal,ceph.osd_id=1" | |
156 | FooPVolume = api.PVolume( | |
157 | pv_name='/dev/vg/foo', | |
158 | pv_uuid='1111', pv_tags=pv_tags) | |
159 | pvolumes.append(FooPVolume) | |
160 | pvolumes.filter(pv_tags={'ceph.type': 'journal', 'ceph.osd_id': '1'}) | |
161 | assert pvolumes == [FooPVolume] | |
162 | ||
d2e6a577 FG |
163 | |
164 | class TestGetVG(object): | |
165 | ||
166 | def test_nothing_is_passed_in(self): | |
167 | # so we return a None | |
168 | assert api.get_vg() is None | |
169 | ||
170 | def test_single_vg_is_matched(self, volume_groups, monkeypatch): | |
171 | FooVG = api.VolumeGroup(vg_name='foo') | |
172 | volume_groups.append(FooVG) | |
173 | monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups) | |
174 | assert api.get_vg(vg_name='foo') == FooVG | |
175 | ||
176 | ||
177 | class TestVolumes(object): | |
178 | ||
179 | def test_volume_get_has_no_volumes(self, volumes): | |
180 | assert volumes.get() is None | |
181 | ||
182 | def test_volume_get_filtered_has_no_volumes(self, volumes): | |
183 | assert volumes.get(lv_name='ceph') is None | |
184 | ||
185 | def test_volume_has_multiple_matches(self, volumes): | |
186 | volume1 = volume2 = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='') | |
187 | volumes.append(volume1) | |
188 | volumes.append(volume2) | |
189 | with pytest.raises(exceptions.MultipleLVsError): | |
190 | volumes.get(lv_name='foo') | |
191 | ||
192 | def test_find_the_correct_one(self, volumes): | |
193 | volume1 = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='') | |
194 | volume2 = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='') | |
195 | volumes.append(volume1) | |
196 | volumes.append(volume2) | |
197 | assert volumes.get(lv_name='volume1') == volume1 | |
198 | ||
199 | def test_filter_by_tag(self, volumes): | |
200 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
201 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
202 | journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal') | |
203 | volumes.append(osd) | |
204 | volumes.append(journal) | |
205 | volumes.filter(lv_tags={'ceph.type': 'data'}) | |
206 | assert len(volumes) == 1 | |
207 | assert volumes[0].lv_name == 'volume1' | |
208 | ||
181888fb FG |
209 | def test_filter_by_tag_does_not_match_one(self, volumes): |
210 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
211 | osd = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=lv_tags) | |
212 | journal = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.osd_id=1,ceph.type=journal') | |
213 | volumes.append(osd) | |
214 | volumes.append(journal) | |
215 | # note the different osd_id! | |
216 | volumes.filter(lv_tags={'ceph.type': 'data', 'ceph.osd_id': '2'}) | |
217 | assert volumes == [] | |
218 | ||
d2e6a577 FG |
219 | def test_filter_by_vg_name(self, volumes): |
220 | lv_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
221 | osd = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=lv_tags) | |
222 | journal = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal') | |
223 | volumes.append(osd) | |
224 | volumes.append(journal) | |
225 | volumes.filter(vg_name='ceph_vg') | |
226 | assert len(volumes) == 1 | |
227 | assert volumes[0].lv_name == 'volume1' | |
228 | ||
229 | def test_filter_by_lv_path(self, volumes): | |
230 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='') | |
231 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='') | |
232 | volumes.append(osd) | |
233 | volumes.append(journal) | |
234 | volumes.filter(lv_path='/dev/volume1') | |
235 | assert len(volumes) == 1 | |
236 | assert volumes[0].lv_name == 'volume1' | |
237 | ||
181888fb FG |
238 | def test_filter_by_lv_uuid(self, volumes): |
239 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') | |
240 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') | |
241 | volumes.append(osd) | |
242 | volumes.append(journal) | |
243 | volumes.filter(lv_uuid='1111') | |
244 | assert len(volumes) == 1 | |
245 | assert volumes[0].lv_name == 'volume1' | |
246 | ||
247 | def test_filter_by_lv_uuid_nothing_found(self, volumes): | |
248 | osd = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_uuid='1111', lv_tags='') | |
249 | journal = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_uuid='', lv_tags='') | |
250 | volumes.append(osd) | |
251 | volumes.append(journal) | |
252 | volumes.filter(lv_uuid='22222') | |
253 | assert volumes == [] | |
254 | ||
d2e6a577 FG |
255 | def test_filter_requires_params(self, volumes): |
256 | with pytest.raises(TypeError): | |
257 | volumes.filter() | |
258 | ||
259 | ||
260 | class TestVolumeGroups(object): | |
261 | ||
262 | def test_volume_get_has_no_volume_groups(self, volume_groups): | |
263 | assert volume_groups.get() is None | |
264 | ||
265 | def test_volume_get_filtered_has_no_volumes(self, volume_groups): | |
266 | assert volume_groups.get(vg_name='ceph') is None | |
267 | ||
268 | def test_volume_has_multiple_matches(self, volume_groups): | |
269 | volume1 = volume2 = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='') | |
270 | volume_groups.append(volume1) | |
271 | volume_groups.append(volume2) | |
272 | with pytest.raises(exceptions.MultipleVGsError): | |
273 | volume_groups.get(vg_name='foo') | |
274 | ||
275 | def test_find_the_correct_one(self, volume_groups): | |
276 | volume1 = api.VolumeGroup(vg_name='volume1', lv_tags='') | |
277 | volume2 = api.VolumeGroup(vg_name='volume2', lv_tags='') | |
278 | volume_groups.append(volume1) | |
279 | volume_groups.append(volume2) | |
280 | assert volume_groups.get(vg_name='volume1') == volume1 | |
281 | ||
282 | def test_filter_by_tag(self, volume_groups): | |
283 | vg_tags = "ceph.group=dmcache" | |
284 | osd = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags) | |
285 | journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain') | |
286 | volume_groups.append(osd) | |
287 | volume_groups.append(journal) | |
288 | volume_groups.filter(vg_tags={'ceph.group': 'dmcache'}) | |
289 | assert len(volume_groups) == 1 | |
290 | assert volume_groups[0].vg_name == 'volume1' | |
291 | ||
181888fb FG |
292 | def test_filter_by_tag_does_not_match_one(self, volume_groups): |
293 | vg_tags = "ceph.group=dmcache,ceph.disk_type=ssd" | |
294 | osd = api.VolumeGroup(vg_name='volume1', vg_path='/dev/vg/lv', vg_tags=vg_tags) | |
295 | volume_groups.append(osd) | |
296 | volume_groups.filter(vg_tags={'ceph.group': 'data', 'ceph.disk_type': 'ssd'}) | |
297 | assert volume_groups == [] | |
298 | ||
d2e6a577 FG |
299 | def test_filter_by_vg_name(self, volume_groups): |
300 | vg_tags = "ceph.type=data,ceph.fsid=000-aaa" | |
301 | osd = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags) | |
302 | journal = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal') | |
303 | volume_groups.append(osd) | |
304 | volume_groups.append(journal) | |
305 | volume_groups.filter(vg_name='ceph_vg') | |
306 | assert len(volume_groups) == 1 | |
307 | assert volume_groups[0].vg_name == 'ceph_vg' | |
308 | ||
309 | def test_filter_requires_params(self, volume_groups): | |
310 | with pytest.raises(TypeError): | |
311 | volume_groups.filter() | |
312 | ||
313 | ||
314 | class TestCreateLV(object): | |
315 | ||
316 | def setup(self): | |
317 | self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='') | |
318 | ||
319 | def test_uses_size(self, monkeypatch, capture): | |
320 | monkeypatch.setattr(process, 'run', capture) | |
321 | monkeypatch.setattr(process, 'call', capture) | |
322 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
323 | api.create_lv('foo', 'foo_group', size=5, type='data') | |
324 | expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group'] | |
325 | assert capture.calls[0]['args'][0] == expected | |
326 | ||
327 | def test_calls_to_set_type_tag(self, monkeypatch, capture): | |
328 | monkeypatch.setattr(process, 'run', capture) | |
329 | monkeypatch.setattr(process, 'call', capture) | |
330 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
331 | api.create_lv('foo', 'foo_group', size=5, type='data') | |
332 | ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path'] | |
333 | assert capture.calls[1]['args'][0] == ceph_tag | |
334 | ||
335 | def test_calls_to_set_data_tag(self, monkeypatch, capture): | |
336 | monkeypatch.setattr(process, 'run', capture) | |
337 | monkeypatch.setattr(process, 'call', capture) | |
338 | monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume) | |
339 | api.create_lv('foo', 'foo_group', size=5, type='data') | |
340 | data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path'] | |
341 | assert capture.calls[2]['args'][0] == data_tag |