]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
7477777bc8d57c74645fcf9870c8d852dc6d037a
2 from ceph_volume
.util
import device
3 from ceph_volume
.api
import lvm
as api
class TestDevice(object):
    """Tests for the properties exposed by ``device.Device``.

    Most tests use the ``device_info`` fixture as a factory: it is called
    with canned ``devices`` / ``lsblk`` / ``lv`` / ``udevadm`` payloads
    before a ``Device`` is built, and the assertions then check what the
    ``Device`` reports for those payloads.
    """

    @staticmethod
    def _patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty):
        """Replace ``api.PVolumes`` with a stub over the given fixtures.

        The stub returns ``pvolumes`` when called with ``populate=True``
        (the default) and ``pvolumes_empty`` otherwise.
        """
        monkeypatch.setattr(
            api, 'PVolumes',
            lambda populate=True: pvolumes if populate else pvolumes_empty)

    def test_sys_api(self, device_info):
        """Keys of the per-device ``devices`` payload show up in ``sys_api``."""
        data = {"/dev/sda": {"foo": "bar"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert "foo" in disk.sys_api

    def test_lvm_size(self, device_info):
        """A size of exactly 5 * 1024**3 bytes yields ``lvm_size.gb == 4``."""
        data = {"/dev/sda": {"size": "5368709120"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """A size of 5.5 * 1024**3 bytes also yields ``lvm_size.gb == 4``."""
        data = {"/dev/sda": {"size": "5905580032"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """A path backed by LV data is reported as a logical volume."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        # The LV payload was previously built but never handed to the
        # fixture, and nothing was asserted; feed it through ``lv=`` like
        # the used_by_ceph tests do and check the flag the test is named for.
        # NOTE(review): ``is_lv`` is inferred from the test name and the
        # sibling ``is_*`` properties -- confirm against ``Device``.
        device_info(lv=data)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A disk with no matching PV carrying a VG reports no VGs."""
        # The only PV is on /dev/sda and has no vg_name, while the device
        # under test is /dev/nvme0n1.
        BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(BarPVolume)
        self._patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        # Mirrors the ``len(disk.vgs) == 1`` check in test_vgs_is_not_empty.
        assert len(disk.vgs) == 0

    def test_vgs_is_not_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A disk whose PV belongs to a VG reports exactly that one VG."""
        BarPVolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1',
                                 pv_uuid="0000", pv_tags={})
        pvolumes.append(BarPVolume)
        self._patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info, pvolumes):
        """lsblk ``TYPE == "device"`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info, pvolumes):
        """``rotational == "1"`` in the devices payload is reported as rotational."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info, pvolumes):
        """``rotational == "0"`` in the devices payload is reported as non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info, pvolumes):
        """lsblk ``ROTA == "1"`` is used when the devices payload has no rotational key."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
        """Both sources saying "0" is reported as non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
        """With no rotational info in either payload the device counts as rotational."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info, pvolumes):
        """lsblk ``TYPE == "disk"`` also makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info, pvolumes):
        """lsblk ``TYPE == "part"`` makes ``is_partition`` truthy."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_partition

    def test_is_not_lvm_memeber(self, device_info, pvolumes):
        """A plain partition with no PV data is not an LVM member."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info, pvolumes):
        """Same setup and assertion as ``test_is_not_lvm_memeber``."""
        # NOTE(review): despite its name this test asserts the *negative*
        # and is identical to test_is_not_lvm_memeber, so the positive case
        # is not covered. Kept as-is because the payload needed to produce
        # an LVM member cannot be determined from this file.
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/*`` path is reported as a mapper device."""
        # Every other test that requests ``device_info`` calls it before
        # building a Device; call it with defaults here as well so the
        # fixture is actually used.
        device_info()
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-*`` path is reported as a mapper device."""
        device_info()
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A plain ``/dev/sda`` path is not a mapper device."""
        device_info()
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_ceph_disk_lsblk(self, monkeypatch):
        """With the lsblk ceph-disk fixture active the device is a ceph-disk member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_ceph_disk_blkid(self, monkeypatch):
        """Membership is still detected when lsblk returns an empty PARTLABEL."""
        # Blank out lsblk so only the blkid fixture can supply the label.
        monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                            lambda path: {'PARTLABEL': ""})
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch):
        """A ceph-disk member (via lsblk) is unavailable and says why."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch):
        """A ceph-disk member (via blkid) is unavailable and says why."""
        monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                            lambda path: {'PARTLABEL': ""})
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                             "disable_lvm_queries",
                             "disable_kernel_queries")
    def test_is_not_ceph_disk_member_lsblk(self):
        """With the non-member fixture active ``is_ceph_disk_member`` is False."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_pv_api(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """Build a Device on top of a matching PV."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
                                 pv_tags={}, vg_name="vg")
        pvolumes.append(FooPVolume)
        self._patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        # NOTE(review): no assertion follows, so this only checks that
        # construction does not raise. The attribute that should expose the
        # PV is not visible in this file -- add the assertion once confirmed.
        disk = device.Device("/dev/sda")

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type):
        """An LV tagged ``ceph.type`` data/block marks the device as used by ceph."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
                                 pv_tags={}, vg_name="vg")
        pvolumes.append(FooPVolume)
        self._patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """An LV tagged ``ceph.type == "journal"`` does not mark the device as used."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
                                 pv_tags={}, vg_name="vg")
        pvolumes.append(FooPVolume)
        self._patch_pvolumes(monkeypatch, pvolumes, pvolumes_empty)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins vendor, model and serial udev values with ``_``."""
        # Each udev key maps to itself so the expected id is easy to read.
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        device_info(udevadm=udev)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
class TestDeviceEncryption(object):
    """Checks of ``Device.is_encrypted`` for partitions, mapper devices and LVs.

    Each test feeds canned ``lsblk`` / ``blkid`` payloads through the
    ``device_info`` fixture; mapper tests additionally stub
    ``device.encryption_status`` and LV tests may replace ``lv_api``.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
        """A partition whose lsblk FSTYPE is xfs is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
        """A partition whose lsblk FSTYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
        """A partition whose blkid TYPE is 'ceph data' is not encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'ceph data'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
        """A partition whose blkid TYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'crypto_LUKS'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A mapper device with encryption status type LUKS1 is encrypted."""
        luks_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A mapper device with encryption status type LUKS2 is encrypted."""
        luks_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A mapper device with encryption status type PLAIN is encrypted."""
        plain_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: plain_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
        """A mapper device with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
        """An lvm-typed device whose blkid TYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'crypto_LUKS'})
        lv = device.Device("/dev/sda")
        assert lv.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
        """blkid TYPE xfs plus ``lv_api.encrypted`` of None is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'xfs'})
        lv = device.Device("/dev/sda")
        lv.lv_api = factory(encrypted=None)
        assert lv.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
        """An lvm-typed device whose lsblk FSTYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        lv = device.Device("/dev/sda")
        assert lv.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
        """lsblk FSTYPE xfs plus ``lv_api.encrypted`` of None is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        lv = device.Device("/dev/sda")
        lv.lv_api = factory(encrypted=None)
        assert lv.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """``lv_api.encrypted`` of True marks the LV as encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        lv = device.Device("/dev/sda")
        lv.lv_api = factory(encrypted=True)
        assert lv.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """``lv_api.encrypted`` of False marks the LV as not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'}, blkid={'TYPE': 'mapper'})
        lv = device.Device("/dev/sda")
        lv.lv_api = factory(encrypted=False)
        assert lv.is_encrypted is False
class TestDeviceOrdering(object):
    """Tests for the relative ordering of ``device.Device`` objects.

    The tests read a shared ``self.data`` mapping of four devices, two of
    which are marked removable (annotated ``# invalid`` below).
    """

    def setup_method(self):
        """Build a fresh ``devices`` payload before every test."""
        # Rebuilt per test so no test can see another's mutations.
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """A non-removable device sorts before a removable one."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")
        # NOTE(review): the comparison operators are inferred from the test
        # name; the two devices were previously built and never compared.
        # Confirm ``Device`` implements rich comparison.
        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two non-removable devices sort by path."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")
        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two removable devices sort by path."""
        device_info(devices=self.data)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")
        assert sdb < sdd
        assert sdd > sdb
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` and for ``Device.available``.

    ``CephDiskDevice`` is always built around a ``device.Device`` for
    ``/dev/sda``; label data comes from ``device_info`` payloads or from
    the ceph-disk member fixtures.
    """

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk PARTLABEL with no blkid data yields an empty partlabel."""
        device_info(lsblk={"PARTLABEL": ""})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """The blkid PARTLABEL is used when the lsblk one is empty."""
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": "ceph data"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.partlabel == 'ceph data'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_member_blkid(self, monkeypatch):
        """Membership is detected with the blkid fixture and an empty lsblk label."""
        monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                            lambda path: {'PARTLABEL': ""})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.is_member is True

    def test_reject_removable_device(self, device_info):
        """A device with ``removable == 1`` is not available."""
        device_info(devices={"/dev/sdb": {"removable": 1}})
        dev = device.Device("/dev/sdb")
        assert not dev.available

    def test_accept_non_removable_device(self, device_info):
        """A device with ``removable == 0`` is available."""
        device_info(devices={"/dev/sdb": {"removable": 0}})
        dev = device.Device("/dev/sdb")
        assert dev.available

    def test_reject_readonly_device(self, device_info):
        """A device with ``ro == 1`` is not available."""
        device_info(devices={"/dev/cdrom": {"ro": 1}})
        dev = device.Device("/dev/cdrom")
        assert not dev.available

    def test_accept_non_readonly_device(self, device_info):
        """A device with ``ro == 0`` is available."""
        device_info(devices={"/dev/sda": {"ro": 0}})
        dev = device.Device("/dev/sda")
        assert dev.available

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_is_member_lsblk(self):
        """Membership is detected with the lsblk fixture active."""
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.is_member is True

    def test_unknown_type(self, device_info):
        """A PARTLABEL of 'gluster' maps to the type 'unknown'."""
        device_info(lsblk={"PARTLABEL": "gluster"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.type == 'unknown'

    # Types the two test_type_* tests accept as a valid result.
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
        """With an empty lsblk label the reported type is one of ``ceph_types``."""
        monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                            lambda path: {'PARTLABEL': ''})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries",
                             "disable_lvm_queries")
    def test_type_lsblk(self, device_info, ceph_partlabel):
        """With both member fixtures active the type is one of ``ceph_types``."""
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))
        assert wrapped.type in self.ceph_types