]>
Commit | Line | Data |
---|---|---|
d2e6a577 FG |
1 | import pytest |
2 | from ceph_volume import process, exceptions | |
3 | from ceph_volume.devices.lvm import api | |
4 | ||
5 | ||
class TestParseTags(object):
    """Checks that ``api.parse_tags`` turns a tag string into a dict."""

    def test_no_tags_means_empty_dict(self):
        # an empty tag string must come back as an empty mapping
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        parsed = api.parse_tags('ceph.osd_something=1')
        expected = {'ceph.osd_something': '1'}
        assert parsed == expected

    def test_multiple_csv_expands_in_dict(self):
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # verify key by key instead of comparing whole dicts, so the check
        # does not depend on dict ordering
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for key, value in expected_pairs:
            assert parsed[key] == value
22 | ||
23 | ||
class TestGetAPIVgs(object):
    """Checks ``api.get_api_vgs`` against canned JSON report strings."""

    @staticmethod
    def _stub_call(monkeypatch, output):
        # replace api.process.call with a stub that always answers
        # (output, '', 0), whatever single argument it is given
        monkeypatch.setattr(api.process, 'call', lambda cmd: (output, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        self._stub_call(monkeypatch, '{}')
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        self._stub_call(monkeypatch, '{"report":[{"vg":[{"vg_name":"VolGroup00"}]}]}')
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_call(
            monkeypatch,
            '{"report":[{"vg":[{"vg_name":"VolGroup00"},{"vg_name":"ceph_vg"}]}]}',
        )
        expected = [{'vg_name': 'VolGroup00'}, {'vg_name': 'ceph_vg'}]
        assert api.get_api_vgs() == expected

    def test_does_not_get_poluted_with_non_vg_items(self, monkeypatch):
        # the report also carries an "lv" section, which must not show up
        self._stub_call(
            monkeypatch,
            '{"report":[{"vg":[{"vg_name":"VolGroup00"}],"lv":[{"lv":"1"}]}]}',
        )
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected
44 | ||
45 | ||
class TestGetAPILvs(object):
    """Checks ``api.get_api_lvs`` against canned JSON report strings."""

    @staticmethod
    def _stub_call(monkeypatch, output):
        # replace api.process.call with a stub that always answers
        # (output, '', 0), whatever single argument it is given
        monkeypatch.setattr(api.process, 'call', lambda cmd: (output, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        self._stub_call(monkeypatch, '{}')
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        self._stub_call(monkeypatch, '{"report":[{"lv":[{"lv_name":"VolGroup00"}]}]}')
        expected = [{'lv_name': 'VolGroup00'}]
        assert api.get_api_lvs() == expected

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_call(
            monkeypatch,
            '{"report":[{"lv":[{"lv_name":"VolName"},{"lv_name":"ceph_lv"}]}]}',
        )
        expected = [{'lv_name': 'VolName'}, {'lv_name': 'ceph_lv'}]
        assert api.get_api_lvs() == expected

    def test_does_not_get_poluted_with_non_lv_items(self, monkeypatch):
        # the report also carries a "vg" section, which must not show up
        self._stub_call(
            monkeypatch,
            '{"report":[{"lv":[{"lv_name":"VolName"}],"vg":[{"vg":"1"}]}]}',
        )
        expected = [{'lv_name': 'VolName'}]
        assert api.get_api_lvs() == expected
66 | ||
67 | ||
@pytest.fixture
def volumes(monkeypatch):
    """Provide an ``api.Volumes`` collection after calling its ``_purge()``.

    ``process.call`` is stubbed to return ``('{}', '', 0)`` before the
    collection is instantiated.
    """
    def fake_call(cmd):
        return ('{}', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    lvs = api.Volumes()
    lvs._purge()
    return lvs
74 | ||
75 | ||
@pytest.fixture
def volume_groups(monkeypatch):
    """Provide an ``api.VolumeGroups`` collection after calling its ``_purge()``.

    ``process.call`` is stubbed to return ``('{}', '', 0)`` before the
    collection is instantiated.
    """
    def fake_call(cmd):
        return ('{}', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    groups = api.VolumeGroups()
    groups._purge()
    return groups
82 | ||
83 | ||
class TestGetLV(object):
    """Checks for ``api.get_lv``."""

    def test_nothing_is_passed_in(self):
        # with no arguments the lookup is expected to hand back None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        foo_lv = api.Volume(lv_name='foo', lv_path='/dev/vg/foo', lv_tags="ceph.type=data")
        volumes.append(foo_lv)
        # make api.Volumes() return the collection prepared above
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == foo_lv
95 | ||
96 | ||
class TestGetVG(object):
    """Checks for ``api.get_vg``."""

    def test_nothing_is_passed_in(self):
        # with no arguments the lookup is expected to hand back None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        # make api.VolumeGroups() return the collection prepared above
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
108 | ||
109 | ||
class TestVolumes(object):
    """Checks ``get`` and ``filter`` on the collection from the ``volumes`` fixture."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        # the very same Volume object is appended twice; looking it up by
        # name must then raise MultipleLVsError
        duplicate = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        for _ in range(2):
            volumes.append(duplicate)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_find_the_correct_one(self, volumes):
        wanted = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        other = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(wanted)
        volumes.append(other)
        found = volumes.get(lv_name='volume1')
        assert found == wanted

    def test_filter_by_tag(self, volumes):
        data_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=data_tags)
        journal_lv = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        volumes.append(data_lv)
        volumes.append(journal_lv)
        volumes.filter(lv_tags={'ceph.type': 'data'})
        # only the data volume is expected to remain in the collection
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_vg_name(self, volumes):
        data_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=data_tags)
        journal_lv = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        volumes.append(data_lv)
        volumes.append(journal_lv)
        volumes.filter(vg_name='ceph_vg')
        # only the volume in 'ceph_vg' is expected to remain
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        first_lv = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second_lv = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(first_lv)
        volumes.append(second_lv)
        volumes.filter(lv_path='/dev/volume1')
        # only the volume with the matching path is expected to remain
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_requires_params(self, volumes):
        # calling filter() without any criteria must raise TypeError
        with pytest.raises(TypeError):
            volumes.filter()
164 | ||
165 | ||
class TestVolumeGroups(object):
    """Checks ``get`` and ``filter`` on the collection from the ``volume_groups`` fixture.

    Volume groups are built with VG-level keywords only (``vg_name``,
    ``vg_tags``); LV-only keywords such as ``lv_path``/``lv_tags`` do not
    belong on a ``VolumeGroup`` and are not passed here.
    """

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        assert volume_groups.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        assert volume_groups.get(vg_name='ceph') is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # the very same VolumeGroup object is appended twice; looking it up
        # by name must then raise MultipleVGsError
        duplicate = api.VolumeGroup(vg_name='foo')
        volume_groups.append(duplicate)
        volume_groups.append(duplicate)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        wanted = api.VolumeGroup(vg_name='volume1')
        other = api.VolumeGroup(vg_name='volume2')
        volume_groups.append(wanted)
        volume_groups.append(other)
        assert volume_groups.get(vg_name='volume1') == wanted

    def test_filter_by_tag(self, volume_groups):
        vg_tags = "ceph.group=dmcache"
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(dmcache_vg)
        volume_groups.append(plain_vg)
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        # only the group tagged 'dmcache' is expected to remain
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_vg_name(self, volume_groups):
        vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
        ceph_vg = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(ceph_vg)
        volume_groups.append(other_vg)
        volume_groups.filter(vg_name='ceph_vg')
        # only the group named 'ceph_vg' is expected to remain
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # calling filter() without any criteria must raise TypeError
        with pytest.raises(TypeError):
            volume_groups.filter()
211 | ||
212 | ||
class TestCreateLV(object):
    """Checks the commands issued by ``api.create_lv``.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture (defined outside this file), and ``api.get_lv`` is stubbed to
    return a prepared volume. Each test then inspects one recorded call via
    ``capture.calls[i]['args'][0]``.
    """

    def setup_method(self):
        # pytest's native per-test hook; the nose-style ``setup`` name is
        # deprecated and no longer invoked by recent pytest releases, which
        # would leave ``self.foo_volume`` undefined
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def _create_foo_lv(self, monkeypatch, capture):
        # patch out process execution and the LV lookup, then create the LV
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')

    def test_uses_size(self, monkeypatch, capture):
        self._create_foo_lv(monkeypatch, capture)
        # first recorded call: the lvcreate command with the size in G
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        self._create_foo_lv(monkeypatch, capture)
        # second recorded call: tagging the LV with its type
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        self._create_foo_lv(monkeypatch, capture)
        # third recorded call: tagging the LV with its data device path
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        assert capture.calls[2]['args'][0] == data_tag