]>
Commit | Line | Data |
---|---|---|
91327a77 | 1 | import pytest |
1adf2230 AA |
2 | from ceph_volume.util import device |
3 | from ceph_volume.api import lvm as api | |
4 | ||
5 | ||
class TestDevice(object):
    """Exercise the properties of ``device.Device`` against faked system data.

    ``device_info`` and ``pvolumes`` are pytest fixtures provided elsewhere in
    the test suite (presumably a conftest.py); ``device_info`` is called with
    the fake ``devices``, ``lsblk``, ``blkid``, ``lv`` and ``udevadm`` data
    that a given test needs.
    """

    def test_sys_api(self, device_info):
        """``sys_api`` exposes the entry registered for the device path."""
        data = {"/dev/sda": {"foo": "bar"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_lvm_size(self, device_info):
        """A 5GB device reports an ``lvm_size`` of 4 GB."""
        # 5GB in size
        data = {"/dev/sda": {"size": "5368709120"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """A 5.5GB device also reports an ``lvm_size`` of 4 GB."""
        # 5.5GB in size
        data = {"/dev/sda": {"size": "5905580032"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """A path for which an LV is supplied is reported as an LV."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        device_info(lv=data)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, pvolumes, monkeypatch):
        """``vgs`` is empty when the only known PV is on another device."""
        bar_pvolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(bar_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.vgs == []

    def test_vgs_is_not_empty(self, device_info, pvolumes, monkeypatch):
        """``vgs`` holds one entry when a PV with a VG sits on the device."""
        bar_pvolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1', pv_uuid="0000", pv_tags={})
        pvolumes.append(bar_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info, pvolumes):
        """An lsblk TYPE of ``device`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info, pvolumes):
        """``rotational`` is truthy when sys_api reports ``"1"``."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info, pvolumes):
        """``rotational`` is falsy when sys_api reports ``"0"``."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info, pvolumes):
        """``rotational`` is truthy when only lsblk reports ``ROTA`` ``"1"``."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
        """``rotational`` is falsy when sys_api and lsblk both report ``"0"``."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
        """``rotational`` is truthy when neither source carries the flag."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info, pvolumes):
        """An lsblk TYPE of ``disk`` also makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info, pvolumes):
        """An lsblk TYPE of ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_partition

    def test_is_not_lvm_memeber(self, device_info, pvolumes):
        """A partition with no registered PV is not an LVM member."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info, pvolumes, monkeypatch):
        """A partition backed by a PV that belongs to a VG is an LVM member.

        Uses the same PV setup as ``test_pv_api`` so that the positive case is
        exercised rather than repeating the negative assertion of
        ``test_is_not_lvm_memeber``.
        """
        foo_pvolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(foo_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/`` path is a mapper device."""
        device_info()
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-N`` path is a mapper device."""
        device_info()
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A plain ``/dev/sda`` path is not a mapper device."""
        device_info()
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    def test_is_ceph_disk_member_lsblk(self, device_info):
        """An lsblk PARTLABEL of ``ceph data`` marks a ceph-disk member."""
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_ceph_disk_member_not_available(self, device_info):
        """A ceph-disk member is unavailable and says so in its reasons."""
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_is_not_ceph_disk_member_lsblk(self, device_info):
        """A non-ceph lsblk PARTLABEL yields ``is_ceph_disk_member`` False."""
        lsblk = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_is_ceph_disk_member_blkid(self, device_info):
        """With an empty lsblk PARTLABEL the blkid label decides membership."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_not_ceph_disk_member_blkid(self, device_info):
        """A non-ceph blkid PARTLABEL yields ``is_ceph_disk_member`` False."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_pv_api(self, device_info, pvolumes, monkeypatch):
        """``pvs_api`` is populated when a PV with a VG sits on the partition."""
        foo_pvolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(foo_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.pvs_api

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
        """An LV tagged ``ceph.type`` data or block marks the device as used."""
        foo_pvolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(foo_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
        """An LV tagged ``ceph.type`` journal does not mark the device as used."""
        foo_pvolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes.append(foo_pvolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins vendor, model and SCSI serial with ``_``."""
        # each udev key maps to itself so the expected id is easy to read
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        device_info(udevadm=udev)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
213 | ||
214 | ||
215 | ||
class TestDeviceEncryption(object):
    """Check ``device.Device.is_encrypted`` for partitions, mappers and LVs.

    ``device_info``, ``pvolumes`` and ``factory`` are pytest fixtures provided
    elsewhere in the test suite.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
        """A partition with lsblk FSTYPE ``xfs`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs'})
        target = device.Device("/dev/sda")
        assert target.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
        """A partition with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'})
        target = device.Device("/dev/sda")
        assert target.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
        """A partition with blkid TYPE ``ceph data`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'ceph data'})
        target = device.Device("/dev/sda")
        assert target.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
        """A partition with blkid TYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'crypto_LUKS'})
        target = device.Device("/dev/sda")
        assert target.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, monkeypatch):
        """A mapper whose encryption status type is ``LUKS1`` is encrypted."""
        crypt_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: crypt_status)
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/mapper/uuid")
        assert target.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, monkeypatch):
        """A mapper whose encryption status type is ``LUKS2`` is encrypted."""
        crypt_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: crypt_status)
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/mapper/uuid")
        assert target.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        """A mapper whose encryption status type is ``PLAIN`` is encrypted."""
        crypt_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: crypt_status)
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/mapper/uuid")
        assert target.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        """A mapper with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/mapper/uuid")
        assert target.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
        """An LV with blkid TYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'crypto_LUKS'})
        target = device.Device("/dev/sda")
        target.lv_api = {}
        assert target.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
        """An LV with blkid TYPE ``xfs`` and no LVM flag is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'xfs'})
        target = device.Device("/dev/sda")
        target.lv_api = factory(encrypted=None)
        assert target.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
        """An LV with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        device_info(
            lsblk={'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/sda")
        target.lv_api = {}
        assert target.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
        """An LV with lsblk FSTYPE ``xfs`` and no LVM flag is not encrypted."""
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/sda")
        target.lv_api = factory(encrypted=None)
        assert target.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """An LV whose ``lv_api.encrypted`` is True is encrypted."""
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/sda")
        target.lv_api = factory(encrypted=True)
        assert target.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """An LV whose ``lv_api.encrypted`` is False is not encrypted."""
        device_info(
            lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
            blkid={'TYPE': 'mapper'},
        )
        target = device.Device("/dev/sda")
        target.lv_api = factory(encrypted=False)
        assert target.is_encrypted is False
326 | ||
91327a77 AA |
327 | |
class TestDeviceOrdering(object):
    """Check how ``device.Device`` instances compare with ``<`` and ``>``.

    ``device_info`` is a pytest fixture provided elsewhere in the test suite.
    """

    def setup_method(self):
        """Build the fake sys_api data shared by every test in this class.

        This is pytest's xunit-style per-test hook. The nose-style name
        ``setup`` is no longer called by pytest 8, which would leave
        ``self.data`` undefined.
        """
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """A non-removable device sorts before a removable one."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")

        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two non-removable devices sort by path."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")

        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two removable devices sort by path."""
        device_info(devices=self.data)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")

        assert sdb < sdd
        assert sdd > sdb
361 | ||
362 | ||
# PARTLABEL values used to parametrize the ceph-disk membership and type tests.
ceph_partlabels = [
    'ceph data',
    'ceph journal',
    'ceph block',
    'ceph block.wal',
    'ceph block.db',
    'ceph lockbox',
]
367 | ||
368 | ||
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` plus a few availability checks.

    ``device_info`` is a pytest fixture provided elsewhere in the test suite;
    the parametrized tests run once per entry of ``ceph_partlabels``.
    """

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk PARTLABEL with no blkid data gives an empty label."""
        device_info(lsblk={"PARTLABEL": ""})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """With an empty lsblk PARTLABEL the blkid label is reported."""
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": "ceph data"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == 'ceph data'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_blkid(self, device_info, label):
        """Every ceph PARTLABEL seen through blkid marks a member."""
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    def test_reject_removable_device(self, device_info):
        """A device flagged removable is not available."""
        device_info(devices={"/dev/sdb": {"removable": 1}})
        target = device.Device("/dev/sdb")
        assert not target.available

    def test_accept_non_removable_device(self, device_info):
        """A device flagged non-removable is available."""
        device_info(devices={"/dev/sdb": {"removable": 0}})
        target = device.Device("/dev/sdb")
        assert target.available

    def test_reject_readonly_device(self, device_info):
        """A device flagged read-only is not available."""
        device_info(devices={"/dev/cdrom": {"ro": 1}})
        target = device.Device("/dev/cdrom")
        assert not target.available

    def test_accept_non_readonly_device(self, device_info):
        """A device flagged writable is available."""
        device_info(devices={"/dev/sda": {"ro": 0}})
        target = device.Device("/dev/sda")
        assert target.available

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_lsblk(self, device_info, label):
        """Every ceph PARTLABEL seen through lsblk marks a member."""
        device_info(lsblk={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    def test_unknown_type(self, device_info):
        """A non-ceph PARTLABEL gives the type ``unknown``."""
        device_info(lsblk={"PARTLABEL": "gluster"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == 'unknown'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_blkid(self, device_info, label):
        """The type is the last dotted component of the blkid label."""
        # e.g. 'ceph block.wal' -> 'block.wal' -> 'wal'
        last_word = label.split()[-1]
        expected = last_word.split('.')[-1]
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == expected

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_lsblk(self, device_info, label):
        """The type is the last dotted component of the lsblk label."""
        # e.g. 'ceph block.db' -> 'block.db' -> 'db'
        last_word = label.split()[-1]
        expected = last_word.split('.')[-1]
        device_info(lsblk={"PARTLABEL": label}, blkid={"PARTLABEL": ''})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == expected