]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/devices/lvm/test_listing.py
Add patch for failing prerm scripts
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / devices / lvm / test_listing.py
CommitLineData
3efd9988
FG
1import pytest
2from ceph_volume.devices import lvm
3from ceph_volume.api import lvm as api
4
5
6class TestReadableTag(object):
7
8 def test_dots_get_replaced(self):
9 result = lvm.listing.readable_tag('ceph.foo')
10 assert result == 'foo'
11
12 def test_underscores_are_replaced_with_spaces(self):
13 result = lvm.listing.readable_tag('ceph.long_tag')
14 assert result == 'long tag'
15
16
17class TestPrettyReport(object):
18
19 def test_is_empty(self, capsys):
20 lvm.listing.pretty_report({})
21 stdout, stderr = capsys.readouterr()
22 assert stdout == '\n'
23
24 def test_type_and_path_are_reported(self, capsys):
28e407b8
AA
25 lvm.listing.pretty_report({0: [
26 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
27 ]})
3efd9988 28 stdout, stderr = capsys.readouterr()
11fdf7f2 29 assert '[data] /dev/sda1' in stdout
3efd9988
FG
30
31 def test_osd_id_header_is_reported(self, capsys):
28e407b8
AA
32 lvm.listing.pretty_report({0: [
33 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']}
34 ]})
3efd9988
FG
35 stdout, stderr = capsys.readouterr()
36 assert '====== osd.0 =======' in stdout
37
38 def test_tags_are_included(self, capsys):
39 lvm.listing.pretty_report(
40 {0: [{
41 'type': 'data',
42 'path': '/dev/sda1',
28e407b8
AA
43 'tags': {'ceph.osd_id': '0'},
44 'devices': ['/dev/sda'],
3efd9988
FG
45 }]}
46 )
47 stdout, stderr = capsys.readouterr()
48 assert 'osd id' in stdout
49
28e407b8
AA
50 def test_devices_are_comma_separated(self, capsys):
51 lvm.listing.pretty_report({0: [
52 {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda', '/dev/sdb1']}
53 ]})
54 stdout, stderr = capsys.readouterr()
55 assert '/dev/sda,/dev/sdb1' in stdout
56
3efd9988
FG
57
58class TestList(object):
59
60 def test_empty_full_json_zero_exit_status(self, is_root, volumes, factory, capsys):
61 args = factory(format='json', device=None)
62 lvm.listing.List([]).list(args)
63 stdout, stderr = capsys.readouterr()
64 assert stdout == '{}\n'
65
66 def test_empty_device_json_zero_exit_status(self, is_root, volumes, factory, capsys):
67 args = factory(format='json', device='/dev/sda1')
68 lvm.listing.List([]).list(args)
69 stdout, stderr = capsys.readouterr()
70 assert stdout == '{}\n'
71
72 def test_empty_full_zero_exit_status(self, is_root, volumes, factory):
73 args = factory(format='pretty', device=None)
74 with pytest.raises(SystemExit):
75 lvm.listing.List([]).list(args)
76
77 def test_empty_device_zero_exit_status(self, is_root, volumes, factory):
78 args = factory(format='pretty', device='/dev/sda1')
79 with pytest.raises(SystemExit):
80 lvm.listing.List([]).list(args)
81
eafe8130
TL
82 def test_lvs_list_is_created_just_once(self, monkeypatch, is_root, volumes, factory):
83 api.volumes_obj_create_count = 0
84
85 def monkey_populate(self):
86 api.volumes_obj_create_count += 1
87 for lv_item in api.get_api_lvs():
88 self.append(api.Volume(**lv_item))
89 monkeypatch.setattr(api.Volumes, '_populate', monkey_populate)
90
91 args = factory(format='pretty', device='/dev/sda1')
92 with pytest.raises(SystemExit):
93 lvm.listing.List([]).list(args)
94
95 # XXX: Ideally, the count should be just 1. Volumes._populate() is
96 # being called thrice out of which only twice is moneky_populate.
97 assert api.volumes_obj_create_count == 2
98
3efd9988
FG
99
100class TestFullReport(object):
101
102 def test_no_ceph_lvs(self, volumes, monkeypatch):
103 # ceph lvs are detected by looking into its tags
104 osd = api.Volume(lv_name='volume1', lv_path='/dev/VolGroup/lv', lv_tags={})
105 volumes.append(osd)
106 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
107 result = lvm.listing.List([]).full_report()
108 assert result == {}
109
110 def test_ceph_data_lv_reported(self, volumes, monkeypatch):
111 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
112 osd = api.Volume(
113 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags=tags)
114 volumes.append(osd)
115 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
116 result = lvm.listing.List([]).full_report()
117 assert result['0'][0]['name'] == 'volume1'
118
119 def test_ceph_journal_lv_reported(self, volumes, monkeypatch):
120 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
121 journal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal'
122 osd = api.Volume(
123 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags=tags)
124 journal = api.Volume(
125 lv_name='journal', lv_uuid='x', lv_path='/dev/VolGroup/journal', lv_tags=journal_tags)
126 volumes.append(osd)
127 volumes.append(journal)
128 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
129 result = lvm.listing.List([]).full_report()
130 assert result['0'][0]['name'] == 'volume1'
131 assert result['0'][1]['name'] == 'journal'
132
133 def test_ceph_wal_lv_reported(self, volumes, monkeypatch):
134 tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
135 wal_tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal'
136 osd = api.Volume(
137 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags=tags)
138 wal = api.Volume(
139 lv_name='wal', lv_uuid='x', lv_path='/dev/VolGroup/wal', lv_tags=wal_tags)
140 volumes.append(osd)
141 volumes.append(wal)
142 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
143 result = lvm.listing.List([]).full_report()
144 assert result['0'][0]['name'] == 'volume1'
145 assert result['0'][1]['name'] == 'wal'
146
147 def test_physical_journal_gets_reported(self, volumes, monkeypatch):
148 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
149 osd = api.Volume(
150 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags=tags)
151 volumes.append(osd)
152 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
153 monkeypatch.setattr(lvm.listing.disk, 'get_device_from_partuuid', lambda x: '/dev/sda1')
154 result = lvm.listing.List([]).full_report()
155 assert result['0'][1]['path'] == '/dev/sda1'
156 assert result['0'][1]['tags'] == {'PARTUUID': 'x'}
157 assert result['0'][1]['type'] == 'journal'
158
159 def test_physical_wal_gets_reported(self, volumes, monkeypatch):
160 tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data'
161 osd = api.Volume(
162 lv_name='volume1', lv_uuid='y', lv_path='/dev/VolGroup/lv', lv_tags=tags)
163 volumes.append(osd)
164 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
165 monkeypatch.setattr(lvm.listing.disk, 'get_device_from_partuuid', lambda x: '/dev/sda1')
166 result = lvm.listing.List([]).full_report()
167 assert result['0'][1]['path'] == '/dev/sda1'
168 assert result['0'][1]['tags'] == {'PARTUUID': 'x'}
169 assert result['0'][1]['type'] == 'wal'
170
171
172class TestSingleReport(object):
173
174 def test_not_a_ceph_lv(self, volumes, monkeypatch):
175 # ceph lvs are detected by looking into its tags
176 lv = api.Volume(
177 lv_name='lv', vg_name='VolGroup', lv_path='/dev/VolGroup/lv', lv_tags={})
178 volumes.append(lv)
179 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
180 result = lvm.listing.List([]).single_report('VolGroup/lv')
181 assert result == {}
182
183 def test_report_a_ceph_lv(self, volumes, monkeypatch):
184 # ceph lvs are detected by looking into its tags
185 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
186 lv = api.Volume(
28e407b8
AA
187 lv_name='lv', vg_name='VolGroup',
188 lv_uuid='aaaa', lv_path='/dev/VolGroup/lv', lv_tags=tags
189 )
3efd9988
FG
190 volumes.append(lv)
191 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
192 result = lvm.listing.List([]).single_report('VolGroup/lv')
193 assert result['0'][0]['name'] == 'lv'
194 assert result['0'][0]['lv_tags'] == tags
195 assert result['0'][0]['path'] == '/dev/VolGroup/lv'
28e407b8 196 assert result['0'][0]['devices'] == []
3efd9988
FG
197
198 def test_report_a_ceph_journal_device(self, volumes, monkeypatch):
199 # ceph lvs are detected by looking into its tags
200 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,ceph.journal_device=/dev/sda1'
201 lv = api.Volume(
28e407b8
AA
202 lv_name='lv', vg_name='VolGroup', lv_path='/dev/VolGroup/lv',
203 lv_uuid='aaa', lv_tags=tags)
3efd9988
FG
204 volumes.append(lv)
205 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
206 result = lvm.listing.List([]).single_report('/dev/sda1')
207 assert result['0'][0]['tags'] == {'PARTUUID': 'x'}
208 assert result['0'][0]['type'] == 'journal'
209 assert result['0'][0]['path'] == '/dev/sda1'
28e407b8
AA
210
211 def test_report_a_ceph_lv_with_devices(self, volumes, monkeypatch):
212 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
213 lv = api.Volume(
214 lv_name='lv', vg_name='VolGroup',
215 lv_uuid='aaaa', lv_path='/dev/VolGroup/lv', lv_tags=tags
216 )
217 volumes.append(lv)
218 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
219 listing = lvm.listing.List([])
220 listing._pvs = [
221 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
222 {'lv_uuid': 'aaaa', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
223 ]
224 result = listing.single_report('VolGroup/lv')
225 assert result['0'][0]['name'] == 'lv'
226 assert result['0'][0]['lv_tags'] == tags
227 assert result['0'][0]['path'] == '/dev/VolGroup/lv'
228 assert result['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1']
229
1adf2230
AA
230 def test_report_a_ceph_lv_with_multiple_pvs_of_same_name(self, pvolumes, monkeypatch):
231 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
232 lv = api.Volume(
233 lv_name='lv', vg_name='VolGroup',
234 lv_uuid='aaaa', lv_path='/dev/VolGroup/lv', lv_tags=tags
235 )
236 monkeypatch.setattr(api, 'get_lv_from_argument', lambda device: None)
237 monkeypatch.setattr(api, 'get_lv', lambda vg_name: lv)
238 FooPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={}, lv_uuid="aaaa")
239 BarPVolume = api.PVolume(vg_name="vg", pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
240 pvolumes.append(FooPVolume)
241 pvolumes.append(BarPVolume)
242 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
243 listing = lvm.listing.List([])
244 result = listing.single_report('/dev/sda')
245 assert result['0'][0]['name'] == 'lv'
246 assert result['0'][0]['lv_tags'] == tags
247 assert result['0'][0]['path'] == '/dev/VolGroup/lv'
248 assert len(result) == 1
249
28e407b8
AA
250 def test_report_a_ceph_lv_with_no_matching_devices(self, volumes, monkeypatch):
251 tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data'
252 lv = api.Volume(
253 lv_name='lv', vg_name='VolGroup',
254 lv_uuid='aaaa', lv_path='/dev/VolGroup/lv', lv_tags=tags
255 )
256 volumes.append(lv)
257 monkeypatch.setattr(lvm.listing.api, 'Volumes', lambda: volumes)
258 listing = lvm.listing.List([])
259 listing._pvs = [
260 {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
261 {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
262 ]
263 result = listing.single_report('VolGroup/lv')
264 assert result['0'][0]['name'] == 'lv'
265 assert result['0'][0]['lv_tags'] == tags
266 assert result['0'][0]['path'] == '/dev/VolGroup/lv'
267 assert result['0'][0]['devices'] == []
268
269
270class TestListingPVs(object):
271
272 def setup(self):
273 self.default_pvs = [
274 {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''},
275 {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''},
276 ]
277
278 def test_pvs_is_unset(self, monkeypatch):
279 monkeypatch.setattr(lvm.listing.api, 'get_api_pvs', lambda: self.default_pvs)
280 listing = lvm.listing.List([])
281 assert listing.pvs == self.default_pvs
282
283 def test_pvs_is_set(self, monkeypatch):
284 # keep it patched so that we can fail if this gets returned
285 monkeypatch.setattr(lvm.listing.api, 'get_api_pvs', lambda: self.default_pvs)
286 listing = lvm.listing.List([])
287 listing._pvs = []
288 assert listing.pvs == []