# Recovered from: git.proxmox.com Git - ceph.git/blob -
# ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
import pytest

from ceph_volume.util import device
from ceph_volume.api import lvm as api
class TestDevice(object):
    """Tests for the attributes exposed by ``device.Device``.

    Each test primes the ``device_info`` fixture with canned ``devices``,
    ``lsblk``, ``blkid``, ``lv`` or ``udevadm`` data and then builds the
    ``Device`` under test.  LVM-related tests additionally fill the
    ``pvolumes`` fixture and patch ``api.PVolumes`` to return it.
    """

    def test_sys_api(self, device_info):
        """Keys of the ``devices`` entry are visible through ``sys_api``."""
        data = {"/dev/sda": {"foo": "bar"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert "foo" in disk.sys_api

    def test_lvm_size(self, device_info):
        """A 5 GiB (5368709120 byte) device reports ``lvm_size.gb == 4``."""
        data = {"/dev/sda": {"size": "5368709120"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """A 5.5 GiB (5905580032 byte) device still reports 4 GB."""
        data = {"/dev/sda": {"size": "5905580032"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """Build a ``Device`` from a ``vg/lv`` style path."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        # NOTE(review): the text this file was recovered from skips the
        # statements that hand ``data`` to ``device_info`` and that assert
        # on ``disk``; restore them from upstream - as written this test
        # only checks that construction does not raise.
        disk = device.Device("vg/lv")

    def test_vgs_is_empty(self, device_info, pvolumes, monkeypatch):
        """A device that matches no known PV has no volume groups."""
        BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(BarPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        # NOTE(review): assertion reconstructed to mirror
        # test_vgs_is_not_empty; confirm against upstream.
        assert len(disk.vgs) == 0

    def test_vgs_is_not_empty(self, device_info, pvolumes, monkeypatch):
        """A device backing a PV that belongs to a VG reports that VG."""
        BarPVolume = api.PVolume(
            vg_name='foo',
            lv_uuid='111',
            pv_name='/dev/nvme0n1',
            pv_uuid="0000",
            pv_tags={},
        )
        pvolumes.append(BarPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info, pvolumes):
        """lsblk ``TYPE`` of ``device`` makes ``is_device`` true."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_disk_is_device(self, device_info, pvolumes):
        """lsblk ``TYPE`` of ``disk`` makes ``is_device`` true."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info, pvolumes):
        """lsblk ``TYPE`` of ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_partition

    def test_is_not_lvm_memeber(self, device_info, pvolumes):
        """A plain partition is not an LVM member."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info, pvolumes):
        """Check ``is_lvm_member`` for a partition."""
        # NOTE(review): setup and assertion are identical to
        # test_is_not_lvm_memeber, so the positive case looks untested -
        # confirm whether this is intentional.
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/*`` path is a mapper device."""
        disk = device.Device("/dev/mapper/foo")
        # NOTE(review): assertion reconstructed as the counterpart of
        # test_is_not_mapper_device; confirm against upstream.
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-*`` path is a mapper device."""
        disk = device.Device("/dev/dm-4")
        # NOTE(review): assertion reconstructed as the counterpart of
        # test_is_not_mapper_device; confirm against upstream.
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A regular ``/dev/sdX`` path is not a mapper device."""
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    def test_is_ceph_disk_member_lsblk(self, device_info):
        """A ``ceph data`` PARTLABEL from lsblk marks a ceph-disk member."""
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_ceph_disk_member_not_available(self, device_info):
        """ceph-disk members are unavailable and say why."""
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_is_not_ceph_disk_member_lsblk(self, device_info):
        """A non-ceph PARTLABEL from lsblk is not a ceph-disk member."""
        lsblk = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_is_ceph_disk_member_blkid(self, device_info):
        """An empty lsblk PARTLABEL defers to the blkid one."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_not_ceph_disk_member_blkid(self, device_info):
        """A non-ceph blkid PARTLABEL is not a ceph-disk member."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_pv_api(self, device_info, pvolumes, monkeypatch):
        """Build a ``Device`` whose path matches a registered PV."""
        FooPVolume = api.PVolume(
            pv_name='/dev/sda',
            pv_uuid="0000",
            lv_uuid="0000",
            pv_tags={},
            vg_name="vg",
        )
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        # NOTE(review): the assertion that followed this statement is
        # absent from the recovered text; restore it from upstream.
        disk = device.Device("/dev/sda")

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
        """An LV tagged with ``ceph.type`` data/block marks the device used."""
        FooPVolume = api.PVolume(
            pv_name='/dev/sda',
            pv_uuid="0000",
            lv_uuid="0000",
            pv_tags={},
            vg_name="vg",
        )
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {
            "lv_path": "vg/lv",
            "vg_name": "vg",
            "lv_uuid": "0000",
            "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type},
        }
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
        """An LV tagged with ``ceph.type`` journal does not mark it used."""
        FooPVolume = api.PVolume(
            pv_name='/dev/sda',
            pv_uuid="0000",
            lv_uuid="0000",
            pv_tags={},
            vg_name="vg",
        )
        pvolumes.append(FooPVolume)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {
            "lv_path": "vg/lv",
            "vg_name": "vg",
            "lv_uuid": "0000",
            "tags": {"ceph.osd_id": 0, "ceph.type": "journal"},
        }
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins vendor, model and serial with underscores."""
        # Each udev key maps to itself so the expected id is predictable.
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        device_info(udevadm=udev)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
class TestDeviceEncryption(object):
    """Tests for ``Device.is_encrypted`` across its data sources.

    The tests vary the lsblk ``FSTYPE``/``TYPE``, the blkid ``TYPE``, the
    patched ``device.encryption_status`` result and the ``lv_api`` object
    to cover partitions, mapper devices and logical volumes.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
        """lsblk reports a plain filesystem on a partition."""
        lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
        """lsblk reports ``crypto_LUKS`` on a partition."""
        lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
        """blkid reports a non-LUKS type for a partition."""
        lsblk = {'TYPE': 'part'}
        blkid = {'TYPE': 'ceph data'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
        """blkid reports ``crypto_LUKS`` for a partition."""
        lsblk = {'TYPE': 'part'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, monkeypatch):
        """A mapper device whose encryption status type is LUKS1."""
        status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, monkeypatch):
        """A mapper device whose encryption status type is LUKS2."""
        status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        """A mapper device whose encryption status type is PLAIN."""
        status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        """A mapper device with an empty encryption status."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
        """blkid reports ``crypto_LUKS`` for an lvm-type device."""
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        # NOTE(review): one source line is absent from the recovered text
        # between these two statements - confirm no ``lv_api`` setup is
        # missing here.
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
        """blkid reports a plain filesystem and the LV has no flag set."""
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'xfs'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
        """lsblk reports ``crypto_LUKS`` for an lvm-type device."""
        lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        # NOTE(review): one source line is absent from the recovered text
        # between these two statements - confirm no ``lv_api`` setup is
        # missing here.
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
        """lsblk reports a plain filesystem and the LV has no flag set."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """``lv_api.encrypted`` set to True marks the device encrypted."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=True)
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
        """``lv_api.encrypted`` set to False leaves the device unencrypted."""
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=False)
        assert disk.is_encrypted is False
class TestDeviceOrdering(object):
    """Tests for how ``device.Device`` instances compare to each other.

    NOTE(review): the recovered text lacks the statement that binds the
    device table to ``self.data`` and every ordering assertion.  The table
    is rebuilt per test in ``setup_method`` below; the assertions still
    need to be restored from upstream.
    """

    def setup_method(self):
        """Give each test a fresh device table.

        NOTE(review): reconstructed - the tests read ``self.data`` but the
        original binding is not visible; confirm the hook name upstream.
        """
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """Build one non-removable and one removable device."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")
        # NOTE(review): the comparison between ``sda`` and ``sdb`` is
        # missing from the recovered text.

    def test_valid_alphabetical_ordering(self, device_info):
        """Build two non-removable devices."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")
        # NOTE(review): the comparison between ``sda`` and ``sdc`` is
        # missing from the recovered text.

    def test_invalid_alphabetical_ordering(self, device_info):
        """Build two removable devices."""
        device_info(devices=self.data)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")
        # NOTE(review): the comparison between ``sdb`` and ``sdd`` is
        # missing from the recovered text.
# Partition labels fed to the parametrized ceph-disk tests in this module.
# NOTE(review): the opening ``ceph_partlabels = [`` and the closing bracket
# are absent from the recovered text; confirm no entries were lost with them.
ceph_partlabels = [
    'ceph data', 'ceph journal', 'ceph block',
    'ceph block.wal', 'ceph block.db', 'ceph lockbox'
]
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` and device availability checks.

    ``CephDiskDevice`` wraps a ``device.Device``; the tests cover its
    ``partlabel``, ``is_member`` and ``type`` attributes with the label
    coming from either lsblk or blkid.  The availability tests exercise
    ``Device.available`` directly for removable and read-only devices.
    """

    def test_partlabel_lsblk(self, device_info):
        """With no blkid data an empty lsblk PARTLABEL stays empty."""
        lsblk = {"PARTLABEL": ""}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """An empty lsblk PARTLABEL defers to the blkid one."""
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == 'ceph data'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_blkid(self, device_info, label):
        """Every ceph partition label from blkid marks a member."""
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": label}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    def test_reject_removable_device(self, device_info):
        """A device flagged ``removable`` is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        device_info(devices=data)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        """A device with ``removable`` cleared is available."""
        data = {"/dev/sdb": {"removable": 0}}
        device_info(devices=data)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_readonly_device(self, device_info):
        """A device flagged ``ro`` is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        device_info(devices=data)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_accept_non_readonly_device(self, device_info):
        """A device with ``ro`` cleared is available."""
        data = {"/dev/sda": {"ro": 0}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.available

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_lsblk(self, device_info, label):
        """Every ceph partition label from lsblk marks a member."""
        lsblk = {"PARTLABEL": label}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    def test_unknown_type(self, device_info):
        """A non-ceph PARTLABEL yields the ``unknown`` type."""
        lsblk = {"PARTLABEL": "gluster"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == 'unknown'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_blkid(self, device_info, label):
        """The type is derived from the blkid label."""
        # Expected type is the last word of the label, minus any dotted
        # prefix: 'ceph block.wal' -> 'wal', 'ceph data' -> 'data'.
        expected = label.split()[-1].split('.')[-1]
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": label}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == expected

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_lsblk(self, device_info, label):
        """The type is derived from the lsblk label."""
        # Expected type is the last word of the label, minus any dotted
        # prefix: 'ceph block.db' -> 'db', 'ceph journal' -> 'journal'.
        expected = label.split()[-1].split('.')[-1]
        lsblk = {"PARTLABEL": label}
        blkid = {"PARTLABEL": ''}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == expected