]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_api.py
update sources to v12.2.0
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_api.py
CommitLineData
d2e6a577
FG
1import pytest
2from ceph_volume import process, exceptions
3from ceph_volume.devices.lvm import api
4
5
class TestParseTags(object):
    """Checks on how ``api.parse_tags`` turns a tag string into a dict."""

    def test_no_tags_means_empty_dict(self):
        # An empty tag string is expected to produce an empty mapping.
        assert api.parse_tags('') == {}

    def test_single_tag_gets_parsed(self):
        # One ``key=value`` pair becomes a single dict entry; the value
        # is expected to stay a string.
        parsed = api.parse_tags('ceph.osd_something=1')
        assert parsed == {'ceph.osd_something': '1'}

    def test_multiple_csv_expands_in_dict(self):
        parsed = api.parse_tags('ceph.osd_something=1,ceph.foo=2,ceph.fsid=0000')
        # Check each key on its own so the test does not depend on any
        # particular dict ordering.
        expected_pairs = (
            ('ceph.osd_something', '1'),
            ('ceph.foo', '2'),
            ('ceph.fsid', '0000'),
        )
        for tag_name, tag_value in expected_pairs:
            assert parsed[tag_name] == tag_value
22
23
class TestGetAPIVgs(object):
    """Run ``api.get_api_vgs`` against canned ``process.call`` results."""

    @staticmethod
    def _stub_call(monkeypatch, stdout):
        # Replace ``api.process.call`` with a stub that ignores its single
        # argument and returns a fixed 3-tuple (presumably stdout, stderr
        # and the exit code -- confirm against ``process.call``).
        monkeypatch.setattr(api.process, 'call', lambda x: (stdout, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        # Output made only of newlines must not yield any volume group.
        self._stub_call(monkeypatch, '\n\n')
        assert api.get_api_vgs() == []

    def test_report_has_stuff(self, monkeypatch):
        self._stub_call(monkeypatch, [' VolGroup00'])
        assert api.get_api_vgs() == [{'vg_name': 'VolGroup00'}]

    def test_report_has_stuff_with_empty_attrs(self, monkeypatch):
        # Seven ';'-separated fields, with only the first and last filled in.
        self._stub_call(monkeypatch, [' VolGroup00 ;;;;;;9g'])
        first_vg = api.get_api_vgs()[0]
        assert len(first_vg.keys()) == 7
        assert first_vg['vg_name'] == 'VolGroup00'
        assert first_vg['vg_free'] == '9g'

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_call(monkeypatch, [' VolGroup00;;;;;;;', ' ceph_vg;;;;;;;'])
        groups = api.get_api_vgs()
        assert groups[0]['vg_name'] == 'VolGroup00'
        assert groups[1]['vg_name'] == 'ceph_vg'
d2e6a577
FG
49
50
class TestGetAPILvs(object):
    """Run ``api.get_api_lvs`` against canned ``process.call`` results."""

    @staticmethod
    def _stub_call(monkeypatch, stdout):
        # Replace ``api.process.call`` with a stub that ignores its single
        # argument and returns a fixed 3-tuple (presumably stdout, stderr
        # and the exit code -- confirm against ``process.call``).
        monkeypatch.setattr(api.process, 'call', lambda x: (stdout, '', 0))

    def test_report_is_emtpy(self, monkeypatch):
        # No output at all means no logical volumes are reported.
        self._stub_call(monkeypatch, '')
        assert api.get_api_lvs() == []

    def test_report_has_stuff(self, monkeypatch):
        # The third ';'-separated field is the one read back as ``lv_name``.
        self._stub_call(monkeypatch, [' ;/path;VolGroup00;root'])
        logical_volumes = api.get_api_lvs()
        assert logical_volumes[0]['lv_name'] == 'VolGroup00'

    def test_report_has_multiple_items(self, monkeypatch):
        self._stub_call(
            monkeypatch,
            [' ;/path;VolName;root', ';/dev/path;ceph_lv;ceph_vg'],
        )
        logical_volumes = api.get_api_lvs()
        assert logical_volumes[0]['lv_name'] == 'VolName'
        assert logical_volumes[1]['lv_name'] == 'ceph_lv'
d2e6a577
FG
69
70
@pytest.fixture
def volumes(monkeypatch):
    """Provide an ``api.Volumes`` instance built without running real commands.

    ``process.call`` is stubbed to return empty output before the object is
    created, and ``_purge()`` is called on it before it is handed to the
    test (presumably to drop anything loaded at construction -- confirm
    against ``api.Volumes``).
    """
    def fake_call(x):
        return ('', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    lv_collection = api.Volumes()
    lv_collection._purge()
    return lv_collection
77
78
@pytest.fixture
def volume_groups(monkeypatch):
    """Provide an ``api.VolumeGroups`` instance built without running real commands.

    ``process.call`` is stubbed to return empty output before the object is
    created, and ``_purge()`` is called on it before it is handed to the
    test (presumably to drop anything loaded at construction -- confirm
    against ``api.VolumeGroups``).
    """
    def fake_call(x):
        return ('', '', 0)

    monkeypatch.setattr(process, 'call', fake_call)
    vg_collection = api.VolumeGroups()
    vg_collection._purge()
    return vg_collection
85
86
class TestGetLV(object):
    """Checks for the ``api.get_lv`` lookup helper."""

    def test_nothing_is_passed_in(self):
        # Called without any filter, the helper is expected to return None.
        found = api.get_lv()
        assert found is None

    def test_single_lv_is_matched(self, volumes, monkeypatch):
        expected = api.Volume(
            lv_name='foo',
            lv_path='/dev/vg/foo',
            lv_tags="ceph.type=data",
        )
        volumes.append(expected)
        # Make ``api.Volumes()`` hand back the pre-populated fixture.
        monkeypatch.setattr(api, 'Volumes', lambda: volumes)
        assert api.get_lv(lv_name='foo') == expected
98
99
class TestGetVG(object):
    """Checks for the ``api.get_vg`` lookup helper."""

    def test_nothing_is_passed_in(self):
        # Called without any filter, the helper is expected to return None.
        found = api.get_vg()
        assert found is None

    def test_single_vg_is_matched(self, volume_groups, monkeypatch):
        expected = api.VolumeGroup(vg_name='foo')
        volume_groups.append(expected)
        # Make ``api.VolumeGroups()`` hand back the pre-populated fixture.
        monkeypatch.setattr(api, 'VolumeGroups', lambda: volume_groups)
        assert api.get_vg(vg_name='foo') == expected
111
112
class TestVolumes(object):
    """Checks for ``get`` and ``filter`` on the ``volumes`` fixture collection."""

    def test_volume_get_has_no_volumes(self, volumes):
        found = volumes.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volumes):
        found = volumes.get(lv_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volumes):
        # The very same Volume object is appended twice, so looking it up
        # by name yields two matches.
        duplicated = api.Volume(lv_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        for _ in range(2):
            volumes.append(duplicated)
        with pytest.raises(exceptions.MultipleLVsError):
            volumes.get(lv_name='foo')

    def test_find_the_correct_one(self, volumes):
        wanted = api.Volume(lv_name='volume1', lv_path='/dev/vg/lv', lv_tags='')
        other = api.Volume(lv_name='volume2', lv_path='/dev/vg/lv', lv_tags='')
        volumes.append(wanted)
        volumes.append(other)
        assert volumes.get(lv_name='volume1') == wanted

    def test_filter_by_tag(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            lv_path='/dev/vg/lv',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            lv_path='/dev/vg/lv',
            lv_tags='ceph.type=journal',
        )
        volumes.append(data_lv)
        volumes.append(journal_lv)
        # ``filter`` narrows the collection in place; only the data LV remains.
        volumes.filter(lv_tags={'ceph.type': 'data'})
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_vg_name(self, volumes):
        data_lv = api.Volume(
            lv_name='volume1',
            vg_name='ceph_vg',
            lv_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        journal_lv = api.Volume(
            lv_name='volume2',
            vg_name='system_vg',
            lv_tags='ceph.type=journal',
        )
        volumes.append(data_lv)
        volumes.append(journal_lv)
        volumes.filter(vg_name='ceph_vg')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_by_lv_path(self, volumes):
        first_lv = api.Volume(lv_name='volume1', lv_path='/dev/volume1', lv_tags='')
        second_lv = api.Volume(lv_name='volume2', lv_path='/dev/volume2', lv_tags='')
        volumes.append(first_lv)
        volumes.append(second_lv)
        volumes.filter(lv_path='/dev/volume1')
        assert len(volumes) == 1
        assert volumes[0].lv_name == 'volume1'

    def test_filter_requires_params(self, volumes):
        # Calling ``filter`` with no criteria is expected to raise TypeError.
        with pytest.raises(TypeError):
            volumes.filter()
167
168
class TestVolumeGroups(object):
    """Checks for ``get`` and ``filter`` on the ``volume_groups`` fixture collection."""

    def test_volume_get_has_no_volume_groups(self, volume_groups):
        found = volume_groups.get()
        assert found is None

    def test_volume_get_filtered_has_no_volumes(self, volume_groups):
        found = volume_groups.get(vg_name='ceph')
        assert found is None

    def test_volume_has_multiple_matches(self, volume_groups):
        # The very same VolumeGroup object is appended twice, so looking it
        # up by name yields two matches.
        duplicated = api.VolumeGroup(vg_name='foo', lv_path='/dev/vg/lv', lv_tags='')
        for _ in range(2):
            volume_groups.append(duplicated)
        with pytest.raises(exceptions.MultipleVGsError):
            volume_groups.get(vg_name='foo')

    def test_find_the_correct_one(self, volume_groups):
        wanted = api.VolumeGroup(vg_name='volume1', lv_tags='')
        other = api.VolumeGroup(vg_name='volume2', lv_tags='')
        volume_groups.append(wanted)
        volume_groups.append(other)
        assert volume_groups.get(vg_name='volume1') == wanted

    def test_filter_by_tag(self, volume_groups):
        dmcache_vg = api.VolumeGroup(vg_name='volume1', vg_tags="ceph.group=dmcache")
        plain_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.group=plain')
        volume_groups.append(dmcache_vg)
        volume_groups.append(plain_vg)
        # ``filter`` narrows the collection in place; only the dmcache VG remains.
        volume_groups.filter(vg_tags={'ceph.group': 'dmcache'})
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'volume1'

    def test_filter_by_vg_name(self, volume_groups):
        ceph_vg = api.VolumeGroup(
            vg_name='ceph_vg',
            vg_tags="ceph.type=data,ceph.fsid=000-aaa",
        )
        other_vg = api.VolumeGroup(vg_name='volume2', vg_tags='ceph.type=journal')
        volume_groups.append(ceph_vg)
        volume_groups.append(other_vg)
        volume_groups.filter(vg_name='ceph_vg')
        assert len(volume_groups) == 1
        assert volume_groups[0].vg_name == 'ceph_vg'

    def test_filter_requires_params(self, volume_groups):
        # Calling ``filter`` with no criteria is expected to raise TypeError.
        with pytest.raises(TypeError):
            volume_groups.filter()
214
215
class TestCreateLV(object):
    """Verify the commands ``api.create_lv`` issues, in order.

    ``process.run`` and ``process.call`` are both replaced by the ``capture``
    fixture so that every command is recorded in ``capture.calls`` instead of
    being executed, and ``api.get_lv`` is stubbed to return a known volume.
    """

    def setup_method(self, method):
        # Use pytest's native xunit hook. The nose-style ``setup`` hook was
        # deprecated in pytest 7.2 and is no longer called by pytest 8, which
        # would leave ``self.foo_volume`` unset for every test in this class.
        self.foo_volume = api.Volume(
            lv_name='foo',
            lv_path='/path',
            vg_name='foo_group',
            lv_tags='',
        )

    def _create_data_lv(self, monkeypatch, capture):
        # Shared arrangement: record commands rather than running them, stub
        # the volume lookup, then create a 5G "data" LV named foo in foo_group.
        monkeypatch.setattr(process, 'run', capture)
        monkeypatch.setattr(process, 'call', capture)
        monkeypatch.setattr(api, 'get_lv', lambda *a, **kw: self.foo_volume)
        api.create_lv('foo', 'foo_group', size=5, type='data')

    def test_uses_size(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # The first recorded command is the lvcreate with the size in gigabytes.
        expected = ['sudo', 'lvcreate', '--yes', '-L', '5G', '-n', 'foo', 'foo_group']
        assert capture.calls[0]['args'][0] == expected

    def test_calls_to_set_type_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # The second recorded command tags the LV with its type.
        ceph_tag = ['sudo', 'lvchange', '--addtag', 'ceph.type=data', '/path']
        assert capture.calls[1]['args'][0] == ceph_tag

    def test_calls_to_set_data_tag(self, monkeypatch, capture):
        self._create_data_lv(monkeypatch, capture)
        # The third recorded command tags the LV with its own path as data device.
        data_tag = ['sudo', 'lvchange', '--addtag', 'ceph.data_device=/path', '/path']
        assert capture.calls[2]['args'][0] == data_tag