]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
CommitLineData
91327a77 1import pytest
1adf2230
AA
2from ceph_volume.util import device
3from ceph_volume.api import lvm as api
4
5
6class TestDevice(object):
7
8 def test_sys_api(self, device_info):
9 data = {"/dev/sda": {"foo": "bar"}}
10 device_info(devices=data)
11 disk = device.Device("/dev/sda")
12 assert disk.sys_api
13 assert "foo" in disk.sys_api
14
11fdf7f2
TL
15 def test_lvm_size(self, device_info):
16 # 5GB in size
17 data = {"/dev/sda": {"size": "5368709120"}}
18 device_info(devices=data)
19 disk = device.Device("/dev/sda")
20 assert disk.lvm_size.gb == 4
21
22 def test_lvm_size_rounds_down(self, device_info):
23 # 5.5GB in size
24 data = {"/dev/sda": {"size": "5905580032"}}
25 device_info(devices=data)
26 disk = device.Device("/dev/sda")
27 assert disk.lvm_size.gb == 4
28
1adf2230 29 def test_is_lv(self, device_info):
91327a77 30 data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
1adf2230
AA
31 device_info(lv=data)
32 disk = device.Device("vg/lv")
33 assert disk.is_lv
34
f64942e4
AA
35 def test_vgs_is_empty(self, device_info, pvolumes, monkeypatch):
36 BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
37 pvolumes.append(BarPVolume)
38 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
39 lsblk = {"TYPE": "disk"}
40 device_info(lsblk=lsblk)
41 disk = device.Device("/dev/nvme0n1")
42 assert disk.vgs == []
43
44 def test_vgs_is_not_empty(self, device_info, pvolumes, monkeypatch):
45 BarPVolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1', pv_uuid="0000", pv_tags={})
46 pvolumes.append(BarPVolume)
47 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
48 lsblk = {"TYPE": "disk"}
49 device_info(lsblk=lsblk)
50 disk = device.Device("/dev/nvme0n1")
51 assert len(disk.vgs) == 1
52
53 def test_device_is_device(self, device_info, pvolumes):
1adf2230
AA
54 data = {"/dev/sda": {"foo": "bar"}}
55 lsblk = {"TYPE": "device"}
56 device_info(devices=data, lsblk=lsblk)
57 disk = device.Device("/dev/sda")
f64942e4
AA
58 assert disk.is_device is True
59
81eedcae
TL
60 def test_device_is_rotational(self, device_info, pvolumes):
61 data = {"/dev/sda": {"rotational": "1"}}
62 lsblk = {"TYPE": "device"}
63 device_info(devices=data, lsblk=lsblk)
64 disk = device.Device("/dev/sda")
65 assert disk.rotational
66
67 def test_device_is_not_rotational(self, device_info, pvolumes):
68 data = {"/dev/sda": {"rotational": "0"}}
69 lsblk = {"TYPE": "device"}
70 device_info(devices=data, lsblk=lsblk)
71 disk = device.Device("/dev/sda")
72 assert not disk.rotational
73
74 def test_device_is_rotational_lsblk(self, device_info, pvolumes):
75 data = {"/dev/sda": {"foo": "bar"}}
76 lsblk = {"TYPE": "device", "ROTA": "1"}
77 device_info(devices=data, lsblk=lsblk)
78 disk = device.Device("/dev/sda")
79 assert disk.rotational
80
81 def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
82 data = {"/dev/sda": {"rotational": "0"}}
83 lsblk = {"TYPE": "device", "ROTA": "0"}
84 device_info(devices=data, lsblk=lsblk)
85 disk = device.Device("/dev/sda")
86 assert not disk.rotational
87
88 def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
89 # rotational will default true if no info from sys_api or lsblk is found
90 data = {"/dev/sda": {"foo": "bar"}}
91 lsblk = {"TYPE": "device", "foo": "bar"}
92 device_info(devices=data, lsblk=lsblk)
93 disk = device.Device("/dev/sda")
94 assert disk.rotational
95
f64942e4
AA
96 def test_disk_is_device(self, device_info, pvolumes):
97 data = {"/dev/sda": {"foo": "bar"}}
98 lsblk = {"TYPE": "disk"}
99 device_info(devices=data, lsblk=lsblk)
100 disk = device.Device("/dev/sda")
101 assert disk.is_device is True
1adf2230
AA
102
103 def test_is_partition(self, device_info, pvolumes):
104 data = {"/dev/sda": {"foo": "bar"}}
105 lsblk = {"TYPE": "part"}
106 device_info(devices=data, lsblk=lsblk)
107 disk = device.Device("/dev/sda")
108 assert disk.is_partition
109
110 def test_is_not_lvm_memeber(self, device_info, pvolumes):
111 data = {"/dev/sda": {"foo": "bar"}}
112 lsblk = {"TYPE": "part"}
113 device_info(devices=data, lsblk=lsblk)
114 disk = device.Device("/dev/sda")
115 assert not disk.is_lvm_member
116
117 def test_is_lvm_memeber(self, device_info, pvolumes):
118 data = {"/dev/sda": {"foo": "bar"}}
119 lsblk = {"TYPE": "part"}
120 device_info(devices=data, lsblk=lsblk)
121 disk = device.Device("/dev/sda")
122 assert not disk.is_lvm_member
123
124 def test_is_mapper_device(self, device_info):
125 device_info()
126 disk = device.Device("/dev/mapper/foo")
127 assert disk.is_mapper
128
f64942e4
AA
129 def test_dm_is_mapper_device(self, device_info):
130 device_info()
131 disk = device.Device("/dev/dm-4")
132 assert disk.is_mapper
133
1adf2230
AA
134 def test_is_not_mapper_device(self, device_info):
135 device_info()
136 disk = device.Device("/dev/sda")
137 assert not disk.is_mapper
138
494da23a
TL
139 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
140 "disable_kernel_queries",
141 "disable_lvm_queries")
142 def test_is_ceph_disk_lsblk(self, monkeypatch):
91327a77
AA
143 disk = device.Device("/dev/sda")
144 assert disk.is_ceph_disk_member
145
494da23a
TL
146 @pytest.mark.usefixtures("blkid_ceph_disk_member",
147 "disable_kernel_queries",
148 "disable_lvm_queries")
149 def test_is_ceph_disk_blkid(self, monkeypatch):
150 monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
151 lambda path: {'PARTLABEL': ""})
f64942e4
AA
152 disk = device.Device("/dev/sda")
153 assert disk.is_ceph_disk_member
f64942e4 154
494da23a
TL
155 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
156 "disable_kernel_queries",
157 "disable_lvm_queries")
158 def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch):
91327a77 159 disk = device.Device("/dev/sda")
494da23a
TL
160 assert disk.is_ceph_disk_member
161 assert not disk.available
162 assert "Used by ceph-disk" in disk.rejected_reasons
91327a77 163
494da23a
TL
164 @pytest.mark.usefixtures("blkid_ceph_disk_member",
165 "disable_kernel_queries",
166 "disable_lvm_queries")
167 def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch):
168 monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
169 lambda path: {'PARTLABEL': ""})
91327a77
AA
170 disk = device.Device("/dev/sda")
171 assert disk.is_ceph_disk_member
494da23a
TL
172 assert not disk.available
173 assert "Used by ceph-disk" in disk.rejected_reasons
91327a77 174
494da23a
TL
175 @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
176 "disable_lvm_queries",
177 "disable_kernel_queries")
178 def test_is_not_ceph_disk_member_lsblk(self):
91327a77
AA
179 disk = device.Device("/dev/sda")
180 assert disk.is_ceph_disk_member is False
181
1adf2230 182 def test_pv_api(self, device_info, pvolumes, monkeypatch):
91327a77 183 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
1adf2230
AA
184 pvolumes.append(FooPVolume)
185 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
186 data = {"/dev/sda": {"foo": "bar"}}
187 lsblk = {"TYPE": "part"}
188 device_info(devices=data, lsblk=lsblk)
189 disk = device.Device("/dev/sda")
190 assert disk.pvs_api
91327a77
AA
191
192 @pytest.mark.parametrize("ceph_type", ["data", "block"])
193 def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
194 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
195 pvolumes.append(FooPVolume)
196 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
197 data = {"/dev/sda": {"foo": "bar"}}
198 lsblk = {"TYPE": "part"}
199 lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
200 device_info(devices=data, lsblk=lsblk, lv=lv_data)
201 disk = device.Device("/dev/sda")
202 assert disk.used_by_ceph
203
204 def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
205 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
206 pvolumes.append(FooPVolume)
207 monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
208 data = {"/dev/sda": {"foo": "bar"}}
209 lsblk = {"TYPE": "part"}
210 lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
211 device_info(devices=data, lsblk=lsblk, lv=lv_data)
212 disk = device.Device("/dev/sda")
213 assert not disk.used_by_ceph
214
f64942e4
AA
215 def test_get_device_id(self, device_info):
216 udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
217 device_info(udevadm=udev)
218 disk = device.Device("/dev/sda")
219 assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
220
221
222
223class TestDeviceEncryption(object):
224
225 def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
226 lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
227 device_info(lsblk=lsblk)
228 disk = device.Device("/dev/sda")
229 assert disk.is_encrypted is False
230
231 def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
232 lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
233 device_info(lsblk=lsblk)
234 disk = device.Device("/dev/sda")
235 assert disk.is_encrypted is True
236
237 def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
238 lsblk = {'TYPE': 'part'}
239 blkid = {'TYPE': 'ceph data'}
240 device_info(lsblk=lsblk, blkid=blkid)
241 disk = device.Device("/dev/sda")
242 assert disk.is_encrypted is False
243
244 def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
245 lsblk = {'TYPE': 'part'}
246 blkid = {'TYPE': 'crypto_LUKS'}
247 device_info(lsblk=lsblk, blkid=blkid)
248 disk = device.Device("/dev/sda")
249 assert disk.is_encrypted is True
250
251 def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, monkeypatch):
252 status = {'type': 'LUKS1'}
253 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
254 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
255 blkid = {'TYPE': 'mapper'}
256 device_info(lsblk=lsblk, blkid=blkid)
257 disk = device.Device("/dev/mapper/uuid")
258 assert disk.is_encrypted is True
259
260 def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, monkeypatch):
261 status = {'type': 'LUKS2'}
262 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
263 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
264 blkid = {'TYPE': 'mapper'}
265 device_info(lsblk=lsblk, blkid=blkid)
266 disk = device.Device("/dev/mapper/uuid")
267 assert disk.is_encrypted is True
268
269 def test_mapper_is_encrypted_plain(self, device_info, pvolumes, monkeypatch):
270 status = {'type': 'PLAIN'}
271 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
272 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
273 blkid = {'TYPE': 'mapper'}
274 device_info(lsblk=lsblk, blkid=blkid)
275 disk = device.Device("/dev/mapper/uuid")
276 assert disk.is_encrypted is True
277
278 def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, monkeypatch):
279 monkeypatch.setattr(device, 'encryption_status', lambda x: {})
280 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
281 blkid = {'TYPE': 'mapper'}
282 device_info(lsblk=lsblk, blkid=blkid)
283 disk = device.Device("/dev/mapper/uuid")
284 assert disk.is_encrypted is False
285
286 def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
287 lsblk = {'TYPE': 'lvm'}
288 blkid = {'TYPE': 'crypto_LUKS'}
289 device_info(lsblk=lsblk, blkid=blkid)
290 disk = device.Device("/dev/sda")
291 disk.lv_api = {}
292 assert disk.is_encrypted is True
293
294 def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
295 lsblk = {'TYPE': 'lvm'}
296 blkid = {'TYPE': 'xfs'}
297 device_info(lsblk=lsblk, blkid=blkid)
298 disk = device.Device("/dev/sda")
299 disk.lv_api = factory(encrypted=None)
300 assert disk.is_encrypted is False
301
302 def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
303 lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
304 blkid = {'TYPE': 'mapper'}
305 device_info(lsblk=lsblk, blkid=blkid)
306 disk = device.Device("/dev/sda")
307 disk.lv_api = {}
308 assert disk.is_encrypted is True
309
310 def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
311 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
312 blkid = {'TYPE': 'mapper'}
313 device_info(lsblk=lsblk, blkid=blkid)
314 disk = device.Device("/dev/sda")
315 disk.lv_api = factory(encrypted=None)
316 assert disk.is_encrypted is False
317
318 def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
319 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
320 blkid = {'TYPE': 'mapper'}
321 device_info(lsblk=lsblk, blkid=blkid)
322 disk = device.Device("/dev/sda")
323 disk.lv_api = factory(encrypted=True)
324 assert disk.is_encrypted is True
325
326 def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
327 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
328 blkid = {'TYPE': 'mapper'}
329 device_info(lsblk=lsblk, blkid=blkid)
330 disk = device.Device("/dev/sda")
331 disk.lv_api = factory(encrypted=False)
332 assert disk.is_encrypted is False
333
91327a77
AA
334
335class TestDeviceOrdering(object):
336
337 def setup(self):
338 self.data = {
339 "/dev/sda": {"removable": 0},
340 "/dev/sdb": {"removable": 1}, # invalid
341 "/dev/sdc": {"removable": 0},
342 "/dev/sdd": {"removable": 1}, # invalid
343 }
344
345 def test_valid_before_invalid(self, device_info):
346 device_info(devices=self.data)
347 sda = device.Device("/dev/sda")
348 sdb = device.Device("/dev/sdb")
349
350 assert sda < sdb
351 assert sdb > sda
352
353 def test_valid_alphabetical_ordering(self, device_info):
354 device_info(devices=self.data)
355 sda = device.Device("/dev/sda")
356 sdc = device.Device("/dev/sdc")
357
358 assert sda < sdc
359 assert sdc > sda
360
361 def test_invalid_alphabetical_ordering(self, device_info):
362 device_info(devices=self.data)
363 sdb = device.Device("/dev/sdb")
364 sdd = device.Device("/dev/sdd")
365
366 assert sdb < sdd
367 assert sdd > sdb
368
369
91327a77
AA
370class TestCephDiskDevice(object):
371
372 def test_partlabel_lsblk(self, device_info):
373 lsblk = {"PARTLABEL": ""}
374 device_info(lsblk=lsblk)
375 disk = device.CephDiskDevice(device.Device("/dev/sda"))
376
377 assert disk.partlabel == ''
378
379 def test_partlabel_blkid(self, device_info):
380 lsblk = {"PARTLABEL": ""}
381 blkid = {"PARTLABEL": "ceph data"}
382 device_info(lsblk=lsblk, blkid=blkid)
383 disk = device.CephDiskDevice(device.Device("/dev/sda"))
384
385 assert disk.partlabel == 'ceph data'
386
494da23a
TL
387 @pytest.mark.usefixtures("blkid_ceph_disk_member",
388 "disable_kernel_queries",
389 "disable_lvm_queries")
390 def test_is_member_blkid(self, monkeypatch):
391 monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
392 lambda path: {'PARTLABEL': ""})
91327a77
AA
393 disk = device.CephDiskDevice(device.Device("/dev/sda"))
394
395 assert disk.is_member is True
396
397 def test_reject_removable_device(self, device_info):
398 data = {"/dev/sdb": {"removable": 1}}
399 device_info(devices=data)
400 disk = device.Device("/dev/sdb")
401 assert not disk.available
402
403 def test_accept_non_removable_device(self, device_info):
404 data = {"/dev/sdb": {"removable": 0}}
405 device_info(devices=data)
406 disk = device.Device("/dev/sdb")
407 assert disk.available
408
409 def test_reject_readonly_device(self, device_info):
410 data = {"/dev/cdrom": {"ro": 1}}
411 device_info(devices=data)
412 disk = device.Device("/dev/cdrom")
413 assert not disk.available
414
415 def test_accept_non_readonly_device(self, device_info):
416 data = {"/dev/sda": {"ro": 0}}
417 device_info(devices=data)
418 disk = device.Device("/dev/sda")
419 assert disk.available
420
494da23a
TL
421 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
422 "disable_kernel_queries",
423 "disable_lvm_queries")
424 def test_is_member_lsblk(self):
91327a77
AA
425 disk = device.CephDiskDevice(device.Device("/dev/sda"))
426
427 assert disk.is_member is True
428
429 def test_unknown_type(self, device_info):
430 lsblk = {"PARTLABEL": "gluster"}
431 device_info(lsblk=lsblk)
432 disk = device.CephDiskDevice(device.Device("/dev/sda"))
433
434 assert disk.type == 'unknown'
435
494da23a
TL
436 ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
437
438 @pytest.mark.usefixtures("blkid_ceph_disk_member",
439 "disable_kernel_queries",
440 "disable_lvm_queries")
441 def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
442 monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
443 lambda path: {'PARTLABEL': ''})
91327a77
AA
444 disk = device.CephDiskDevice(device.Device("/dev/sda"))
445
494da23a 446 assert disk.type in self.ceph_types
91327a77 447
494da23a
TL
448 @pytest.mark.usefixtures("blkid_ceph_disk_member",
449 "lsblk_ceph_disk_member",
450 "disable_kernel_queries",
451 "disable_lvm_queries")
452 def test_type_lsblk(self, device_info, ceph_partlabel):
91327a77
AA
453 disk = device.CephDiskDevice(device.Device("/dev/sda"))
454
494da23a 455 assert disk.type in self.ceph_types