]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
0df5ac61e9ea319f79f27bc3e6cf8ce69d0691cb
2 from copy
import deepcopy
3 from ceph_volume
.util
import device
4 from ceph_volume
.api
import lvm
as api
7 class TestDevice(object):
9 def test_sys_api(self
, monkeypatch
, device_info
):
10 volume
= api
.Volume(lv_name
='lv', lv_uuid
='y', vg_name
='vg',
11 lv_tags
={}, lv_path
='/dev/VolGroup/lv')
13 volumes
.append(volume
)
14 monkeypatch
.setattr(api
, 'get_lvs', lambda **kwargs
:
17 data
= {"/dev/sda": {"foo": "bar"}}
18 lsblk
= {"TYPE": "disk"}
19 device_info(devices
=data
,lsblk
=lsblk
)
20 disk
= device
.Device("/dev/sda")
22 assert "foo" in disk
.sys_api
24 def test_lvm_size(self
, monkeypatch
, device_info
):
25 volume
= api
.Volume(lv_name
='lv', lv_uuid
='y', vg_name
='vg',
26 lv_tags
={}, lv_path
='/dev/VolGroup/lv')
28 volumes
.append(volume
)
29 monkeypatch
.setattr(api
, 'get_lvs', lambda **kwargs
:
33 data
= {"/dev/sda": {"size": "5368709120"}}
34 lsblk
= {"TYPE": "disk"}
35 device_info(devices
=data
,lsblk
=lsblk
)
36 disk
= device
.Device("/dev/sda")
37 assert disk
.lvm_size
.gb
== 4
39 def test_lvm_size_rounds_down(self
, device_info
):
41 data
= {"/dev/sda": {"size": "5905580032"}}
42 lsblk
= {"TYPE": "disk"}
43 device_info(devices
=data
,lsblk
=lsblk
)
44 disk
= device
.Device("/dev/sda")
45 assert disk
.lvm_size
.gb
== 4
47 def test_is_lv(self
, device_info
):
48 data
= {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
49 lsblk
= {"TYPE": "lvm"}
50 device_info(lv
=data
,lsblk
=lsblk
)
51 disk
= device
.Device("vg/lv")
54 def test_vgs_is_empty(self
, device_info
, monkeypatch
):
55 BarPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000",
58 pvolumes
.append(BarPVolume
)
59 lsblk
= {"TYPE": "disk"}
60 device_info(lsblk
=lsblk
)
61 monkeypatch
.setattr(api
, 'get_pvs', lambda **kwargs
: {})
63 disk
= device
.Device("/dev/nvme0n1")
66 def test_vgs_is_not_empty(self
, device_info
, monkeypatch
):
67 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
68 vg_extent_size
=1073741824)
69 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
70 lsblk
= {"TYPE": "disk"}
71 device_info(lsblk
=lsblk
)
72 disk
= device
.Device("/dev/nvme0n1")
73 assert len(disk
.vgs
) == 1
75 def test_device_is_device(self
, device_info
):
76 data
= {"/dev/sda": {"foo": "bar"}}
77 lsblk
= {"TYPE": "device"}
78 device_info(devices
=data
, lsblk
=lsblk
)
79 disk
= device
.Device("/dev/sda")
80 assert disk
.is_device
is True
82 def test_device_is_rotational(self
, device_info
):
83 data
= {"/dev/sda": {"rotational": "1"}}
84 lsblk
= {"TYPE": "device"}
85 device_info(devices
=data
, lsblk
=lsblk
)
86 disk
= device
.Device("/dev/sda")
87 assert disk
.rotational
89 def test_device_is_not_rotational(self
, device_info
):
90 data
= {"/dev/sda": {"rotational": "0"}}
91 lsblk
= {"TYPE": "device"}
92 device_info(devices
=data
, lsblk
=lsblk
)
93 disk
= device
.Device("/dev/sda")
94 assert not disk
.rotational
96 def test_device_is_rotational_lsblk(self
, device_info
):
97 data
= {"/dev/sda": {"foo": "bar"}}
98 lsblk
= {"TYPE": "device", "ROTA": "1"}
99 device_info(devices
=data
, lsblk
=lsblk
)
100 disk
= device
.Device("/dev/sda")
101 assert disk
.rotational
103 def test_device_is_not_rotational_lsblk(self
, device_info
):
104 data
= {"/dev/sda": {"rotational": "0"}}
105 lsblk
= {"TYPE": "device", "ROTA": "0"}
106 device_info(devices
=data
, lsblk
=lsblk
)
107 disk
= device
.Device("/dev/sda")
108 assert not disk
.rotational
110 def test_device_is_rotational_defaults_true(self
, device_info
):
111 # rotational will default true if no info from sys_api or lsblk is found
112 data
= {"/dev/sda": {"foo": "bar"}}
113 lsblk
= {"TYPE": "device", "foo": "bar"}
114 device_info(devices
=data
, lsblk
=lsblk
)
115 disk
= device
.Device("/dev/sda")
116 assert disk
.rotational
118 def test_disk_is_device(self
, device_info
):
119 data
= {"/dev/sda": {"foo": "bar"}}
120 lsblk
= {"TYPE": "disk"}
121 device_info(devices
=data
, lsblk
=lsblk
)
122 disk
= device
.Device("/dev/sda")
123 assert disk
.is_device
is True
125 def test_is_partition(self
, device_info
):
126 data
= {"/dev/sda1": {"foo": "bar"}}
127 lsblk
= {"TYPE": "part"}
128 device_info(devices
=data
, lsblk
=lsblk
)
129 disk
= device
.Device("/dev/sda1")
130 assert disk
.is_partition
132 def test_is_not_acceptable_device(self
, device_info
):
133 data
= {"/dev/dm-0": {"foo": "bar"}}
134 lsblk
= {"TYPE": "mpath"}
135 device_info(devices
=data
, lsblk
=lsblk
)
136 disk
= device
.Device("/dev/dm-0")
137 assert not disk
.is_device
139 def test_is_not_lvm_memeber(self
, device_info
):
140 data
= {"/dev/sda1": {"foo": "bar"}}
141 lsblk
= {"TYPE": "part"}
142 device_info(devices
=data
, lsblk
=lsblk
)
143 disk
= device
.Device("/dev/sda1")
144 assert not disk
.is_lvm_member
146 def test_is_lvm_memeber(self
, device_info
):
147 data
= {"/dev/sda1": {"foo": "bar"}}
148 lsblk
= {"TYPE": "part"}
149 device_info(devices
=data
, lsblk
=lsblk
)
150 disk
= device
.Device("/dev/sda1")
151 assert not disk
.is_lvm_member
153 def test_is_mapper_device(self
, device_info
):
154 lsblk
= {"TYPE": "lvm"}
155 device_info(lsblk
=lsblk
)
156 disk
= device
.Device("/dev/mapper/foo")
157 assert disk
.is_mapper
159 def test_dm_is_mapper_device(self
, device_info
):
160 lsblk
= {"TYPE": "lvm"}
161 device_info(lsblk
=lsblk
)
162 disk
= device
.Device("/dev/dm-4")
163 assert disk
.is_mapper
165 def test_is_not_mapper_device(self
, device_info
):
166 lsblk
= {"TYPE": "disk"}
167 device_info(lsblk
=lsblk
)
168 disk
= device
.Device("/dev/sda")
169 assert not disk
.is_mapper
171 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
172 "disable_kernel_queries")
173 def test_is_ceph_disk_lsblk(self
, monkeypatch
, patch_bluestore_label
):
174 disk
= device
.Device("/dev/sda")
175 assert disk
.is_ceph_disk_member
177 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
178 "disable_kernel_queries")
179 def test_is_ceph_disk_blkid(self
, monkeypatch
, patch_bluestore_label
):
180 disk
= device
.Device("/dev/sda")
181 assert disk
.is_ceph_disk_member
183 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
184 "disable_kernel_queries")
185 def test_is_ceph_disk_member_not_available_lsblk(self
, monkeypatch
, patch_bluestore_label
):
186 disk
= device
.Device("/dev/sda")
187 assert disk
.is_ceph_disk_member
188 assert not disk
.available
189 assert "Used by ceph-disk" in disk
.rejected_reasons
191 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
192 "disable_kernel_queries")
193 def test_is_ceph_disk_member_not_available_blkid(self
, monkeypatch
, patch_bluestore_label
):
194 disk
= device
.Device("/dev/sda")
195 assert disk
.is_ceph_disk_member
196 assert not disk
.available
197 assert "Used by ceph-disk" in disk
.rejected_reasons
199 def test_reject_removable_device(self
, device_info
):
200 data
= {"/dev/sdb": {"removable": 1}}
201 lsblk
= {"TYPE": "disk"}
202 device_info(devices
=data
,lsblk
=lsblk
)
203 disk
= device
.Device("/dev/sdb")
204 assert not disk
.available
206 def test_reject_device_with_gpt_headers(self
, device_info
):
207 data
= {"/dev/sdb": {"removable": 0, "size": 5368709120}}
208 lsblk
= {"TYPE": "disk"}
209 blkid
= {"PTTYPE": "gpt"}
215 disk
= device
.Device("/dev/sdb")
216 assert not disk
.available
218 def test_accept_non_removable_device(self
, device_info
):
219 data
= {"/dev/sdb": {"removable": 0, "size": 5368709120}}
220 lsblk
= {"TYPE": "disk"}
221 device_info(devices
=data
,lsblk
=lsblk
)
222 disk
= device
.Device("/dev/sdb")
223 assert disk
.available
225 def test_reject_not_acceptable_device(self
, device_info
):
226 data
= {"/dev/dm-0": {"foo": "bar"}}
227 lsblk
= {"TYPE": "mpath"}
228 device_info(devices
=data
, lsblk
=lsblk
)
229 disk
= device
.Device("/dev/dm-0")
230 assert not disk
.available
232 def test_reject_readonly_device(self
, device_info
):
233 data
= {"/dev/cdrom": {"ro": 1}}
234 lsblk
= {"TYPE": "disk"}
235 device_info(devices
=data
,lsblk
=lsblk
)
236 disk
= device
.Device("/dev/cdrom")
237 assert not disk
.available
239 def test_reject_smaller_than_5gb(self
, device_info
):
240 data
= {"/dev/sda": {"size": 5368709119}}
241 lsblk
= {"TYPE": "disk"}
242 device_info(devices
=data
,lsblk
=lsblk
)
243 disk
= device
.Device("/dev/sda")
244 assert not disk
.available
, 'too small device is available'
246 def test_accept_non_readonly_device(self
, device_info
):
247 data
= {"/dev/sda": {"ro": 0, "size": 5368709120}}
248 lsblk
= {"TYPE": "disk"}
249 device_info(devices
=data
,lsblk
=lsblk
)
250 disk
= device
.Device("/dev/sda")
251 assert disk
.available
253 def test_reject_bluestore_device(self
, monkeypatch
, patch_bluestore_label
, device_info
):
254 patch_bluestore_label
.return_value
= True
255 lsblk
= {"TYPE": "disk"}
256 device_info(lsblk
=lsblk
)
257 disk
= device
.Device("/dev/sda")
258 assert not disk
.available
259 assert "Has BlueStore device label" in disk
.rejected_reasons
261 @pytest.mark
.usefixtures("device_info_not_ceph_disk_member",
262 "disable_kernel_queries")
263 def test_is_not_ceph_disk_member_lsblk(self
, patch_bluestore_label
):
264 disk
= device
.Device("/dev/sda")
265 assert disk
.is_ceph_disk_member
is False
267 def test_existing_vg_available(self
, monkeypatch
, device_info
):
268 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=1536,
269 vg_extent_size
=4194304)
270 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
271 lsblk
= {"TYPE": "disk"}
272 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
273 device_info(devices
=data
, lsblk
=lsblk
)
274 disk
= device
.Device("/dev/nvme0n1")
275 assert disk
.available_lvm
276 assert not disk
.available
277 assert not disk
.available_raw
279 def test_existing_vg_too_small(self
, monkeypatch
, device_info
):
280 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=4,
281 vg_extent_size
=1073741824)
282 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
283 lsblk
= {"TYPE": "disk"}
284 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
285 device_info(devices
=data
, lsblk
=lsblk
)
286 disk
= device
.Device("/dev/nvme0n1")
287 assert not disk
.available_lvm
288 assert not disk
.available
289 assert not disk
.available_raw
291 def test_multiple_existing_vgs(self
, monkeypatch
, device_info
):
292 vg1
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=1000,
293 vg_extent_size
=4194304)
294 vg2
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=536,
295 vg_extent_size
=4194304)
296 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg1
, vg2
])
297 lsblk
= {"TYPE": "disk"}
298 data
= {"/dev/nvme0n1": {"size": "6442450944"}}
299 device_info(devices
=data
, lsblk
=lsblk
)
300 disk
= device
.Device("/dev/nvme0n1")
301 assert disk
.available_lvm
302 assert not disk
.available
303 assert not disk
.available_raw
305 @pytest.mark
.parametrize("ceph_type", ["data", "block"])
306 def test_used_by_ceph(self
, device_info
,
307 monkeypatch
, ceph_type
):
308 data
= {"/dev/sda": {"foo": "bar"}}
309 lsblk
= {"TYPE": "part"}
310 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000",
311 lv_uuid
="0000", pv_tags
={}, vg_name
="vg")
313 pvolumes
.append(FooPVolume
)
314 lv_data
= {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
315 "lv_uuid": "0000", "lv_tags":
316 "ceph.osd_id=0,ceph.type="+ceph_type
}
318 lv
= api
.Volume(**lv_data
)
320 monkeypatch
.setattr(api
, 'get_pvs', lambda **kwargs
: pvolumes
)
321 monkeypatch
.setattr(api
, 'get_lvs', lambda **kwargs
:
324 device_info(devices
=data
, lsblk
=lsblk
, lv
=lv_data
)
325 vg
= api
.VolumeGroup(vg_name
='foo/bar', vg_free_count
=6,
326 vg_extent_size
=1073741824)
327 monkeypatch
.setattr(api
, 'get_device_vgs', lambda x
: [vg
])
328 disk
= device
.Device("/dev/sda")
329 assert disk
.used_by_ceph
331 def test_not_used_by_ceph(self
, device_info
, monkeypatch
):
332 FooPVolume
= api
.PVolume(pv_name
='/dev/sda', pv_uuid
="0000", lv_uuid
="0000", pv_tags
={}, vg_name
="vg")
334 pvolumes
.append(FooPVolume
)
335 data
= {"/dev/sda": {"foo": "bar"}}
336 lsblk
= {"TYPE": "part"}
337 lv_data
= {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
338 monkeypatch
.setattr(api
, 'get_pvs', lambda **kwargs
: pvolumes
)
340 device_info(devices
=data
, lsblk
=lsblk
, lv
=lv_data
)
341 disk
= device
.Device("/dev/sda")
342 assert not disk
.used_by_ceph
344 def test_get_device_id(self
, device_info
):
345 udev
= {k
:k
for k
in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
346 lsblk
= {"TYPE": "disk"}
347 device_info(udevadm
=udev
,lsblk
=lsblk
)
348 disk
= device
.Device("/dev/sda")
349 assert disk
._get
_device
_id
() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
353 class TestDeviceEncryption(object):
355 def test_partition_is_not_encrypted_lsblk(self
, device_info
):
356 lsblk
= {'TYPE': 'part', 'FSTYPE': 'xfs'}
357 device_info(lsblk
=lsblk
)
358 disk
= device
.Device("/dev/sda")
359 assert disk
.is_encrypted
is False
361 def test_partition_is_encrypted_lsblk(self
, device_info
):
362 lsblk
= {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
363 device_info(lsblk
=lsblk
)
364 disk
= device
.Device("/dev/sda")
365 assert disk
.is_encrypted
is True
367 def test_partition_is_not_encrypted_blkid(self
, device_info
):
368 lsblk
= {'TYPE': 'part'}
369 blkid
= {'TYPE': 'ceph data'}
370 device_info(lsblk
=lsblk
, blkid
=blkid
)
371 disk
= device
.Device("/dev/sda")
372 assert disk
.is_encrypted
is False
374 def test_partition_is_encrypted_blkid(self
, device_info
):
375 lsblk
= {'TYPE': 'part'}
376 blkid
= {'TYPE': 'crypto_LUKS'}
377 device_info(lsblk
=lsblk
, blkid
=blkid
)
378 disk
= device
.Device("/dev/sda")
379 assert disk
.is_encrypted
is True
381 def test_mapper_is_encrypted_luks1(self
, device_info
, monkeypatch
):
382 status
= {'type': 'LUKS1'}
383 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
384 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
385 blkid
= {'TYPE': 'mapper'}
386 device_info(lsblk
=lsblk
, blkid
=blkid
)
387 disk
= device
.Device("/dev/mapper/uuid")
388 assert disk
.is_encrypted
is True
390 def test_mapper_is_encrypted_luks2(self
, device_info
, monkeypatch
):
391 status
= {'type': 'LUKS2'}
392 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
393 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
394 blkid
= {'TYPE': 'mapper'}
395 device_info(lsblk
=lsblk
, blkid
=blkid
)
396 disk
= device
.Device("/dev/mapper/uuid")
397 assert disk
.is_encrypted
is True
399 def test_mapper_is_encrypted_plain(self
, device_info
, monkeypatch
):
400 status
= {'type': 'PLAIN'}
401 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: status
)
402 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
403 blkid
= {'TYPE': 'mapper'}
404 device_info(lsblk
=lsblk
, blkid
=blkid
)
405 disk
= device
.Device("/dev/mapper/uuid")
406 assert disk
.is_encrypted
is True
408 def test_mapper_is_not_encrypted_plain(self
, device_info
, monkeypatch
):
409 monkeypatch
.setattr(device
, 'encryption_status', lambda x
: {})
410 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
411 blkid
= {'TYPE': 'mapper'}
412 device_info(lsblk
=lsblk
, blkid
=blkid
)
413 disk
= device
.Device("/dev/mapper/uuid")
414 assert disk
.is_encrypted
is False
416 def test_lv_is_encrypted_blkid(self
, device_info
):
417 lsblk
= {'TYPE': 'lvm'}
418 blkid
= {'TYPE': 'crypto_LUKS'}
419 device_info(lsblk
=lsblk
, blkid
=blkid
)
420 disk
= device
.Device("/dev/sda")
422 assert disk
.is_encrypted
is True
424 def test_lv_is_not_encrypted_blkid(self
, factory
, device_info
):
425 lsblk
= {'TYPE': 'lvm'}
426 blkid
= {'TYPE': 'xfs'}
427 device_info(lsblk
=lsblk
, blkid
=blkid
)
428 disk
= device
.Device("/dev/sda")
429 disk
.lv_api
= factory(encrypted
=None)
430 assert disk
.is_encrypted
is False
432 def test_lv_is_encrypted_lsblk(self
, device_info
):
433 lsblk
= {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
434 blkid
= {'TYPE': 'mapper'}
435 device_info(lsblk
=lsblk
, blkid
=blkid
)
436 disk
= device
.Device("/dev/sda")
438 assert disk
.is_encrypted
is True
440 def test_lv_is_not_encrypted_lsblk(self
, factory
, device_info
):
441 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
442 blkid
= {'TYPE': 'mapper'}
443 device_info(lsblk
=lsblk
, blkid
=blkid
)
444 disk
= device
.Device("/dev/sda")
445 disk
.lv_api
= factory(encrypted
=None)
446 assert disk
.is_encrypted
is False
448 def test_lv_is_encrypted_lvm_api(self
, factory
, device_info
):
449 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
450 blkid
= {'TYPE': 'mapper'}
451 device_info(lsblk
=lsblk
, blkid
=blkid
)
452 disk
= device
.Device("/dev/sda")
453 disk
.lv_api
= factory(encrypted
=True)
454 assert disk
.is_encrypted
is True
456 def test_lv_is_not_encrypted_lvm_api(self
, factory
, device_info
):
457 lsblk
= {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
458 blkid
= {'TYPE': 'mapper'}
459 device_info(lsblk
=lsblk
, blkid
=blkid
)
460 disk
= device
.Device("/dev/sda")
461 disk
.lv_api
= factory(encrypted
=False)
462 assert disk
.is_encrypted
is False
465 class TestDeviceOrdering(object):
469 "/dev/sda": {"removable": 0},
470 "/dev/sdb": {"removable": 1}, # invalid
471 "/dev/sdc": {"removable": 0},
472 "/dev/sdd": {"removable": 1}, # invalid
475 def test_valid_before_invalid(self
, device_info
):
476 lsblk
= {"TYPE": "disk"}
477 device_info(devices
=self
.data
,lsblk
=lsblk
)
478 sda
= device
.Device("/dev/sda")
479 sdb
= device
.Device("/dev/sdb")
484 def test_valid_alphabetical_ordering(self
, device_info
):
485 lsblk
= {"TYPE": "disk"}
486 device_info(devices
=self
.data
,lsblk
=lsblk
)
487 sda
= device
.Device("/dev/sda")
488 sdc
= device
.Device("/dev/sdc")
493 def test_invalid_alphabetical_ordering(self
, device_info
):
494 lsblk
= {"TYPE": "disk"}
495 device_info(devices
=self
.data
,lsblk
=lsblk
)
496 sdb
= device
.Device("/dev/sdb")
497 sdd
= device
.Device("/dev/sdd")
503 class TestCephDiskDevice(object):
505 def test_partlabel_lsblk(self
, device_info
):
506 lsblk
= {"TYPE": "disk", "PARTLABEL": ""}
507 device_info(lsblk
=lsblk
)
508 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
510 assert disk
.partlabel
== ''
512 def test_partlabel_blkid(self
, device_info
):
513 blkid
= {"TYPE": "disk", "PARTLABEL": "ceph data"}
514 device_info(blkid
=blkid
)
515 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
517 assert disk
.partlabel
== 'ceph data'
519 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
520 "disable_kernel_queries")
521 def test_is_member_blkid(self
, monkeypatch
, patch_bluestore_label
):
522 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
524 assert disk
.is_member
is True
526 @pytest.mark
.usefixtures("lsblk_ceph_disk_member",
527 "disable_kernel_queries")
528 def test_is_member_lsblk(self
, patch_bluestore_label
, device_info
):
529 lsblk
= {"TYPE": "disk", "PARTLABEL": "ceph"}
530 device_info(lsblk
=lsblk
)
531 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
533 assert disk
.is_member
is True
535 def test_unknown_type(self
, device_info
):
536 lsblk
= {"TYPE": "disk", "PARTLABEL": "gluster"}
537 device_info(lsblk
=lsblk
)
538 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
540 assert disk
.type == 'unknown'
542 ceph_types
= ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
544 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
545 "disable_kernel_queries")
546 def test_type_blkid(self
, monkeypatch
, device_info
, ceph_partlabel
):
547 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
549 assert disk
.type in self
.ceph_types
551 @pytest.mark
.usefixtures("blkid_ceph_disk_member",
552 "lsblk_ceph_disk_member",
553 "disable_kernel_queries")
554 def test_type_lsblk(self
, device_info
, ceph_partlabel
):
555 disk
= device
.CephDiskDevice(device
.Device("/dev/sda"))
557 assert disk
.type in self
.ceph_types