]>
Commit | Line | Data |
---|---|---|
91327a77 | 1 | import pytest |
f6b5b4d7 | 2 | from copy import deepcopy |
1adf2230 AA |
3 | from ceph_volume.util import device |
4 | from ceph_volume.api import lvm as api | |
522d829b | 5 | from mock.mock import patch, mock_open |
1adf2230 AA |
6 | |
7 | ||
class TestDevice(object):
    """Tests for the properties of ``device.Device``.

    Each test feeds canned data (sys-api entries, ``lsblk``/``blkid``/udev
    dictionaries, LVM objects) to fixtures such as ``device_info`` and then
    checks one property of the resulting ``Device``. The fixtures are not
    defined in the visible part of this file.
    """

    def test_sys_api(self, monkeypatch, device_info):
        """``sys_api`` is truthy and holds the keys given for the device."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        # Hand out copies so the canned volume list is never mutated.
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))

        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        """A 5GB device reports an ``lvm_size`` of 4 GB."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))

        # 5GB in size
        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """A 5.5GB device also reports an ``lvm_size`` of 4 GB."""
        # 5.5GB in size
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """A ``vg/lv`` path with lsblk TYPE ``lvm`` is reported as an LV."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, monkeypatch):
        """``vgs`` is an empty list when ``api.get_pvs`` returns nothing."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})

        disk = device.Device("/dev/nvme0n1")
        assert disk.vgs == []

    def test_vgs_is_not_empty(self, device_info, monkeypatch):
        """``vgs`` holds the single VG returned by ``api.get_device_vgs``."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info):
        """lsblk TYPE ``device`` makes ``is_device`` exactly ``True``."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info):
        """sys-api ``rotational`` of ``"1"`` marks the device rotational."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info):
        """sys-api ``rotational`` of ``"0"`` marks the device non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info):
        """lsblk ``ROTA`` of ``"1"`` is used when sys-api has no rotational key."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info):
        """Both sources reporting ``"0"`` yields a non-rotational device."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info):
        """``rotational`` is truthy when neither source provides the info."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info):
        """lsblk TYPE ``disk`` makes ``is_device`` exactly ``True``."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info):
        """lsblk TYPE ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    def test_is_not_acceptable_device(self, device_info):
        """lsblk TYPE ``mpath`` is not treated as a device."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.is_device

    def test_is_not_lvm_memeber(self, device_info):
        """A plain partition is not an LVM member."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info):
        """A plain partition is not an LVM member.

        NOTE(review): despite its name this test uses the same data and the
        same negative assertion as ``test_is_not_lvm_memeber``; the positive
        case is not covered here. Behaviour kept unchanged.
        """
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-N`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """``/dev/sda`` with TYPE ``disk`` is not a mapper device."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
        """``is_ceph_disk_member`` is truthy under the lsblk member fixture."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
        """``is_ceph_disk_member`` is truthy under the blkid member fixture."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (lsblk fixture) is rejected as unavailable."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (blkid fixture) is rejected as unavailable."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_reject_removable_device(self, device_info):
        """A device flagged ``removable`` is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_reject_device_with_gpt_headers(self, device_info):
        """A device whose blkid ``PTTYPE`` is ``gpt`` is not available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        blkid = {"PTTYPE": "gpt"}
        device_info(
            devices=data,
            blkid=blkid,
            lsblk=lsblk,
        )
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        """A non-removable 5GB disk is available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_not_acceptable_device(self, device_info):
        """A device with lsblk TYPE ``mpath`` is not available."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    def test_reject_readonly_device(self, device_info):
        """A device flagged ``ro`` is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_reject_smaller_than_5gb(self, device_info):
        """A device one byte short of 5GB is not available."""
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    def test_accept_non_readonly_device(self, device_info):
        """A writable 5GB disk is available."""
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label, device_info):
        """A device reporting a BlueStore label is rejected with that reason."""
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    def test_reject_device_with_oserror(self, monkeypatch, patch_bluestore_label, device_info):
        """An ``OSError`` from the label check rejects the device."""
        patch_bluestore_label.side_effect = OSError('test failure')
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Failed to determine if device is BlueStore" in disk.rejected_reasons

    @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
        """``is_ceph_disk_member`` is exactly ``False`` for a non-member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_existing_vg_available(self, monkeypatch, device_info):
        """A VG with 1536 free 4MiB extents is LVM-available but not raw."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
                             vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_existing_vg_too_small(self, monkeypatch, device_info):
        """A VG with only 4 free 1GiB extents is not available in any mode."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_multiple_existing_vgs(self, monkeypatch, device_info):
        """Two VGs with 1000 and 536 free extents are LVM-available together."""
        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
                              vg_extent_size=4194304)
        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
                              vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info,
                          monkeypatch, ceph_type):
        """A PV whose LV is tagged ``ceph.type`` data/block is used by ceph."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type="+ceph_type}
        lv = api.Volume(**lv_data)
        volumes = [lv]
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, monkeypatch):
        """A PV whose LV data carries a ``journal`` type is not used by ceph."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins vendor, model and serial with underscores."""
        # Each udev key maps to its own name so the expected id is readable.
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'

    def test_has_bluestore_label(self):
        """``has_bluestore_label`` depends on the bytes read from the device."""
        # patch device.Device __init__ function to do nothing since we want to only test the
        # low-level behavior of has_bluestore_label
        with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
            disk = device.Device("/dev/sda")
            disk.abspath = "/dev/sda"
            with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
                assert disk.has_bluestore_label
            with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
                assert not disk.has_bluestore_label
