]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_api.py
update sources to v12.1.3
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_api.py
1 import pytest
2 from ceph_volume import process, exceptions
3 from ceph_volume.devices.lvm import api
4
5
class TestParseTags(object):
    """Exercise ``api.parse_tags`` with empty, single and comma-separated input."""

    def test_no_tags_means_empty_dict(self):
        # an empty tag string must come back as an empty mapping
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        parsed = api.parse_tags('ceph.osd_something=1')
        assert parsed == {'ceph.osd_something': '1'}

    def test_multiple_csv_expands_in_dict(self):
        raw_tags = 'ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000'
        parsed = api.parse_tags(raw_tags)
        # look keys up one at a time so the check never depends on dict ordering
        assert parsed['ceph.osd_something'] == '1'
        assert parsed['ceph.foo'] == '2'
        assert parsed['ceph.fsid'] == '0000'
22
23
class TestGetAPIVgs(object):
    """Check what ``api.get_api_vgs`` returns for canned ``process.call`` output."""

    @staticmethod
    def _stub_report(monkeypatch, stdout):
        # Replace api.process.call with a one-argument stub that returns a
        # three-item tuple whose first item is the canned report text.
        monkeypatch.setattr(api.process, 'call', lambda x: (stdout, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        self._stub_report(monkeypatch, '{}')
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        self._stub_report(
            monkeypatch,
            '{"report":[{"vg":[{"vg_name":"VolGroup00"}]}]}',
        )
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_report(
            monkeypatch,
            '{"report":[{"vg":[{"vg_name":"VolGroup00"},{"vg_name":"ceph_vg"}]}]}',
        )
        expected = [{'vg_name': 'VolGroup00'}, {'vg_name': 'ceph_vg'}]
        assert api.get_api_vgs() == expected

    def test_does_not_get_poluted_with_non_vg_items(self, monkeypatch):
        # the report also carries an "lv" section, which must not leak into the result
        self._stub_report(
            monkeypatch,
            '{"report":[{"vg":[{"vg_name":"VolGroup00"}],"lv":[{"lv":"1"}]}]}',
        )
        expected = [{'vg_name': 'VolGroup00'}]
        assert api.get_api_vgs() == expected
44
45
class TestGetAPILvs(object):
    """Check what ``api.get_api_lvs`` returns for canned ``process.call`` output."""

    @staticmethod
    def _stub_report(monkeypatch, stdout):
        # Replace api.process.call with a one-argument stub that returns a
        # three-item tuple whose first item is the canned report text.
        monkeypatch.setattr(api.process, 'call', lambda x: (stdout, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        self._stub_report(monkeypatch, '{}')
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        self._stub_report(
            monkeypatch,
            '{"report":[{"lv":[{"lv_name":"VolGroup00"}]}]}',
        )
        expected = [{'lv_name': 'VolGroup00'}]
        assert api.get_api_lvs() == expected

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_report(
            monkeypatch,
            '{"report":[{"lv":[{"lv_name":"VolName"},{"lv_name":"ceph_lv"}]}]}',
        )
        expected = [{'lv_name': 'VolName'}, {'lv_name': 'ceph_lv'}]
        assert api.get_api_lvs() == expected

    def test_does_not_get_poluted_with_non_lv_items(self, monkeypatch):
        # the report also carries a "vg" section, which must not leak into the result
        self._stub_report(
            monkeypatch,
            '{"report":[{"lv":[{"lv_name":"VolName"}],"vg":[{"vg":"1"}]}]}',
        )
        expected = [{'lv_name': 'VolName'}]
        assert api.get_api_lvs() == expected
66
67
@pytest.fixture
def volumes(monkeypatch):
    """Return an ``api.Volumes`` instance that has been purged.

    ``process.call`` is stubbed to return ``('{}', '', 0)`` before the
    collection is built, so constructing it does not run a real command
    through that function.
    """
    monkeypatch.setattr(process, 'call', lambda x: ('{}', '', 0))
    lv_collection = api.Volumes()
    lv_collection._purge()
    return lv_collection
74
75
@pytest.fixture
def volume_groups(monkeypatch):
    """Return an ``api.VolumeGroups`` instance that has been purged.

    ``process.call`` is stubbed to return ``('{}', '', 0)`` before the
    collection is built, so constructing it does not run a real command
    through that function.
    """
    monkeypatch.setattr(process, 'call', lambda x: ('{}', '', 0))
    vg_collection = api.VolumeGroups()
    vg_collection._purge()
    return vg_collection
82
83
class TestGetLV(object):
    """Tests for the module-level ``api.get_lv`` lookup."""

    def test_nothing_is_passed_in(self):
        # with no filters at all the lookup is expected to give back None
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        foo_lv = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.type=data",
        )
        volumes.append(foo_lv)
        # make api.Volumes() hand back the prepared collection from the fixture
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        found = api.get_lv(lv_name='foo')
        assert found == foo_lv
95
96
class TestGetVG(object):
    """Tests for the module-level ``api.get_vg`` lookup."""

    def test_nothing_is_passed_in(self):
        # with no filters at all the lookup is expected to give back None
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        foo_vg = api.VolumeGroup(vg_name='foo')
        volume_groups.append(foo_vg)
        # make api.VolumeGroups() hand back the prepared collection from the fixture
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        found = api.get_vg(vg_name='foo')
        assert found == foo_vg
108
109
class TestVolumes(object):
    """Tests for ``get`` and ``filter`` on the ``volumes`` fixture collection."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        # the very same Volume object is appended twice, so 'foo' matches twice
        duplicate = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(duplicate)
        volumes.append(duplicate)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_find_the_correct_one(self, volumes):
        wanted = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        other = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        for lv in (wanted, other):
            volumes.append(lv)
        assert volumes.get(lv_name='volume1') == wanted

    def test_filter_by_tag(self, volumes):
        data_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags=data_tags)
        journal_lv = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='ceph.type=journal')
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        # the assertions below read the collection itself after filter()
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_vg_name(self, volumes):
        data_tags = "ceph.type=data,ceph.fsid=000-aaa"
        data_lv = api.Volume(lv_name='volume1', vg_name='ceph_vg', lv_tags=data_tags)
        journal_lv = api.Volume(lv_name='volume2', vg_name='system_vg', lv_tags='ceph.type=journal')
        for lv in (data_lv, journal_lv):
            volumes.append(lv)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        first_lv = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second_lv = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        for lv in (first_lv, second_lv):
            volumes.append(lv)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_requires_params(self, volumes):
        # calling filter() with no criteria is expected to raise TypeError
        with pytest.raises(TypeError):
            volumes.filter()
164
165
class TestVolumeGroups(object):
    """Tests for ``get`` and ``filter`` on the ``volume_groups`` fixture collection.

    Volume groups are built with VG keyword arguments only (``vg_name``,
    ``vg_tags``), matching how the other VG tests in this module construct
    them; the LV-specific ``lv_path``/``lv_tags`` keywords that had been
    copied over from the logical-volume tests are not used here.
    """

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        assert volume_groups.get() is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        assert volume_groups.get(vg_name='ceph') is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # the very same VolumeGroup object is appended twice, so 'foo' matches twice
        duplicate_vg = api.VolumeGroup(vg_name='foo', vg_tags='')
        volume_groups.append(duplicate_vg)
        volume_groups.append(duplicate_vg)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        wanted_vg = api.VolumeGroup(vg_name='volume1', vg_tags='')
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='')
        volume_groups.append(wanted_vg)
        volume_groups.append(other_vg)
        assert volume_groups.get(vg_name='volume1') == wanted_vg

    def test_filter_by_tag(self, volume_groups):
        vg_tags = "ceph.group=dmcache"
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags=vg_tags)
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(dmcache_vg)
        volume_groups.append(plain_vg)
        # the assertions below read the collection itself after filter()
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_vg_name(self, volume_groups):
        vg_tags = "ceph.type=data,ceph.fsid=000-aaa"
        ceph_vg = api.VolumeGroup(vg_name='ceph_vg', vg_tags=vg_tags)
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(ceph_vg)
        volume_groups.append(other_vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # calling filter() with no criteria is expected to raise TypeError
        with pytest.raises(TypeError):
            volume_groups.filter()
211
212
class TestCreateLV(object):
    """Check the commands ``api.create_lv`` issues through ``process``.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture (defined outside this module; the tests rely on it exposing a
    ``calls`` list of dicts with an ``'args'`` entry), and ``api.get_lv`` is
    stubbed to return a fixed volume.
    """

    def setup_method(self):
        # pytest's native per-test hook. The nose-style ``setup`` name is
        # deprecated and no longer invoked by recent pytest releases, which
        # would leave ``self.foo_volume`` undefined.
        self.foo_volume = api.Volume(lv_name='foo', lv_path='/path', vg_name='foo_group', lv_tags='')

    def test_uses_size(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')
        # first captured command: the lvcreate invocation with the size in gigabytes
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')
        # second captured command: tagging the volume with its type
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')
        # third captured command: tagging the volume with its own path as data device
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        assert capture.calls[2]['args'][0] == data_tag