# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
from copy import deepcopy

import pytest

from ceph_volume.api import lvm as api
from ceph_volume.util import device
class TestDevice(object):
    """Checks on ``device.Device`` attributes using faked system reports.

    Fixtures other than pytest's own ``monkeypatch`` (``device_info``,
    ``patch_bluestore_label`` and those named in ``usefixtures``) are
    defined outside the visible part of this file.
    """

    def test_sys_api(self, monkeypatch, device_info):
        """Keys of the faked sys_api entry are reachable via ``sys_api``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        # NOTE(review): the original statement was truncated after
        # ``lambda **kwargs:``; returning a deep copy of ``volumes`` is an
        # assumption based on the file's ``deepcopy`` import -- confirm.
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))

        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        """A 5368709120-byte device reports ``lvm_size.gb == 4``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        # NOTE(review): lambda body reconstructed (see test_sys_api for the
        # same truncated pattern) -- confirm.
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))

        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """``lvm_size.gb`` is still 4 for a 5905580032-byte device."""
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """Build a ``Device`` from faked LV data with an lsblk TYPE of lvm."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")  # noqa: F841
        # NOTE(review): no assertion is present after constructing the
        # device; presumably an LV check on ``disk`` belongs here -- verify.

    def test_vgs_is_empty(self, device_info, monkeypatch):
        """Build a ``Device`` while ``api.get_pvs`` reports nothing."""
        # NOTE(review): the original call was truncated after ``pv_uuid``;
        # ``pv_tags={}`` mirrors the other ``api.PVolume`` calls in this
        # class -- confirm the remaining arguments.
        BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 pv_tags={})
        pvolumes = [BarPVolume]  # noqa: F841
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})

        disk = device.Device("/dev/nvme0n1")  # noqa: F841
        # NOTE(review): no assertion is present; presumably ``disk.vgs``
        # should be checked for emptiness here -- verify.

    def test_vgs_is_not_empty(self, device_info, monkeypatch):
        """One VG returned by ``get_device_vgs`` yields one entry in ``vgs``."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info):
        """An lsblk TYPE of ``device`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info):
        """A sys_api ``rotational`` value of "1" is reported as rotational."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info):
        """A sys_api ``rotational`` value of "0" is reported as non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info):
        """With no sys_api ``rotational`` key, lsblk ``ROTA`` "1" is used."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info):
        """sys_api "0" together with lsblk ``ROTA`` "0" is non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info):
        """``rotational`` is truthy when neither source carries the info."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info):
        """An lsblk TYPE of ``disk`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info):
        """An lsblk TYPE of ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    def test_is_not_acceptable_device(self, device_info):
        """An lsblk TYPE of ``mpath`` is not treated as a device."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.is_device

    def test_is_not_lvm_memeber(self, device_info):
        """A plain partition is not reported as an LVM member."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info):
        """A plain partition is not reported as an LVM member.

        NOTE(review): despite its name, this test uses the same data and the
        same negative assertion as ``test_is_not_lvm_memeber`` -- confirm the
        intended positive case.
        """
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-N`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A ``/dev/sda`` disk is not reported as a mapper device."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
        """With the lsblk member fixture, the device is a ceph-disk member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
        """With the blkid member fixture, the device is a ceph-disk member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch,
                                                     patch_bluestore_label):
        """A ceph-disk member (lsblk fixture) is rejected with a reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch,
                                                     patch_bluestore_label):
        """A ceph-disk member (blkid fixture) is rejected with a reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_reject_removable_device(self, device_info):
        """A device with ``removable`` set to 1 is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        """A 5368709120-byte device with ``removable`` 0 is available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_not_acceptable_device(self, device_info):
        """A device whose lsblk TYPE is ``mpath`` is not available."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    def test_reject_readonly_device(self, device_info):
        """A device with ``ro`` set to 1 is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_reject_smaller_than_5gb(self, device_info):
        """A device one byte short of 5368709120 bytes is not available."""
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    def test_accept_non_readonly_device(self, device_info):
        """A 5368709120-byte device with ``ro`` 0 is available."""
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label,
                                     device_info):
        """A device carrying a BlueStore label is rejected with a reason."""
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
        """Without member data, ``is_ceph_disk_member`` is exactly False."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_existing_vg_available(self, monkeypatch, device_info):
        """A VG with 1536 free 4194304-byte extents is LVM-available only."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
                             vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_existing_vg_too_small(self, monkeypatch, device_info):
        """A VG with 4 free 1073741824-byte extents is not available."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_multiple_existing_vgs(self, monkeypatch, device_info):
        """Two VGs with 1000 and 536 free extents are LVM-available only."""
        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
                              vg_extent_size=4194304)
        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
                              vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info,
                          monkeypatch, ceph_type):
        """An LV tagged ``ceph.type`` data/block marks the device as used."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type=" + ceph_type}
        lv = api.Volume(**lv_data)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        # NOTE(review): the original statement was truncated after
        # ``lambda **kwargs:``; returning a deep copy of a list holding
        # ``lv`` is an assumption (``lv`` is otherwise unused) -- confirm.
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy([lv]))

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, monkeypatch):
        """LV data with a ``journal`` type does not mark the device as used."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins the three udev values with underscores."""
        # Each udev key maps to itself so the expected id is easy to spell.
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
class TestDeviceEncryption(object):
    """Checks on ``device.Device.is_encrypted`` for faked lsblk/blkid data.

    ``device_info`` and ``factory`` are fixtures defined outside the visible
    part of this file.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info):
        """A partition whose lsblk FSTYPE is xfs is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info):
        """A partition whose lsblk FSTYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info):
        """A partition whose blkid TYPE is 'ceph data' is not encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'ceph data'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info):
        """A partition whose blkid TYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'part'}, blkid={'TYPE': 'crypto_LUKS'})
        part = device.Device("/dev/sda")
        assert part.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
        """A mapper device with a LUKS1 encryption status is encrypted."""
        luks_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
        """A mapper device with a LUKS2 encryption status is encrypted."""
        luks_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with a PLAIN encryption status is encrypted."""
        plain_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: plain_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info):
        """An lvm-typed device whose blkid TYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'crypto_LUKS'})
        lv_device = device.Device("/dev/sda")
        # NOTE(review): unlike the other ``test_lv_*`` cases in this class,
        # no ``lv_api`` is assigned before the assertion -- confirm nothing
        # was lost here.
        assert lv_device.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info):
        """An LV with blkid TYPE xfs and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'xfs'})
        lv_device = device.Device("/dev/sda")
        lv_device.lv_api = factory(encrypted=None)
        assert lv_device.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info):
        """An lvm-typed device whose lsblk FSTYPE is crypto_LUKS is encrypted."""
        device_info(lsblk={'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        lv_device = device.Device("/dev/sda")
        # NOTE(review): unlike the other ``test_lv_*`` cases in this class,
        # no ``lv_api`` is assigned before the assertion -- confirm nothing
        # was lost here.
        assert lv_device.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
        """An LV with lsblk FSTYPE xfs and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        lv_device = device.Device("/dev/sda")
        lv_device.lv_api = factory(encrypted=None)
        assert lv_device.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is True is encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        lv_device = device.Device("/dev/sda")
        lv_device.lv_api = factory(encrypted=True)
        assert lv_device.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is False is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        lv_device = device.Device("/dev/sda")
        lv_device.lv_api = factory(encrypted=False)
        assert lv_device.is_encrypted is False
class TestDeviceOrdering(object):
    """Build pairs of ``device.Device`` objects from shared sys_api data.

    ``device_info`` is a fixture defined outside the visible part of this
    file.
    """

    def setup_method(self):
        """Give every test a fresh ``self.data`` mapping.

        NOTE(review): the original dict entries had no enclosing statement
        even though every test reads ``self.data``; a pytest per-test setup
        hook is an assumption about how it was meant to be populated --
        confirm.
        """
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """Construct a non-removable (sda) and a removable (sdb) device."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")  # noqa: F841
        sdb = device.Device("/dev/sdb")  # noqa: F841
        # NOTE(review): no assertion is present; presumably the relative
        # ordering of ``sda`` and ``sdb`` should be checked -- verify.

    def test_valid_alphabetical_ordering(self, device_info):
        """Construct two non-removable devices (sda and sdc)."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")  # noqa: F841
        sdc = device.Device("/dev/sdc")  # noqa: F841
        # NOTE(review): no assertion is present; presumably the relative
        # ordering of ``sda`` and ``sdc`` should be checked -- verify.

    def test_invalid_alphabetical_ordering(self, device_info):
        """Construct two removable devices (sdb and sdd)."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sdb = device.Device("/dev/sdb")  # noqa: F841
        sdd = device.Device("/dev/sdd")  # noqa: F841
        # NOTE(review): no assertion is present; presumably the relative
        # ordering of ``sdb`` and ``sdd`` should be checked -- verify.
class TestCephDiskDevice(object):
    """Checks on ``device.CephDiskDevice`` wrapping a ``device.Device``.

    ``device_info``, ``patch_bluestore_label``, ``ceph_partlabel`` and the
    fixtures named in ``usefixtures`` are defined outside the visible part
    of this file.
    """

    # Values accepted by the two ``test_type_*`` checks below.
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk PARTLABEL is returned as an empty string."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": ""})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """The blkid PARTLABEL is returned when only blkid data is given."""
        device_info(blkid={"TYPE": "disk", "PARTLABEL": "ceph data"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.partlabel == 'ceph data'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
        """With the blkid member fixture, ``is_member`` is exactly True."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_lsblk(self, patch_bluestore_label, device_info):
        """With an lsblk PARTLABEL of 'ceph', ``is_member`` is exactly True."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "ceph"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.is_member is True

    def test_unknown_type(self, device_info):
        """A PARTLABEL of 'gluster' yields the type 'unknown'."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "gluster"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type == 'unknown'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
        """With the blkid member fixture, the type is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_lsblk(self, device_info, ceph_partlabel):
        """With both member fixtures, the type is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type in self.ceph_types