f64942e4 AA |
370 | |
371 | ||
class TestDeviceEncryption(object):
    """Tests for ``device.Device.is_encrypted``.

    Each test supplies canned ``lsblk``/``blkid`` dictionaries through the
    ``device_info`` fixture (not defined in the visible part of this file).
    Some also set an encryption status or an ``lv_api`` object, then check
    that ``is_encrypted`` is exactly ``True`` or ``False``.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info):
        """A partition with lsblk FSTYPE ``xfs`` is not encrypted."""
        lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'PKNAME': 'sda'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info):
        """A partition with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'PKNAME': 'sda'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info):
        """A partition with blkid TYPE ``ceph data`` is not encrypted."""
        lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
        blkid = {'TYPE': 'ceph data'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info):
        """A partition with blkid TYPE ``crypto_LUKS`` is encrypted."""
        lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
        """A mapper device with encryption status ``LUKS1`` is encrypted."""
        status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
        """A mapper device with encryption status ``LUKS2`` is encrypted."""
        status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with encryption status ``PLAIN`` is encrypted."""
        status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info):
        """An LV with blkid TYPE ``crypto_LUKS`` is encrypted."""
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = {}
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info):
        """An LV with blkid TYPE ``xfs`` and no LVM flag is not encrypted."""
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'xfs'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info):
        """An LV with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = {}
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
        """An LV with lsblk FSTYPE ``xfs`` and no LVM flag is not encrypted."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is ``True`` is encrypted."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=True)
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is ``False`` is not encrypted."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=False)
        assert disk.is_encrypted is False
482 | ||
91327a77 AA |
483 | |
class TestDeviceOrdering(object):
    """Tests for the ``<`` / ``>`` ordering between ``device.Device`` objects.

    The shared data marks ``/dev/sdb`` and ``/dev/sdd`` as removable (the
    "invalid" devices) and ``/dev/sda`` and ``/dev/sdc`` as non-removable.
    """

    def setup_method(self):
        """Build the per-test device table before every test method.

        pytest deprecated the nose-style ``setup`` hook in 7.2 and no longer
        calls it from 8.0 on; ``setup_method`` is the supported xunit-style
        hook and works on older pytest releases as well.
        """
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    # Backward-compatible alias for code that still calls ``setup()`` directly.
    setup = setup_method

    def test_valid_before_invalid(self, device_info):
        """A non-removable device sorts before a removable one."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")

        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two non-removable devices sort by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")

        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two removable devices sort by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")

        assert sdb < sdd
        assert sdd > sdb
520 | ||
521 | ||
91327a77 AA |
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` wrapped around a ``device.Device``.

    They cover the ``partlabel``, ``is_member`` and ``type`` properties,
    using canned ``lsblk``/``blkid`` data supplied by fixtures that are not
    defined in the visible part of this file.
    """

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk ``PARTLABEL`` yields an empty ``partlabel``."""
        lsblk = {"TYPE": "disk", "PARTLABEL": ""}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """``partlabel`` is taken from blkid when only blkid data is given."""
        blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
        device_info(blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == 'ceph data'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
        """``is_member`` is exactly ``True`` under the blkid member fixture."""
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_lsblk(self, patch_bluestore_label, device_info):
        """``is_member`` is exactly ``True`` for lsblk ``PARTLABEL`` ``ceph``."""
        lsblk = {"TYPE": "disk", "PARTLABEL": "ceph"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    def test_unknown_type(self, device_info):
        """``PARTLABEL`` ``gluster`` maps to type ``'unknown'``."""
        lsblk = {"TYPE": "disk", "PARTLABEL": "gluster"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == 'unknown'

    # Values accepted for ``CephDiskDevice.type`` by the two tests below.
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
        """``type`` is one of ``ceph_types`` under the blkid member fixture."""
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_lsblk(self, device_info, ceph_partlabel):
        """``type`` is one of ``ceph_types`` with both member fixtures active."""
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type in self.ceph_types