]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
2 from ceph_volume
.util
import device
3 from ceph_volume
.api
import lvm
as api
6 class TestDevice(object):
8 def test_sys_api(self
, device_info
):
9 data
= {"/dev/sda": {"foo": "bar"}}
10 device_info(devices
=data
)
11 disk
= device
.Device("/dev/sda")
13 assert "foo" in disk
.sys_api
15 def test_lvm_size(self
, device_info
):
17 data
= {"/dev/sda": {"size": "5368709120"}}
18 device_info(devices
=data
)
19 disk
= device
.Device("/dev/sda")
20 assert disk
.lvm_size
.gb
== 4
22 def test_lvm_size_rounds_down(self
, device_info
):
24 data
= {"/dev/sda": {"size": "5905580032"}}
25 device_info(devices
=data
)
26 disk
= device
.Device("/dev/sda")
27 assert disk
.lvm_size
.gb
== 4
29 def test_is_lv(self
, device_info
):
30 data
= {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
32 disk
= device
.Device("vg/lv")
35 def test_vgs_is_empty(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
36 BarPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", pv_tags
={})
37 pvolumes
.append(BarPVolume
)
38 monkeypatch
.setattr(api
, 'PVolumes', lambda populate
=True: pvolumes
if populate
else pvolumes_empty
)
39 lsblk
= {"TYPE": "disk"}
40 device_info(lsblk
=lsblk
)
41 disk
= device
.Device("/dev/nvme0n1")
44 def test_vgs_is_not_empty(self
, device_info
, monkeypatch
):
45 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
46 vg_extent_size
=1073741824)
47 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
48 lsblk
= {"TYPE": "disk"}
49 device_info(lsblk
=lsblk
)
50 disk
= device
.Device("/dev/nvme0n1")
51 assert len(disk
.vgs
) == 1
53 def test_device_is_device(self
, device_info
, pvolumes
):
54 data
= {"/dev/sda": {"foo": "bar"}}
55 lsblk
= {"TYPE": "device"}
56 device_info(devices
=data
, lsblk
=lsblk
)
57 disk
= device
.Device("/dev/sda")
58 assert disk
.is_device
is True
60 def test_device_is_rotational(self
, device_info
, pvolumes
):
61 data
= {"/dev/sda": {"rotational": "1"}}
62 lsblk
= {"TYPE": "device"}
63 device_info(devices
=data
, lsblk
=lsblk
)
64 disk
= device
.Device("/dev/sda")
65 assert disk
.rotational
67 def test_device_is_not_rotational(self
, device_info
, pvolumes
):
68 data
= {"/dev/sda": {"rotational": "0"}}
69 lsblk
= {"TYPE": "device"}
70 device_info(devices
=data
, lsblk
=lsblk
)
71 disk
= device
.Device("/dev/sda")
72 assert not disk
.rotational
74 def test_device_is_rotational_lsblk(self
, device_info
, pvolumes
):
75 data
= {"/dev/sda": {"foo": "bar"}}
76 lsblk
= {"TYPE": "device", "ROTA": "1"}
77 device_info(devices
=data
, lsblk
=lsblk
)
78 disk
= device
.Device("/dev/sda")
79 assert disk
.rotational
81 def test_device_is_not_rotational_lsblk(self
, device_info
, pvolumes
):
82 data
= {"/dev/sda": {"rotational": "0"}}
83 lsblk
= {"TYPE": "device", "ROTA": "0"}
84 device_info(devices
=data
, lsblk
=lsblk
)
85 disk
= device
.Device("/dev/sda")
86 assert not disk
.rotational
88 def test_device_is_rotational_defaults_true(self
, device_info
, pvolumes
):
89 # rotational will default true if no info from sys_api or lsblk is found
90 data
= {"/dev/sda": {"foo": "bar"}}
91 lsblk
= {"TYPE": "device", "foo": "bar"}
92 device_info(devices
=data
, lsblk
=lsblk
)
93 disk
= device
.Device("/dev/sda")
94 assert disk
.rotational
96 def test_disk_is_device(self
, device_info
, pvolumes
):
97 data
= {"/dev/sda": {"foo": "bar"}}
98 lsblk
= {"TYPE": "disk"}
99 device_info(devices
=data
, lsblk
=lsblk
)
100 disk
= device
.Device("/dev/sda")
101 assert disk
.is_device
is True
103 def test_is_partition(self
, device_info
, pvolumes
):
104 data
= {"/dev/sda": {"foo": "bar"}}
105 lsblk
= {"TYPE": "part"}
106 device_info(devices
=data
, lsblk
=lsblk
)
107 disk
= device
.Device("/dev/sda")
108 assert disk
.is_partition
110 def test_is_not_lvm_memeber(self
, device_info
, pvolumes
):
111 data
= {"/dev/sda": {"foo": "bar"}}
112 lsblk
= {"TYPE": "part"}
113 device_info(devices
=data
, lsblk
=lsblk
)
114 disk
= device
.Device("/dev/sda")
115 assert not disk
.is_lvm_member
117 def test_is_lvm_memeber(self
, device_info
, pvolumes
):
118 data
= {"/dev/sda": {"foo": "bar"}}
119 lsblk
= {"TYPE": "part"}
120 device_info(devices
=data
, lsblk
=lsblk
)
121 disk
= device
.Device("/dev/sda")
122 assert not disk
.is_lvm_member
124 def test_is_mapper_device(self
, device_info
):
126 disk
= device
.Device("/dev/mapper/foo")
127 assert disk
.is_mapper
129 def test_dm_is_mapper_device(self
, device_info
):
131 disk
= device
.Device("/dev/dm-4")
132 assert disk
.is_mapper
134 def test_is_not_mapper_device(self
, device_info
):
136 disk
= device
.Device("/dev/sda")
137 assert not disk
.is_mapper
139 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
140 "disable_kernel_queries",
141 "disable_lvm_queries")
142 def test_is_ceph_disk_lsblk(self
, monkeypatch
, patch_bluestore_label
):
143 disk
= device
.Device("/dev/sda")
144 assert disk
.is_ceph_disk_member
146 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
147 "disable_kernel_queries",
148 "disable_lvm_queries")
149 def test_is_ceph_disk_blkid(self
, monkeypatch
, patch_bluestore_label
):
150 monkeypatch
.setattr("ceph_volume.util.device.disk.lsblk",
151 lambda path
: {'PARTLABEL': ""})
152 disk
= device
.Device("/dev/sda")
153 assert disk
.is_ceph_disk_member
155 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
156 "disable_kernel_queries",
157 "disable_lvm_queries")
158 def test_is_ceph_disk_member_not_available_lsblk(self
, monkeypatch
, patch_bluestore_label
):
159 disk
= device
.Device("/dev/sda")
160 assert disk
.is_ceph_disk_member
161 assert not disk
.available
162 assert "Used by ceph-disk" in disk
.rejected_reasons
164 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
165 "disable_kernel_queries",
166 "disable_lvm_queries")
167 def test_is_ceph_disk_member_not_available_blkid(self
, monkeypatch
, patch_bluestore_label
):
168 monkeypatch
.setattr("ceph_volume.util.device.disk.lsblk",
169 lambda path
: {'PARTLABEL': ""})
170 disk
= device
.Device("/dev/sda")
171 assert disk
.is_ceph_disk_member
172 assert not disk
.available
173 assert "Used by ceph-disk" in disk
.rejected_reasons
175 def test_reject_removable_device(self
, device_info
):
176 data
= {"/dev/sdb": {"removable": 1}}
177 device_info(devices
=data
)
178 disk
= device
.Device("/dev/sdb")
179 assert not disk
.available
181 def test_accept_non_removable_device(self
, device_info
):
182 data
= {"/dev/sdb": {"removable": 0, "size": 5368709120}}
183 device_info(devices
=data
)
184 disk
= device
.Device("/dev/sdb")
185 assert disk
.available
187 def test_reject_readonly_device(self
, device_info
):
188 data
= {"/dev/cdrom": {"ro": 1}}
189 device_info(devices
=data
)
190 disk
= device
.Device("/dev/cdrom")
191 assert not disk
.available
193 def test_reject_smaller_than_5gb(self
, device_info
):
194 data
= {"/dev/sda": {"size": 5368709119}}
195 device_info(devices
=data
)
196 disk
= device
.Device("/dev/sda")
197 assert not disk
.available
, 'too small device is available'
199 def test_accept_non_readonly_device(self
, device_info
):
200 data
= {"/dev/sda": {"ro": 0, "size": 5368709120}}
201 device_info(devices
=data
)
202 disk
= device
.Device("/dev/sda")
203 assert disk
.available
205 def test_reject_bluestore_device(self
, monkeypatch
, patch_bluestore_label
):
206 patch_bluestore_label
.return_value
= True
207 disk
= device
.Device("/dev/sda")
208 assert not disk
.available
209 assert "Has BlueStore device label" in disk
.rejected_reasons
211 @pytest.mark
.usefixtures("device_info_not_ceph_disk_member",
212 "disable_lvm_queries",
213 "disable_kernel_queries")
214 def test_is_not_ceph_disk_member_lsblk(self
, patch_bluestore_label
):
215 disk
= device
.Device("/dev/sda")
216 assert disk
.is_ceph_disk_member
is False
218 def test_existing_vg_available(self
, monkeypatch
, device_info
):
219 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
220 vg_extent_size
=1073741824)
221 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
222 lsblk
= {"TYPE": "disk"}
223 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
224 device_info(devices
=data
, lsblk
=lsblk
)
225 disk
= device
.Device("/dev/nvme0n1")
226 assert disk
.available_lvm
227 assert not disk
.available
228 assert not disk
.available_raw
230 def test_existing_vg_too_small(self
, monkeypatch
, device_info
):
231 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=4,
232 vg_extent_size
=1073741824)
233 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
234 lsblk
= {"TYPE": "disk"}
235 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
236 device_info(devices
=data
, lsblk
=lsblk
)
237 disk
= device
.Device("/dev/nvme0n1")
238 assert not disk
.available_lvm
239 assert not disk
.available
240 assert not disk
.available_raw
242 def test_multiple_existing_vgs(self
, monkeypatch
, device_info
):
243 vg1
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=4,
244 vg_extent_size
=1073741824)
245 vg2
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
246 vg_extent_size
=1073741824)
247 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg1
, vg2
])
248 lsblk
= {"TYPE": "disk"}
249 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
250 device_info(devices
=data
, lsblk
=lsblk
)
251 disk
= device
.Device("/dev/nvme0n1")
252 assert disk
.available_lvm
253 assert not disk
.available
254 assert not disk
.available_raw
256 @pytest.mark
.parametrize("ceph_type", ["data", "block"])
257 def test_used_by_ceph(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
, ceph_type
):
258 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", lv_uuid
="0000", pv_tags
={}, vg_name
="vg")
259 pvolumes
.append(FooPVolume
)
260 monkeypatch
.setattr(api
, 'PVolumes', lambda populate
=True: pvolumes
if populate
else pvolumes_empty
)
261 data
= {"/dev/sda": {"foo": "bar"}}
262 lsblk
= {"TYPE": "part"}
263 lv_data
= {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type
}}
264 device_info(devices
=data
, lsblk
=lsblk
, lv
=lv_data
)
265 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
266 vg_extent_size
=1073741824)
267 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
268 disk
= device
.Device("/dev/sda")
269 assert disk
.used_by_ceph
271 def test_not_used_by_ceph(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
272 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", lv_uuid
="0000", pv_tags
={}, vg_name
="vg")
273 pvolumes
.append(FooPVolume
)
274 monkeypatch
.setattr(api
, 'PVolumes', lambda populate
=True: pvolumes
if populate
else pvolumes_empty
)
275 data
= {"/dev/sda": {"foo": "bar"}}
276 lsblk
= {"TYPE": "part"}
277 lv_data
= {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
278 device_info(devices
=data
, lsblk
=lsblk
, lv
=lv_data
)
279 disk
= device
.Device("/dev/sda")
280 assert not disk
.used_by_ceph
282 def test_get_device_id(self
, device_info
):
283 udev
= {k
:k
for k
in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
284 device_info(udevadm
=udev
)
285 disk
= device
.Device("/dev/sda")
286 assert disk
._get
_device
_id
() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
290 class TestDeviceEncryption(object):
292 def test_partition_is_not_encrypted_lsblk(self
, device_info
, pvolumes
):
293 lsblk
= {'TYPE': 'part', 'FSTYPE': 'xfs'}
294 device_info(lsblk
=lsblk
)
295 disk
= device
.Device("/dev/sda")
296 assert disk
.is_encrypted
is False
298 def test_partition_is_encrypted_lsblk(self
, device_info
, pvolumes
):
299 lsblk
= {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
300 device_info(lsblk
=lsblk
)
301 disk
= device
.Device("/dev/sda")
302 assert disk
.is_encrypted
is True
304 def test_partition_is_not_encrypted_blkid(self
, device_info
, pvolumes
):
305 lsblk
= {'TYPE': 'part'}
306 blkid
= {'TYPE': 'ceph data'}
307 device_info(lsblk
=lsblk
, blkid
=blkid
)
308 disk
= device
.Device("/dev/sda")
309 assert disk
.is_encrypted
is False
311 def test_partition_is_encrypted_blkid(self
, device_info
, pvolumes
):
312 lsblk
= {'TYPE': 'part'}
313 blkid
= {'TYPE': 'crypto_LUKS'}
314 device_info(lsblk
=lsblk
, blkid
=blkid
)
315 disk
= device
.Device("/dev/sda")
316 assert disk
.is_encrypted
is True
318 def test_mapper_is_encrypted_luks1(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
319 status
= {'type': 'LUKS1'}
320 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
321 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
322 blkid
= {'TYPE': 'mapper'}
323 device_info(lsblk
=lsblk
, blkid
=blkid
)
324 disk
= device
.Device("/dev/mapper/uuid")
325 assert disk
.is_encrypted
is True
327 def test_mapper_is_encrypted_luks2(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
328 status
= {'type': 'LUKS2'}
329 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
330 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
331 blkid
= {'TYPE': 'mapper'}
332 device_info(lsblk
=lsblk
, blkid
=blkid
)
333 disk
= device
.Device("/dev/mapper/uuid")
334 assert disk
.is_encrypted
is True
336 def test_mapper_is_encrypted_plain(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
337 status
= {'type': 'PLAIN'}
338 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
339 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
340 blkid
= {'TYPE': 'mapper'}
341 device_info(lsblk
=lsblk
, blkid
=blkid
)
342 disk
= device
.Device("/dev/mapper/uuid")
343 assert disk
.is_encrypted
is True
345 def test_mapper_is_not_encrypted_plain(self
, device_info
, pvolumes
, pvolumes_empty
, monkeypatch
):
346 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: {})
347 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
348 blkid
= {'TYPE': 'mapper'}
349 device_info(lsblk
=lsblk
, blkid
=blkid
)
350 disk
= device
.Device("/dev/mapper/uuid")
351 assert disk
.is_encrypted
is False
353 def test_lv_is_encrypted_blkid(self
, device_info
, pvolumes
):
354 lsblk
= {'TYPE': 'lvm'}
355 blkid
= {'TYPE': 'crypto_LUKS'}
356 device_info(lsblk
=lsblk
, blkid
=blkid
)
357 disk
= device
.Device("/dev/sda")
359 assert disk
.is_encrypted
is True
361 def test_lv_is_not_encrypted_blkid(self
, factory
, device_info
, pvolumes
):
362 lsblk
= {'TYPE': 'lvm'}
363 blkid
= {'TYPE': 'xfs'}
364 device_info(lsblk
=lsblk
, blkid
=blkid
)
365 disk
= device
.Device("/dev/sda")
366 disk
.lv_api
= factory(encrypted
=None)
367 assert disk
.is_encrypted
is False
369 def test_lv_is_encrypted_lsblk(self
, device_info
, pvolumes
):
370 lsblk
= {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
371 blkid
= {'TYPE': 'mapper'}
372 device_info(lsblk
=lsblk
, blkid
=blkid
)
373 disk
= device
.Device("/dev/sda")
375 assert disk
.is_encrypted
is True
377 def test_lv_is_not_encrypted_lsblk(self
, factory
, device_info
, pvolumes
):
378 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
379 blkid
= {'TYPE': 'mapper'}
380 device_info(lsblk
=lsblk
, blkid
=blkid
)
381 disk
= device
.Device("/dev/sda")
382 disk
.lv_api
= factory(encrypted
=None)
383 assert disk
.is_encrypted
is False
385 def test_lv_is_encrypted_lvm_api(self
, factory
, device_info
, pvolumes
):
386 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
387 blkid
= {'TYPE': 'mapper'}
388 device_info(lsblk
=lsblk
, blkid
=blkid
)
389 disk
= device
.Device("/dev/sda")
390 disk
.lv_api
= factory(encrypted
=True)
391 assert disk
.is_encrypted
is True
393 def test_lv_is_not_encrypted_lvm_api(self
, factory
, device_info
, pvolumes
):
394 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
395 blkid
= {'TYPE': 'mapper'}
396 device_info(lsblk
=lsblk
, blkid
=blkid
)
397 disk
= device
.Device("/dev/sda")
398 disk
.lv_api
= factory(encrypted
=False)
399 assert disk
.is_encrypted
is False
402 class TestDeviceOrdering(object):
406 "/dev/sda": {"removable": 0},
407 "/dev/sdb": {"removable": 1}, # invalid
408 "/dev/sdc": {"removable": 0},
409 "/dev/sdd": {"removable": 1}, # invalid
412 def test_valid_before_invalid(self
, device_info
):
413 device_info(devices
=self
.data
)
414 sda
= device
.Device("/dev/sda")
415 sdb
= device
.Device("/dev/sdb")
420 def test_valid_alphabetical_ordering(self
, device_info
):
421 device_info(devices
=self
.data
)
422 sda
= device
.Device("/dev/sda")
423 sdc
= device
.Device("/dev/sdc")
428 def test_invalid_alphabetical_ordering(self
, device_info
):
429 device_info(devices
=self
.data
)
430 sdb
= device
.Device("/dev/sdb")
431 sdd
= device
.Device("/dev/sdd")
437 class TestCephDiskDevice(object):
439 def test_partlabel_lsblk(self
, device_info
):
440 lsblk
= {"PARTLABEL": ""}
441 device_info(lsblk
=lsblk
)
442 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
444 assert disk
.partlabel
== ''
446 def test_partlabel_blkid(self
, device_info
):
447 lsblk
= {"PARTLABEL": ""}
448 blkid
= {"PARTLABEL": "ceph data"}
449 device_info(lsblk
=lsblk
, blkid
=blkid
)
450 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
452 assert disk
.partlabel
== 'ceph data'
454 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
455 "disable_kernel_queries",
456 "disable_lvm_queries")
457 def test_is_member_blkid(self
, monkeypatch
, patch_bluestore_label
):
458 monkeypatch
.setattr("ceph_volume.util.device.disk.lsblk",
459 lambda path
: {'PARTLABEL': ""})
460 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
462 assert disk
.is_member
is True
464 def test_reject_removable_device(self
, device_info
):
465 data
= {"/dev/sdb": {"removable": 1}}
466 device_info(devices
=data
)
467 disk
= device
.Device("/dev/sdb")
468 assert not disk
.available
470 def test_accept_non_removable_device(self
, device_info
):
471 data
= {"/dev/sdb": {"removable": 0, "size": 5368709120}}
472 device_info(devices
=data
)
473 disk
= device
.Device("/dev/sdb")
474 assert disk
.available
476 def test_reject_readonly_device(self
, device_info
):
477 data
= {"/dev/cdrom": {"ro": 1}}
478 device_info(devices
=data
)
479 disk
= device
.Device("/dev/cdrom")
480 assert not disk
.available
482 def test_reject_smaller_than_5gb(self
, device_info
):
483 data
= {"/dev/sda": {"size": 5368709119}}
484 device_info(devices
=data
)
485 disk
= device
.Device("/dev/sda")
486 assert not disk
.available
, 'too small device is available'
488 def test_accept_non_readonly_device(self
, device_info
):
489 data
= {"/dev/sda": {"ro": 0, "size": 5368709120}}
490 device_info(devices
=data
)
491 disk
= device
.Device("/dev/sda")
492 assert disk
.available
494 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
495 "disable_kernel_queries",
496 "disable_lvm_queries")
497 def test_is_member_lsblk(self
, patch_bluestore_label
):
498 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
500 assert disk
.is_member
is True
502 def test_unknown_type(self
, device_info
):
503 lsblk
= {"PARTLABEL": "gluster"}
504 device_info(lsblk
=lsblk
)
505 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
507 assert disk
.type == 'unknown'
509 ceph_types
= ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
511 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
512 "disable_kernel_queries",
513 "disable_lvm_queries")
514 def test_type_blkid(self
, monkeypatch
, device_info
, ceph_partlabel
):
515 monkeypatch
.setattr("ceph_volume.util.device.disk.lsblk",
516 lambda path
: {'PARTLABEL': ''})
517 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
519 assert disk
.type in self
.ceph_types
521 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
522 "lsblk_ceph_disk_member",
523 "disable_kernel_queries",
524 "disable_lvm_queries")
525 def test_type_lsblk(self
, device_info
, ceph_partlabel
):
526 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
528 assert disk
.type in self
.ceph_types