]>
Commit | Line | Data |
---|---|---|
3efd9988 FG |
1 | import pytest |
2 | from ceph_volume.devices import lvm | |
3 | from ceph_volume.api import lvm as api | |
4 | ||
92f5a8d4 TL |
5 | # TODO: add tests for following commands - |
6 | # ceph-volume list | |
7 | # ceph-volume list <path-to-pv> | |
8 | # ceph-volume list <path-to-vg> | |
9 | # ceph-volume list <path-to-lv> | |
3efd9988 FG |
10 | |
11 | class TestReadableTag(object): | |
12 | ||
13 | def test_dots_get_replaced(self): | |
14 | result = lvm.listing.readable_tag('ceph.foo') | |
15 | assert result == 'foo' | |
16 | ||
17 | def test_underscores_are_replaced_with_spaces(self): | |
18 | result = lvm.listing.readable_tag('ceph.long_tag') | |
19 | assert result == 'long tag' | |
20 | ||
21 | ||
22 | class TestPrettyReport(object): | |
23 | ||
24 | def test_is_empty(self, capsys): | |
25 | lvm.listing.pretty_report({}) | |
26 | stdout, stderr = capsys.readouterr() | |
27 | assert stdout == '\n' | |
28 | ||
29 | def test_type_and_path_are_reported(self, capsys): | |
28e407b8 AA |
30 | lvm.listing.pretty_report({0: [ |
31 | {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']} | |
32 | ]}) | |
3efd9988 | 33 | stdout, stderr = capsys.readouterr() |
11fdf7f2 | 34 | assert '[data] /dev/sda1' in stdout |
3efd9988 FG |
35 | |
36 | def test_osd_id_header_is_reported(self, capsys): | |
28e407b8 AA |
37 | lvm.listing.pretty_report({0: [ |
38 | {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda']} | |
39 | ]}) | |
3efd9988 FG |
40 | stdout, stderr = capsys.readouterr() |
41 | assert '====== osd.0 =======' in stdout | |
42 | ||
43 | def test_tags_are_included(self, capsys): | |
44 | lvm.listing.pretty_report( | |
45 | {0: [{ | |
46 | 'type': 'data', | |
47 | 'path': '/dev/sda1', | |
28e407b8 AA |
48 | 'tags': {'ceph.osd_id': '0'}, |
49 | 'devices': ['/dev/sda'], | |
3efd9988 FG |
50 | }]} |
51 | ) | |
52 | stdout, stderr = capsys.readouterr() | |
53 | assert 'osd id' in stdout | |
54 | ||
28e407b8 AA |
55 | def test_devices_are_comma_separated(self, capsys): |
56 | lvm.listing.pretty_report({0: [ | |
57 | {'type': 'data', 'path': '/dev/sda1', 'devices': ['/dev/sda', '/dev/sdb1']} | |
58 | ]}) | |
59 | stdout, stderr = capsys.readouterr() | |
60 | assert '/dev/sda,/dev/sdb1' in stdout | |
61 | ||
3efd9988 FG |
62 | |
63 | class TestList(object): | |
64 | ||
92f5a8d4 TL |
65 | def test_empty_full_json_zero_exit_status(self, is_root, volumes, |
66 | factory, capsys): | |
3efd9988 FG |
67 | args = factory(format='json', device=None) |
68 | lvm.listing.List([]).list(args) | |
69 | stdout, stderr = capsys.readouterr() | |
70 | assert stdout == '{}\n' | |
71 | ||
92f5a8d4 TL |
72 | def test_empty_device_json_zero_exit_status(self, is_root, volumes, |
73 | factory, capsys): | |
3efd9988 FG |
74 | args = factory(format='json', device='/dev/sda1') |
75 | lvm.listing.List([]).list(args) | |
76 | stdout, stderr = capsys.readouterr() | |
77 | assert stdout == '{}\n' | |
78 | ||
79 | def test_empty_full_zero_exit_status(self, is_root, volumes, factory): | |
80 | args = factory(format='pretty', device=None) | |
81 | with pytest.raises(SystemExit): | |
82 | lvm.listing.List([]).list(args) | |
83 | ||
84 | def test_empty_device_zero_exit_status(self, is_root, volumes, factory): | |
85 | args = factory(format='pretty', device='/dev/sda1') | |
86 | with pytest.raises(SystemExit): | |
87 | lvm.listing.List([]).list(args) | |
88 | ||
3efd9988 FG |
89 | class TestFullReport(object): |
90 | ||
91 | def test_no_ceph_lvs(self, volumes, monkeypatch): | |
92 | # ceph lvs are detected by looking into its tags | |
92f5a8d4 TL |
93 | osd = api.Volume(lv_name='volume1', lv_path='/dev/VolGroup/lv', |
94 | lv_tags={}) | |
3efd9988 | 95 | volumes.append(osd) |
92f5a8d4 TL |
96 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: |
97 | volumes) | |
98 | ||
3efd9988 FG |
99 | result = lvm.listing.List([]).full_report() |
100 | assert result == {} | |
101 | ||
92f5a8d4 | 102 | def test_ceph_data_lv_reported(self, pvolumes, volumes, monkeypatch): |
3efd9988 | 103 | tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data' |
92f5a8d4 TL |
104 | pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000", |
105 | vg_name='VolGroup', lv_uuid="aaaa") | |
106 | osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags, | |
107 | lv_path='/dev/VolGroup/lv', vg_name='VolGroup') | |
108 | pvolumes.append(pv) | |
3efd9988 | 109 | volumes.append(osd) |
92f5a8d4 TL |
110 | monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs: |
111 | pvolumes) | |
112 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
113 | volumes) | |
114 | ||
3efd9988 FG |
115 | result = lvm.listing.List([]).full_report() |
116 | assert result['0'][0]['name'] == 'volume1' | |
117 | ||
92f5a8d4 | 118 | def test_ceph_journal_lv_reported(self, pvolumes, volumes, monkeypatch): |
3efd9988 FG |
119 | tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data' |
120 | journal_tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=journal' | |
92f5a8d4 TL |
121 | pv = api.PVolume(pv_name='/dev/sda1', pv_tags={}, pv_uuid="0000", |
122 | vg_name="VolGroup", lv_uuid="aaaa") | |
123 | osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags, | |
124 | lv_path='/dev/VolGroup/lv', vg_name='VolGroup') | |
3efd9988 | 125 | journal = api.Volume( |
92f5a8d4 TL |
126 | lv_name='journal', lv_uuid='x', lv_tags=journal_tags, |
127 | lv_path='/dev/VolGroup/journal', vg_name='VolGroup') | |
128 | pvolumes.append(pv) | |
3efd9988 FG |
129 | volumes.append(osd) |
130 | volumes.append(journal) | |
92f5a8d4 TL |
131 | monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs: |
132 | pvolumes) | |
133 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
134 | volumes) | |
135 | ||
3efd9988 FG |
136 | result = lvm.listing.List([]).full_report() |
137 | assert result['0'][0]['name'] == 'volume1' | |
138 | assert result['0'][1]['name'] == 'journal' | |
139 | ||
140 | def test_ceph_wal_lv_reported(self, volumes, monkeypatch): | |
141 | tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=data' | |
142 | wal_tags = 'ceph.osd_id=0,ceph.wal_uuid=x,ceph.type=wal' | |
92f5a8d4 TL |
143 | osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags, |
144 | lv_path='/dev/VolGroup/lv', vg_name='VolGroup') | |
145 | wal = api.Volume(lv_name='wal', lv_uuid='x', lv_tags=wal_tags, | |
146 | lv_path='/dev/VolGroup/wal', vg_name='VolGroup') | |
3efd9988 FG |
147 | volumes.append(osd) |
148 | volumes.append(wal) | |
92f5a8d4 TL |
149 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: |
150 | volumes) | |
151 | ||
3efd9988 FG |
152 | result = lvm.listing.List([]).full_report() |
153 | assert result['0'][0]['name'] == 'volume1' | |
154 | assert result['0'][1]['name'] == 'wal' | |
155 | ||
92f5a8d4 TL |
156 | @pytest.mark.parametrize('type_', ['journal', 'db', 'wal']) |
157 | def test_physical_2nd_device_gets_reported(self, type_, monkeypatch): | |
158 | tags = ('ceph.osd_id=0,ceph.{t}_uuid=x,ceph.type=data,' | |
159 | 'ceph.{t}_device=/dev/sda1').format(t=type_) | |
160 | osd = api.Volume(lv_name='volume1', lv_uuid='y', lv_tags=tags, | |
161 | vg_name='VolGroup', lv_path='/dev/VolGroup/lv') | |
162 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
163 | [osd]) | |
3efd9988 | 164 | |
3efd9988 FG |
165 | result = lvm.listing.List([]).full_report() |
166 | assert result['0'][1]['path'] == '/dev/sda1' | |
167 | assert result['0'][1]['tags'] == {'PARTUUID': 'x'} | |
92f5a8d4 | 168 | assert result['0'][1]['type'] == type_ |
3efd9988 FG |
169 | |
170 | ||
171 | class TestSingleReport(object): | |
172 | ||
173 | def test_not_a_ceph_lv(self, volumes, monkeypatch): | |
174 | # ceph lvs are detected by looking into its tags | |
92f5a8d4 TL |
175 | lv = api.Volume(lv_name='lv', lv_tags={}, lv_path='/dev/VolGroup/lv', |
176 | vg_name='VolGroup') | |
177 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
178 | [lv]) | |
179 | ||
3efd9988 FG |
180 | result = lvm.listing.List([]).single_report('VolGroup/lv') |
181 | assert result == {} | |
182 | ||
92f5a8d4 | 183 | def test_report_a_ceph_lv(self, pvolumes, volumes, monkeypatch): |
3efd9988 FG |
184 | # ceph lvs are detected by looking into its tags |
185 | tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data' | |
92f5a8d4 TL |
186 | lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa', |
187 | lv_path='/dev/VolGroup/lv', lv_tags=tags) | |
3efd9988 | 188 | volumes.append(lv) |
92f5a8d4 TL |
189 | monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs: |
190 | pvolumes) | |
191 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
192 | volumes) | |
193 | ||
3efd9988 FG |
194 | result = lvm.listing.List([]).single_report('VolGroup/lv') |
195 | assert result['0'][0]['name'] == 'lv' | |
196 | assert result['0'][0]['lv_tags'] == tags | |
197 | assert result['0'][0]['path'] == '/dev/VolGroup/lv' | |
28e407b8 | 198 | assert result['0'][0]['devices'] == [] |
3efd9988 | 199 | |
92f5a8d4 | 200 | def test_report_a_ceph_journal_device(self, monkeypatch): |
3efd9988 | 201 | # ceph lvs are detected by looking into its tags |
92f5a8d4 TL |
202 | tags = 'ceph.osd_id=0,ceph.journal_uuid=x,ceph.type=data,' + \ |
203 | 'ceph.journal_device=/dev/sda1' | |
204 | lv = api.Volume(lv_name='lv', lv_uuid='aaa', lv_tags=tags, | |
205 | lv_path='/dev/VolGroup/lv', vg_name='VolGroup') | |
206 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
207 | [lv] if 'tags' in kwargs else []) | |
208 | ||
3efd9988 FG |
209 | result = lvm.listing.List([]).single_report('/dev/sda1') |
210 | assert result['0'][0]['tags'] == {'PARTUUID': 'x'} | |
211 | assert result['0'][0]['type'] == 'journal' | |
212 | assert result['0'][0]['path'] == '/dev/sda1' | |
28e407b8 | 213 | |
92f5a8d4 TL |
214 | def test_report_a_ceph_lv_with_devices(self, volumes, pvolumes, monkeypatch): |
215 | tags = 'ceph.osd_id=0,ceph.type=data' | |
216 | pv1 = api.PVolume(vg_name="VolGroup", pv_name='/dev/sda1', | |
217 | pv_uuid='', pv_tags={}, lv_uuid="aaaa") | |
218 | pv2 = api.PVolume(vg_name="VolGroup", pv_name='/dev/sdb1', | |
219 | pv_uuid='', pv_tags={}, lv_uuid="aaaa") | |
220 | lv = api.Volume(lv_name='lv', vg_name='VolGroup',lv_uuid='aaaa', | |
221 | lv_path='/dev/VolGroup/lv', lv_tags=tags) | |
222 | pvolumes.append(pv1) | |
223 | pvolumes.append(pv2) | |
28e407b8 | 224 | volumes.append(lv) |
92f5a8d4 TL |
225 | monkeypatch.setattr(lvm.listing.api, 'get_pvs', lambda **kwargs: |
226 | pvolumes) | |
227 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: | |
228 | volumes) | |
229 | ||
28e407b8 AA |
230 | listing = lvm.listing.List([]) |
231 | listing._pvs = [ | |
232 | {'lv_uuid': 'aaaa', 'pv_name': '/dev/sda1', 'pv_tags': '', 'pv_uuid': ''}, | |
233 | {'lv_uuid': 'aaaa', 'pv_name': '/dev/sdb1', 'pv_tags': '', 'pv_uuid': ''}, | |
234 | ] | |
92f5a8d4 | 235 | |
28e407b8 AA |
236 | result = listing.single_report('VolGroup/lv') |
237 | assert result['0'][0]['name'] == 'lv' | |
238 | assert result['0'][0]['lv_tags'] == tags | |
239 | assert result['0'][0]['path'] == '/dev/VolGroup/lv' | |
240 | assert result['0'][0]['devices'] == ['/dev/sda1', '/dev/sdb1'] | |
241 | ||
92f5a8d4 TL |
242 | def test_report_a_ceph_lv_with_no_matching_devices(self, volumes, |
243 | monkeypatch): | |
244 | tags = 'ceph.osd_id=0,ceph.type=data' | |
245 | lv = api.Volume(lv_name='lv', vg_name='VolGroup', lv_uuid='aaaa', | |
246 | lv_path='/dev/VolGroup/lv', lv_tags=tags) | |
28e407b8 | 247 | volumes.append(lv) |
92f5a8d4 TL |
248 | monkeypatch.setattr(lvm.listing.api, 'get_lvs', lambda **kwargs: |
249 | volumes) | |
250 | ||
28e407b8 AA |
251 | listing = lvm.listing.List([]) |
252 | listing._pvs = [ | |
92f5a8d4 TL |
253 | {'lv_uuid': 'ffff', 'pv_name': '/dev/sda1', 'pv_tags': '', |
254 | 'pv_uuid': ''}, | |
255 | {'lv_uuid': 'ffff', 'pv_name': '/dev/sdb1', 'pv_tags': '', | |
256 | 'pv_uuid': ''}] | |
257 | ||
28e407b8 AA |
258 | result = listing.single_report('VolGroup/lv') |
259 | assert result['0'][0]['name'] == 'lv' | |
260 | assert result['0'][0]['lv_tags'] == tags | |
261 | assert result['0'][0]['path'] == '/dev/VolGroup/lv' | |
262 | assert result['0'][0]['devices'] == [] |