]>
Commit | Line | Data |
---|---|---|
91327a77 | 1 | import pytest |
1adf2230 AA |
2 | from ceph_volume.util import device |
3 | from ceph_volume.api import lvm as api | |
4 | ||
5 | ||
6 | class TestDevice(object): | |
7 | ||
8 | def test_sys_api(self, device_info): | |
9 | data = {"/dev/sda": {"foo": "bar"}} | |
10 | device_info(devices=data) | |
11 | disk = device.Device("/dev/sda") | |
12 | assert disk.sys_api | |
13 | assert "foo" in disk.sys_api | |
14 | ||
11fdf7f2 TL |
15 | def test_lvm_size(self, device_info): |
16 | # 5GB in size | |
17 | data = {"/dev/sda": {"size": "5368709120"}} | |
18 | device_info(devices=data) | |
19 | disk = device.Device("/dev/sda") | |
20 | assert disk.lvm_size.gb == 4 | |
21 | ||
22 | def test_lvm_size_rounds_down(self, device_info): | |
23 | # 5.5GB in size | |
24 | data = {"/dev/sda": {"size": "5905580032"}} | |
25 | device_info(devices=data) | |
26 | disk = device.Device("/dev/sda") | |
27 | assert disk.lvm_size.gb == 4 | |
28 | ||
1adf2230 | 29 | def test_is_lv(self, device_info): |
91327a77 | 30 | data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"} |
1adf2230 AA |
31 | device_info(lv=data) |
32 | disk = device.Device("vg/lv") | |
33 | assert disk.is_lv | |
34 | ||
eafe8130 | 35 | def test_vgs_is_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
f64942e4 AA |
36 | BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={}) |
37 | pvolumes.append(BarPVolume) | |
eafe8130 | 38 | monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty) |
f64942e4 AA |
39 | lsblk = {"TYPE": "disk"} |
40 | device_info(lsblk=lsblk) | |
41 | disk = device.Device("/dev/nvme0n1") | |
42 | assert disk.vgs == [] | |
43 | ||
92f5a8d4 TL |
44 | def test_vgs_is_not_empty(self, device_info, monkeypatch): |
45 | vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6, | |
46 | vg_extent_size=1073741824) | |
47 | monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg]) | |
f64942e4 AA |
48 | lsblk = {"TYPE": "disk"} |
49 | device_info(lsblk=lsblk) | |
50 | disk = device.Device("/dev/nvme0n1") | |
51 | assert len(disk.vgs) == 1 | |
52 | ||
53 | def test_device_is_device(self, device_info, pvolumes): | |
1adf2230 AA |
54 | data = {"/dev/sda": {"foo": "bar"}} |
55 | lsblk = {"TYPE": "device"} | |
56 | device_info(devices=data, lsblk=lsblk) | |
57 | disk = device.Device("/dev/sda") | |
f64942e4 AA |
58 | assert disk.is_device is True |
59 | ||
81eedcae TL |
60 | def test_device_is_rotational(self, device_info, pvolumes): |
61 | data = {"/dev/sda": {"rotational": "1"}} | |
62 | lsblk = {"TYPE": "device"} | |
63 | device_info(devices=data, lsblk=lsblk) | |
64 | disk = device.Device("/dev/sda") | |
65 | assert disk.rotational | |
66 | ||
67 | def test_device_is_not_rotational(self, device_info, pvolumes): | |
68 | data = {"/dev/sda": {"rotational": "0"}} | |
69 | lsblk = {"TYPE": "device"} | |
70 | device_info(devices=data, lsblk=lsblk) | |
71 | disk = device.Device("/dev/sda") | |
72 | assert not disk.rotational | |
73 | ||
74 | def test_device_is_rotational_lsblk(self, device_info, pvolumes): | |
75 | data = {"/dev/sda": {"foo": "bar"}} | |
76 | lsblk = {"TYPE": "device", "ROTA": "1"} | |
77 | device_info(devices=data, lsblk=lsblk) | |
78 | disk = device.Device("/dev/sda") | |
79 | assert disk.rotational | |
80 | ||
81 | def test_device_is_not_rotational_lsblk(self, device_info, pvolumes): | |
82 | data = {"/dev/sda": {"rotational": "0"}} | |
83 | lsblk = {"TYPE": "device", "ROTA": "0"} | |
84 | device_info(devices=data, lsblk=lsblk) | |
85 | disk = device.Device("/dev/sda") | |
86 | assert not disk.rotational | |
87 | ||
88 | def test_device_is_rotational_defaults_true(self, device_info, pvolumes): | |
89 | # rotational will default true if no info from sys_api or lsblk is found | |
90 | data = {"/dev/sda": {"foo": "bar"}} | |
91 | lsblk = {"TYPE": "device", "foo": "bar"} | |
92 | device_info(devices=data, lsblk=lsblk) | |
93 | disk = device.Device("/dev/sda") | |
94 | assert disk.rotational | |
95 | ||
f64942e4 AA |
96 | def test_disk_is_device(self, device_info, pvolumes): |
97 | data = {"/dev/sda": {"foo": "bar"}} | |
98 | lsblk = {"TYPE": "disk"} | |
99 | device_info(devices=data, lsblk=lsblk) | |
100 | disk = device.Device("/dev/sda") | |
101 | assert disk.is_device is True | |
1adf2230 AA |
102 | |
103 | def test_is_partition(self, device_info, pvolumes): | |
104 | data = {"/dev/sda": {"foo": "bar"}} | |
105 | lsblk = {"TYPE": "part"} | |
106 | device_info(devices=data, lsblk=lsblk) | |
107 | disk = device.Device("/dev/sda") | |
108 | assert disk.is_partition | |
109 | ||
110 | def test_is_not_lvm_memeber(self, device_info, pvolumes): | |
111 | data = {"/dev/sda": {"foo": "bar"}} | |
112 | lsblk = {"TYPE": "part"} | |
113 | device_info(devices=data, lsblk=lsblk) | |
114 | disk = device.Device("/dev/sda") | |
115 | assert not disk.is_lvm_member | |
116 | ||
117 | def test_is_lvm_memeber(self, device_info, pvolumes): | |
118 | data = {"/dev/sda": {"foo": "bar"}} | |
119 | lsblk = {"TYPE": "part"} | |
120 | device_info(devices=data, lsblk=lsblk) | |
121 | disk = device.Device("/dev/sda") | |
122 | assert not disk.is_lvm_member | |
123 | ||
124 | def test_is_mapper_device(self, device_info): | |
125 | device_info() | |
126 | disk = device.Device("/dev/mapper/foo") | |
127 | assert disk.is_mapper | |
128 | ||
f64942e4 AA |
129 | def test_dm_is_mapper_device(self, device_info): |
130 | device_info() | |
131 | disk = device.Device("/dev/dm-4") | |
132 | assert disk.is_mapper | |
133 | ||
1adf2230 AA |
134 | def test_is_not_mapper_device(self, device_info): |
135 | device_info() | |
136 | disk = device.Device("/dev/sda") | |
137 | assert not disk.is_mapper | |
138 | ||
494da23a TL |
139 | @pytest.mark.usefixtures("lsblk_ceph_disk_member", |
140 | "disable_kernel_queries", | |
141 | "disable_lvm_queries") | |
92f5a8d4 | 142 | def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label): |
91327a77 AA |
143 | disk = device.Device("/dev/sda") |
144 | assert disk.is_ceph_disk_member | |
145 | ||
494da23a TL |
146 | @pytest.mark.usefixtures("blkid_ceph_disk_member", |
147 | "disable_kernel_queries", | |
148 | "disable_lvm_queries") | |
92f5a8d4 | 149 | def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label): |
494da23a TL |
150 | monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", |
151 | lambda path: {'PARTLABEL': ""}) | |
f64942e4 AA |
152 | disk = device.Device("/dev/sda") |
153 | assert disk.is_ceph_disk_member | |
f64942e4 | 154 | |
494da23a TL |
155 | @pytest.mark.usefixtures("lsblk_ceph_disk_member", |
156 | "disable_kernel_queries", | |
157 | "disable_lvm_queries") | |
92f5a8d4 | 158 | def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label): |
91327a77 | 159 | disk = device.Device("/dev/sda") |
494da23a TL |
160 | assert disk.is_ceph_disk_member |
161 | assert not disk.available | |
162 | assert "Used by ceph-disk" in disk.rejected_reasons | |
91327a77 | 163 | |
494da23a TL |
164 | @pytest.mark.usefixtures("blkid_ceph_disk_member", |
165 | "disable_kernel_queries", | |
166 | "disable_lvm_queries") | |
92f5a8d4 | 167 | def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label): |
494da23a TL |
168 | monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", |
169 | lambda path: {'PARTLABEL': ""}) | |
91327a77 AA |
170 | disk = device.Device("/dev/sda") |
171 | assert disk.is_ceph_disk_member | |
494da23a TL |
172 | assert not disk.available |
173 | assert "Used by ceph-disk" in disk.rejected_reasons | |
91327a77 | 174 | |
92f5a8d4 TL |
175 | def test_reject_removable_device(self, device_info): |
176 | data = {"/dev/sdb": {"removable": 1}} | |
177 | device_info(devices=data) | |
178 | disk = device.Device("/dev/sdb") | |
179 | assert not disk.available | |
180 | ||
181 | def test_accept_non_removable_device(self, device_info): | |
182 | data = {"/dev/sdb": {"removable": 0, "size": 5368709120}} | |
183 | device_info(devices=data) | |
184 | disk = device.Device("/dev/sdb") | |
185 | assert disk.available | |
186 | ||
187 | def test_reject_readonly_device(self, device_info): | |
188 | data = {"/dev/cdrom": {"ro": 1}} | |
189 | device_info(devices=data) | |
190 | disk = device.Device("/dev/cdrom") | |
191 | assert not disk.available | |
192 | ||
193 | def test_reject_smaller_than_5gb(self, device_info): | |
194 | data = {"/dev/sda": {"size": 5368709119}} | |
195 | device_info(devices=data) | |
196 | disk = device.Device("/dev/sda") | |
197 | assert not disk.available, 'too small device is available' | |
198 | ||
199 | def test_accept_non_readonly_device(self, device_info): | |
200 | data = {"/dev/sda": {"ro": 0, "size": 5368709120}} | |
201 | device_info(devices=data) | |
202 | disk = device.Device("/dev/sda") | |
203 | assert disk.available | |
204 | ||
205 | def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label): | |
206 | patch_bluestore_label.return_value = True | |
207 | disk = device.Device("/dev/sda") | |
208 | assert not disk.available | |
209 | assert "Has BlueStore device label" in disk.rejected_reasons | |
210 | ||
494da23a TL |
211 | @pytest.mark.usefixtures("device_info_not_ceph_disk_member", |
212 | "disable_lvm_queries", | |
213 | "disable_kernel_queries") | |
92f5a8d4 | 214 | def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label): |
91327a77 AA |
215 | disk = device.Device("/dev/sda") |
216 | assert disk.is_ceph_disk_member is False | |
217 | ||
92f5a8d4 TL |
218 | def test_existing_vg_available(self, monkeypatch, device_info): |
219 | vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6, | |
220 | vg_extent_size=1073741824) | |
221 | monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg]) | |
222 | lsblk = {"TYPE": "disk"} | |
223 | data = {"/dev/nvme0n1": {"size": "6442450944"}} | |
1adf2230 | 224 | device_info(devices=data, lsblk=lsblk) |
92f5a8d4 TL |
225 | disk = device.Device("/dev/nvme0n1") |
226 | assert disk.available_lvm | |
227 | assert not disk.available | |
228 | assert not disk.available_raw | |
229 | ||
230 | def test_existing_vg_too_small(self, monkeypatch, device_info): | |
231 | vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4, | |
232 | vg_extent_size=1073741824) | |
233 | monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg]) | |
234 | lsblk = {"TYPE": "disk"} | |
235 | data = {"/dev/nvme0n1": {"size": "6442450944"}} | |
236 | device_info(devices=data, lsblk=lsblk) | |
237 | disk = device.Device("/dev/nvme0n1") | |
238 | assert not disk.available_lvm | |
239 | assert not disk.available | |
240 | assert not disk.available_raw | |
241 | ||
242 | def test_multiple_existing_vgs(self, monkeypatch, device_info): | |
243 | vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4, | |
244 | vg_extent_size=1073741824) | |
245 | vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6, | |
246 | vg_extent_size=1073741824) | |
247 | monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2]) | |
248 | lsblk = {"TYPE": "disk"} | |
249 | data = {"/dev/nvme0n1": {"size": "6442450944"}} | |
250 | device_info(devices=data, lsblk=lsblk) | |
251 | disk = device.Device("/dev/nvme0n1") | |
252 | assert disk.available_lvm | |
253 | assert not disk.available | |
254 | assert not disk.available_raw | |
91327a77 AA |
255 | |
256 | @pytest.mark.parametrize("ceph_type", ["data", "block"]) | |
eafe8130 | 257 | def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type): |
91327a77 AA |
258 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") |
259 | pvolumes.append(FooPVolume) | |
eafe8130 | 260 | monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty) |
91327a77 AA |
261 | data = {"/dev/sda": {"foo": "bar"}} |
262 | lsblk = {"TYPE": "part"} | |
263 | lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}} | |
264 | device_info(devices=data, lsblk=lsblk, lv=lv_data) | |
92f5a8d4 TL |
265 | vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6, |
266 | vg_extent_size=1073741824) | |
267 | monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg]) | |
91327a77 AA |
268 | disk = device.Device("/dev/sda") |
269 | assert disk.used_by_ceph | |
270 | ||
eafe8130 | 271 | def test_not_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
91327a77 AA |
272 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") |
273 | pvolumes.append(FooPVolume) | |
eafe8130 | 274 | monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty) |
91327a77 AA |
275 | data = {"/dev/sda": {"foo": "bar"}} |
276 | lsblk = {"TYPE": "part"} | |
277 | lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}} | |
278 | device_info(devices=data, lsblk=lsblk, lv=lv_data) | |
279 | disk = device.Device("/dev/sda") | |
280 | assert not disk.used_by_ceph | |
281 | ||
f64942e4 AA |
282 | def test_get_device_id(self, device_info): |
283 | udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']} | |
284 | device_info(udevadm=udev) | |
285 | disk = device.Device("/dev/sda") | |
286 | assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL' | |
287 | ||
288 | ||
289 | ||
290 | class TestDeviceEncryption(object): | |
291 | ||
292 | def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes): | |
293 | lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'} | |
294 | device_info(lsblk=lsblk) | |
295 | disk = device.Device("/dev/sda") | |
296 | assert disk.is_encrypted is False | |
297 | ||
298 | def test_partition_is_encrypted_lsblk(self, device_info, pvolumes): | |
299 | lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'} | |
300 | device_info(lsblk=lsblk) | |
301 | disk = device.Device("/dev/sda") | |
302 | assert disk.is_encrypted is True | |
303 | ||
304 | def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes): | |
305 | lsblk = {'TYPE': 'part'} | |
306 | blkid = {'TYPE': 'ceph data'} | |
307 | device_info(lsblk=lsblk, blkid=blkid) | |
308 | disk = device.Device("/dev/sda") | |
309 | assert disk.is_encrypted is False | |
310 | ||
311 | def test_partition_is_encrypted_blkid(self, device_info, pvolumes): | |
312 | lsblk = {'TYPE': 'part'} | |
313 | blkid = {'TYPE': 'crypto_LUKS'} | |
314 | device_info(lsblk=lsblk, blkid=blkid) | |
315 | disk = device.Device("/dev/sda") | |
316 | assert disk.is_encrypted is True | |
317 | ||
eafe8130 | 318 | def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
f64942e4 AA |
319 | status = {'type': 'LUKS1'} |
320 | monkeypatch.setattr(device, 'encryption_status', lambda x: status) | |
321 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
322 | blkid = {'TYPE': 'mapper'} | |
323 | device_info(lsblk=lsblk, blkid=blkid) | |
324 | disk = device.Device("/dev/mapper/uuid") | |
325 | assert disk.is_encrypted is True | |
326 | ||
eafe8130 | 327 | def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
f64942e4 AA |
328 | status = {'type': 'LUKS2'} |
329 | monkeypatch.setattr(device, 'encryption_status', lambda x: status) | |
330 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
331 | blkid = {'TYPE': 'mapper'} | |
332 | device_info(lsblk=lsblk, blkid=blkid) | |
333 | disk = device.Device("/dev/mapper/uuid") | |
334 | assert disk.is_encrypted is True | |
335 | ||
eafe8130 | 336 | def test_mapper_is_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
f64942e4 AA |
337 | status = {'type': 'PLAIN'} |
338 | monkeypatch.setattr(device, 'encryption_status', lambda x: status) | |
339 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
340 | blkid = {'TYPE': 'mapper'} | |
341 | device_info(lsblk=lsblk, blkid=blkid) | |
342 | disk = device.Device("/dev/mapper/uuid") | |
343 | assert disk.is_encrypted is True | |
344 | ||
eafe8130 | 345 | def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch): |
f64942e4 AA |
346 | monkeypatch.setattr(device, 'encryption_status', lambda x: {}) |
347 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
348 | blkid = {'TYPE': 'mapper'} | |
349 | device_info(lsblk=lsblk, blkid=blkid) | |
350 | disk = device.Device("/dev/mapper/uuid") | |
351 | assert disk.is_encrypted is False | |
352 | ||
353 | def test_lv_is_encrypted_blkid(self, device_info, pvolumes): | |
354 | lsblk = {'TYPE': 'lvm'} | |
355 | blkid = {'TYPE': 'crypto_LUKS'} | |
356 | device_info(lsblk=lsblk, blkid=blkid) | |
357 | disk = device.Device("/dev/sda") | |
358 | disk.lv_api = {} | |
359 | assert disk.is_encrypted is True | |
360 | ||
361 | def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes): | |
362 | lsblk = {'TYPE': 'lvm'} | |
363 | blkid = {'TYPE': 'xfs'} | |
364 | device_info(lsblk=lsblk, blkid=blkid) | |
365 | disk = device.Device("/dev/sda") | |
366 | disk.lv_api = factory(encrypted=None) | |
367 | assert disk.is_encrypted is False | |
368 | ||
369 | def test_lv_is_encrypted_lsblk(self, device_info, pvolumes): | |
370 | lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'} | |
371 | blkid = {'TYPE': 'mapper'} | |
372 | device_info(lsblk=lsblk, blkid=blkid) | |
373 | disk = device.Device("/dev/sda") | |
374 | disk.lv_api = {} | |
375 | assert disk.is_encrypted is True | |
376 | ||
377 | def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes): | |
378 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
379 | blkid = {'TYPE': 'mapper'} | |
380 | device_info(lsblk=lsblk, blkid=blkid) | |
381 | disk = device.Device("/dev/sda") | |
382 | disk.lv_api = factory(encrypted=None) | |
383 | assert disk.is_encrypted is False | |
384 | ||
385 | def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes): | |
386 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
387 | blkid = {'TYPE': 'mapper'} | |
388 | device_info(lsblk=lsblk, blkid=blkid) | |
389 | disk = device.Device("/dev/sda") | |
390 | disk.lv_api = factory(encrypted=True) | |
391 | assert disk.is_encrypted is True | |
392 | ||
393 | def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes): | |
394 | lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'} | |
395 | blkid = {'TYPE': 'mapper'} | |
396 | device_info(lsblk=lsblk, blkid=blkid) | |
397 | disk = device.Device("/dev/sda") | |
398 | disk.lv_api = factory(encrypted=False) | |
399 | assert disk.is_encrypted is False | |
400 | ||
91327a77 AA |
401 | |
402 | class TestDeviceOrdering(object): | |
403 | ||
404 | def setup(self): | |
405 | self.data = { | |
406 | "/dev/sda": {"removable": 0}, | |
407 | "/dev/sdb": {"removable": 1}, # invalid | |
408 | "/dev/sdc": {"removable": 0}, | |
409 | "/dev/sdd": {"removable": 1}, # invalid | |
410 | } | |
411 | ||
412 | def test_valid_before_invalid(self, device_info): | |
413 | device_info(devices=self.data) | |
414 | sda = device.Device("/dev/sda") | |
415 | sdb = device.Device("/dev/sdb") | |
416 | ||
417 | assert sda < sdb | |
418 | assert sdb > sda | |
419 | ||
420 | def test_valid_alphabetical_ordering(self, device_info): | |
421 | device_info(devices=self.data) | |
422 | sda = device.Device("/dev/sda") | |
423 | sdc = device.Device("/dev/sdc") | |
424 | ||
425 | assert sda < sdc | |
426 | assert sdc > sda | |
427 | ||
428 | def test_invalid_alphabetical_ordering(self, device_info): | |
429 | device_info(devices=self.data) | |
430 | sdb = device.Device("/dev/sdb") | |
431 | sdd = device.Device("/dev/sdd") | |
432 | ||
433 | assert sdb < sdd | |
434 | assert sdd > sdb | |
435 | ||
436 | ||
91327a77 AA |
437 | class TestCephDiskDevice(object): |
438 | ||
439 | def test_partlabel_lsblk(self, device_info): | |
440 | lsblk = {"PARTLABEL": ""} | |
441 | device_info(lsblk=lsblk) | |
442 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
443 | ||
444 | assert disk.partlabel == '' | |
445 | ||
446 | def test_partlabel_blkid(self, device_info): | |
447 | lsblk = {"PARTLABEL": ""} | |
448 | blkid = {"PARTLABEL": "ceph data"} | |
449 | device_info(lsblk=lsblk, blkid=blkid) | |
450 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
451 | ||
452 | assert disk.partlabel == 'ceph data' | |
453 | ||
494da23a TL |
454 | @pytest.mark.usefixtures("blkid_ceph_disk_member", |
455 | "disable_kernel_queries", | |
456 | "disable_lvm_queries") | |
92f5a8d4 | 457 | def test_is_member_blkid(self, monkeypatch, patch_bluestore_label): |
494da23a TL |
458 | monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", |
459 | lambda path: {'PARTLABEL': ""}) | |
91327a77 AA |
460 | disk = device.CephDiskDevice(device.Device("/dev/sda")) |
461 | ||
462 | assert disk.is_member is True | |
463 | ||
494da23a TL |
464 | @pytest.mark.usefixtures("lsblk_ceph_disk_member", |
465 | "disable_kernel_queries", | |
466 | "disable_lvm_queries") | |
92f5a8d4 | 467 | def test_is_member_lsblk(self, patch_bluestore_label): |
91327a77 AA |
468 | disk = device.CephDiskDevice(device.Device("/dev/sda")) |
469 | ||
470 | assert disk.is_member is True | |
471 | ||
472 | def test_unknown_type(self, device_info): | |
473 | lsblk = {"PARTLABEL": "gluster"} | |
474 | device_info(lsblk=lsblk) | |
475 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
476 | ||
477 | assert disk.type == 'unknown' | |
478 | ||
494da23a TL |
479 | ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block'] |
480 | ||
481 | @pytest.mark.usefixtures("blkid_ceph_disk_member", | |
482 | "disable_kernel_queries", | |
483 | "disable_lvm_queries") | |
484 | def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel): | |
485 | monkeypatch.setattr("ceph_volume.util.device.disk.lsblk", | |
486 | lambda path: {'PARTLABEL': ''}) | |
91327a77 AA |
487 | disk = device.CephDiskDevice(device.Device("/dev/sda")) |
488 | ||
494da23a | 489 | assert disk.type in self.ceph_types |
91327a77 | 490 | |
494da23a TL |
491 | @pytest.mark.usefixtures("blkid_ceph_disk_member", |
492 | "lsblk_ceph_disk_member", | |
493 | "disable_kernel_queries", | |
494 | "disable_lvm_queries") | |
495 | def test_type_lsblk(self, device_info, ceph_partlabel): | |
91327a77 AA |
496 | disk = device.CephDiskDevice(device.Device("/dev/sda")) |
497 | ||
494da23a | 498 | assert disk.type in self.ceph_types |