]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
0df5ac61e9ea319f79f27bc3e6cf8ce69d0691cb
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
1 import pytest
2 from copy import deepcopy
3 from ceph_volume.util import device
4 from ceph_volume.api import lvm as api
5
6
7 class TestDevice(object):
8
9 def test_sys_api(self, monkeypatch, device_info):
10 volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
11 lv_tags={}, lv_path='/dev/VolGroup/lv')
12 volumes = []
13 volumes.append(volume)
14 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
15 deepcopy(volumes))
16
17 data = {"/dev/sda": {"foo": "bar"}}
18 lsblk = {"TYPE": "disk"}
19 device_info(devices=data,lsblk=lsblk)
20 disk = device.Device("/dev/sda")
21 assert disk.sys_api
22 assert "foo" in disk.sys_api
23
24 def test_lvm_size(self, monkeypatch, device_info):
25 volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
26 lv_tags={}, lv_path='/dev/VolGroup/lv')
27 volumes = []
28 volumes.append(volume)
29 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
30 deepcopy(volumes))
31
32 # 5GB in size
33 data = {"/dev/sda": {"size": "5368709120"}}
34 lsblk = {"TYPE": "disk"}
35 device_info(devices=data,lsblk=lsblk)
36 disk = device.Device("/dev/sda")
37 assert disk.lvm_size.gb == 4
38
39 def test_lvm_size_rounds_down(self, device_info):
40 # 5.5GB in size
41 data = {"/dev/sda": {"size": "5905580032"}}
42 lsblk = {"TYPE": "disk"}
43 device_info(devices=data,lsblk=lsblk)
44 disk = device.Device("/dev/sda")
45 assert disk.lvm_size.gb == 4
46
47 def test_is_lv(self, device_info):
48 data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
49 lsblk = {"TYPE": "lvm"}
50 device_info(lv=data,lsblk=lsblk)
51 disk = device.Device("vg/lv")
52 assert disk.is_lv
53
54 def test_vgs_is_empty(self, device_info, monkeypatch):
55 BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
56 pv_tags={})
57 pvolumes = []
58 pvolumes.append(BarPVolume)
59 lsblk = {"TYPE": "disk"}
60 device_info(lsblk=lsblk)
61 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
62
63 disk = device.Device("/dev/nvme0n1")
64 assert disk.vgs == []
65
66 def test_vgs_is_not_empty(self, device_info, monkeypatch):
67 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
68 vg_extent_size=1073741824)
69 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
70 lsblk = {"TYPE": "disk"}
71 device_info(lsblk=lsblk)
72 disk = device.Device("/dev/nvme0n1")
73 assert len(disk.vgs) == 1
74
75 def test_device_is_device(self, device_info):
76 data = {"/dev/sda": {"foo": "bar"}}
77 lsblk = {"TYPE": "device"}
78 device_info(devices=data, lsblk=lsblk)
79 disk = device.Device("/dev/sda")
80 assert disk.is_device is True
81
82 def test_device_is_rotational(self, device_info):
83 data = {"/dev/sda": {"rotational": "1"}}
84 lsblk = {"TYPE": "device"}
85 device_info(devices=data, lsblk=lsblk)
86 disk = device.Device("/dev/sda")
87 assert disk.rotational
88
89 def test_device_is_not_rotational(self, device_info):
90 data = {"/dev/sda": {"rotational": "0"}}
91 lsblk = {"TYPE": "device"}
92 device_info(devices=data, lsblk=lsblk)
93 disk = device.Device("/dev/sda")
94 assert not disk.rotational
95
96 def test_device_is_rotational_lsblk(self, device_info):
97 data = {"/dev/sda": {"foo": "bar"}}
98 lsblk = {"TYPE": "device", "ROTA": "1"}
99 device_info(devices=data, lsblk=lsblk)
100 disk = device.Device("/dev/sda")
101 assert disk.rotational
102
103 def test_device_is_not_rotational_lsblk(self, device_info):
104 data = {"/dev/sda": {"rotational": "0"}}
105 lsblk = {"TYPE": "device", "ROTA": "0"}
106 device_info(devices=data, lsblk=lsblk)
107 disk = device.Device("/dev/sda")
108 assert not disk.rotational
109
110 def test_device_is_rotational_defaults_true(self, device_info):
111 # rotational will default true if no info from sys_api or lsblk is found
112 data = {"/dev/sda": {"foo": "bar"}}
113 lsblk = {"TYPE": "device", "foo": "bar"}
114 device_info(devices=data, lsblk=lsblk)
115 disk = device.Device("/dev/sda")
116 assert disk.rotational
117
118 def test_disk_is_device(self, device_info):
119 data = {"/dev/sda": {"foo": "bar"}}
120 lsblk = {"TYPE": "disk"}
121 device_info(devices=data, lsblk=lsblk)
122 disk = device.Device("/dev/sda")
123 assert disk.is_device is True
124
125 def test_is_partition(self, device_info):
126 data = {"/dev/sda1": {"foo": "bar"}}
127 lsblk = {"TYPE": "part"}
128 device_info(devices=data, lsblk=lsblk)
129 disk = device.Device("/dev/sda1")
130 assert disk.is_partition
131
132 def test_is_not_acceptable_device(self, device_info):
133 data = {"/dev/dm-0": {"foo": "bar"}}
134 lsblk = {"TYPE": "mpath"}
135 device_info(devices=data, lsblk=lsblk)
136 disk = device.Device("/dev/dm-0")
137 assert not disk.is_device
138
139 def test_is_not_lvm_memeber(self, device_info):
140 data = {"/dev/sda1": {"foo": "bar"}}
141 lsblk = {"TYPE": "part"}
142 device_info(devices=data, lsblk=lsblk)
143 disk = device.Device("/dev/sda1")
144 assert not disk.is_lvm_member
145
146 def test_is_lvm_memeber(self, device_info):
147 data = {"/dev/sda1": {"foo": "bar"}}
148 lsblk = {"TYPE": "part"}
149 device_info(devices=data, lsblk=lsblk)
150 disk = device.Device("/dev/sda1")
151 assert not disk.is_lvm_member
152
153 def test_is_mapper_device(self, device_info):
154 lsblk = {"TYPE": "lvm"}
155 device_info(lsblk=lsblk)
156 disk = device.Device("/dev/mapper/foo")
157 assert disk.is_mapper
158
159 def test_dm_is_mapper_device(self, device_info):
160 lsblk = {"TYPE": "lvm"}
161 device_info(lsblk=lsblk)
162 disk = device.Device("/dev/dm-4")
163 assert disk.is_mapper
164
165 def test_is_not_mapper_device(self, device_info):
166 lsblk = {"TYPE": "disk"}
167 device_info(lsblk=lsblk)
168 disk = device.Device("/dev/sda")
169 assert not disk.is_mapper
170
171 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
172 "disable_kernel_queries")
173 def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
174 disk = device.Device("/dev/sda")
175 assert disk.is_ceph_disk_member
176
177 @pytest.mark.usefixtures("blkid_ceph_disk_member",
178 "disable_kernel_queries")
179 def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
180 disk = device.Device("/dev/sda")
181 assert disk.is_ceph_disk_member
182
183 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
184 "disable_kernel_queries")
185 def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
186 disk = device.Device("/dev/sda")
187 assert disk.is_ceph_disk_member
188 assert not disk.available
189 assert "Used by ceph-disk" in disk.rejected_reasons
190
191 @pytest.mark.usefixtures("blkid_ceph_disk_member",
192 "disable_kernel_queries")
193 def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
194 disk = device.Device("/dev/sda")
195 assert disk.is_ceph_disk_member
196 assert not disk.available
197 assert "Used by ceph-disk" in disk.rejected_reasons
198
199 def test_reject_removable_device(self, device_info):
200 data = {"/dev/sdb": {"removable": 1}}
201 lsblk = {"TYPE": "disk"}
202 device_info(devices=data,lsblk=lsblk)
203 disk = device.Device("/dev/sdb")
204 assert not disk.available
205
206 def test_reject_device_with_gpt_headers(self, device_info):
207 data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
208 lsblk = {"TYPE": "disk"}
209 blkid= {"PTTYPE": "gpt"}
210 device_info(
211 devices=data,
212 blkid=blkid,
213 lsblk=lsblk,
214 )
215 disk = device.Device("/dev/sdb")
216 assert not disk.available
217
218 def test_accept_non_removable_device(self, device_info):
219 data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
220 lsblk = {"TYPE": "disk"}
221 device_info(devices=data,lsblk=lsblk)
222 disk = device.Device("/dev/sdb")
223 assert disk.available
224
225 def test_reject_not_acceptable_device(self, device_info):
226 data = {"/dev/dm-0": {"foo": "bar"}}
227 lsblk = {"TYPE": "mpath"}
228 device_info(devices=data, lsblk=lsblk)
229 disk = device.Device("/dev/dm-0")
230 assert not disk.available
231
232 def test_reject_readonly_device(self, device_info):
233 data = {"/dev/cdrom": {"ro": 1}}
234 lsblk = {"TYPE": "disk"}
235 device_info(devices=data,lsblk=lsblk)
236 disk = device.Device("/dev/cdrom")
237 assert not disk.available
238
239 def test_reject_smaller_than_5gb(self, device_info):
240 data = {"/dev/sda": {"size": 5368709119}}
241 lsblk = {"TYPE": "disk"}
242 device_info(devices=data,lsblk=lsblk)
243 disk = device.Device("/dev/sda")
244 assert not disk.available, 'too small device is available'
245
246 def test_accept_non_readonly_device(self, device_info):
247 data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
248 lsblk = {"TYPE": "disk"}
249 device_info(devices=data,lsblk=lsblk)
250 disk = device.Device("/dev/sda")
251 assert disk.available
252
253 def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label, device_info):
254 patch_bluestore_label.return_value = True
255 lsblk = {"TYPE": "disk"}
256 device_info(lsblk=lsblk)
257 disk = device.Device("/dev/sda")
258 assert not disk.available
259 assert "Has BlueStore device label" in disk.rejected_reasons
260
261 @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
262 "disable_kernel_queries")
263 def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
264 disk = device.Device("/dev/sda")
265 assert disk.is_ceph_disk_member is False
266
267 def test_existing_vg_available(self, monkeypatch, device_info):
268 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
269 vg_extent_size=4194304)
270 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
271 lsblk = {"TYPE": "disk"}
272 data = {"/dev/nvme0n1": {"size": "6442450944"}}
273 device_info(devices=data, lsblk=lsblk)
274 disk = device.Device("/dev/nvme0n1")
275 assert disk.available_lvm
276 assert not disk.available
277 assert not disk.available_raw
278
279 def test_existing_vg_too_small(self, monkeypatch, device_info):
280 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
281 vg_extent_size=1073741824)
282 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
283 lsblk = {"TYPE": "disk"}
284 data = {"/dev/nvme0n1": {"size": "6442450944"}}
285 device_info(devices=data, lsblk=lsblk)
286 disk = device.Device("/dev/nvme0n1")
287 assert not disk.available_lvm
288 assert not disk.available
289 assert not disk.available_raw
290
291 def test_multiple_existing_vgs(self, monkeypatch, device_info):
292 vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
293 vg_extent_size=4194304)
294 vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
295 vg_extent_size=4194304)
296 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
297 lsblk = {"TYPE": "disk"}
298 data = {"/dev/nvme0n1": {"size": "6442450944"}}
299 device_info(devices=data, lsblk=lsblk)
300 disk = device.Device("/dev/nvme0n1")
301 assert disk.available_lvm
302 assert not disk.available
303 assert not disk.available_raw
304
305 @pytest.mark.parametrize("ceph_type", ["data", "block"])
306 def test_used_by_ceph(self, device_info,
307 monkeypatch, ceph_type):
308 data = {"/dev/sda": {"foo": "bar"}}
309 lsblk = {"TYPE": "part"}
310 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
311 lv_uuid="0000", pv_tags={}, vg_name="vg")
312 pvolumes = []
313 pvolumes.append(FooPVolume)
314 lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
315 "lv_uuid": "0000", "lv_tags":
316 "ceph.osd_id=0,ceph.type="+ceph_type}
317 volumes = []
318 lv = api.Volume(**lv_data)
319 volumes.append(lv)
320 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
321 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
322 deepcopy(volumes))
323
324 device_info(devices=data, lsblk=lsblk, lv=lv_data)
325 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
326 vg_extent_size=1073741824)
327 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
328 disk = device.Device("/dev/sda")
329 assert disk.used_by_ceph
330
331 def test_not_used_by_ceph(self, device_info, monkeypatch):
332 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
333 pvolumes = []
334 pvolumes.append(FooPVolume)
335 data = {"/dev/sda": {"foo": "bar"}}
336 lsblk = {"TYPE": "part"}
337 lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
338 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
339
340 device_info(devices=data, lsblk=lsblk, lv=lv_data)
341 disk = device.Device("/dev/sda")
342 assert not disk.used_by_ceph
343
344 def test_get_device_id(self, device_info):
345 udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
346 lsblk = {"TYPE": "disk"}
347 device_info(udevadm=udev,lsblk=lsblk)
348 disk = device.Device("/dev/sda")
349 assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
350
351
352
353 class TestDeviceEncryption(object):
354
355 def test_partition_is_not_encrypted_lsblk(self, device_info):
356 lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
357 device_info(lsblk=lsblk)
358 disk = device.Device("/dev/sda")
359 assert disk.is_encrypted is False
360
361 def test_partition_is_encrypted_lsblk(self, device_info):
362 lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
363 device_info(lsblk=lsblk)
364 disk = device.Device("/dev/sda")
365 assert disk.is_encrypted is True
366
367 def test_partition_is_not_encrypted_blkid(self, device_info):
368 lsblk = {'TYPE': 'part'}
369 blkid = {'TYPE': 'ceph data'}
370 device_info(lsblk=lsblk, blkid=blkid)
371 disk = device.Device("/dev/sda")
372 assert disk.is_encrypted is False
373
374 def test_partition_is_encrypted_blkid(self, device_info):
375 lsblk = {'TYPE': 'part'}
376 blkid = {'TYPE': 'crypto_LUKS'}
377 device_info(lsblk=lsblk, blkid=blkid)
378 disk = device.Device("/dev/sda")
379 assert disk.is_encrypted is True
380
381 def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
382 status = {'type': 'LUKS1'}
383 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
384 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
385 blkid = {'TYPE': 'mapper'}
386 device_info(lsblk=lsblk, blkid=blkid)
387 disk = device.Device("/dev/mapper/uuid")
388 assert disk.is_encrypted is True
389
390 def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
391 status = {'type': 'LUKS2'}
392 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
393 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
394 blkid = {'TYPE': 'mapper'}
395 device_info(lsblk=lsblk, blkid=blkid)
396 disk = device.Device("/dev/mapper/uuid")
397 assert disk.is_encrypted is True
398
399 def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
400 status = {'type': 'PLAIN'}
401 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
402 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
403 blkid = {'TYPE': 'mapper'}
404 device_info(lsblk=lsblk, blkid=blkid)
405 disk = device.Device("/dev/mapper/uuid")
406 assert disk.is_encrypted is True
407
408 def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
409 monkeypatch.setattr(device, 'encryption_status', lambda x: {})
410 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
411 blkid = {'TYPE': 'mapper'}
412 device_info(lsblk=lsblk, blkid=blkid)
413 disk = device.Device("/dev/mapper/uuid")
414 assert disk.is_encrypted is False
415
416 def test_lv_is_encrypted_blkid(self, device_info):
417 lsblk = {'TYPE': 'lvm'}
418 blkid = {'TYPE': 'crypto_LUKS'}
419 device_info(lsblk=lsblk, blkid=blkid)
420 disk = device.Device("/dev/sda")
421 disk.lv_api = {}
422 assert disk.is_encrypted is True
423
424 def test_lv_is_not_encrypted_blkid(self, factory, device_info):
425 lsblk = {'TYPE': 'lvm'}
426 blkid = {'TYPE': 'xfs'}
427 device_info(lsblk=lsblk, blkid=blkid)
428 disk = device.Device("/dev/sda")
429 disk.lv_api = factory(encrypted=None)
430 assert disk.is_encrypted is False
431
432 def test_lv_is_encrypted_lsblk(self, device_info):
433 lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
434 blkid = {'TYPE': 'mapper'}
435 device_info(lsblk=lsblk, blkid=blkid)
436 disk = device.Device("/dev/sda")
437 disk.lv_api = {}
438 assert disk.is_encrypted is True
439
440 def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
441 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
442 blkid = {'TYPE': 'mapper'}
443 device_info(lsblk=lsblk, blkid=blkid)
444 disk = device.Device("/dev/sda")
445 disk.lv_api = factory(encrypted=None)
446 assert disk.is_encrypted is False
447
448 def test_lv_is_encrypted_lvm_api(self, factory, device_info):
449 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
450 blkid = {'TYPE': 'mapper'}
451 device_info(lsblk=lsblk, blkid=blkid)
452 disk = device.Device("/dev/sda")
453 disk.lv_api = factory(encrypted=True)
454 assert disk.is_encrypted is True
455
456 def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
457 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
458 blkid = {'TYPE': 'mapper'}
459 device_info(lsblk=lsblk, blkid=blkid)
460 disk = device.Device("/dev/sda")
461 disk.lv_api = factory(encrypted=False)
462 assert disk.is_encrypted is False
463
464
465 class TestDeviceOrdering(object):
466
467 def setup(self):
468 self.data = {
469 "/dev/sda": {"removable": 0},
470 "/dev/sdb": {"removable": 1}, # invalid
471 "/dev/sdc": {"removable": 0},
472 "/dev/sdd": {"removable": 1}, # invalid
473 }
474
475 def test_valid_before_invalid(self, device_info):
476 lsblk = {"TYPE": "disk"}
477 device_info(devices=self.data,lsblk=lsblk)
478 sda = device.Device("/dev/sda")
479 sdb = device.Device("/dev/sdb")
480
481 assert sda < sdb
482 assert sdb > sda
483
484 def test_valid_alphabetical_ordering(self, device_info):
485 lsblk = {"TYPE": "disk"}
486 device_info(devices=self.data,lsblk=lsblk)
487 sda = device.Device("/dev/sda")
488 sdc = device.Device("/dev/sdc")
489
490 assert sda < sdc
491 assert sdc > sda
492
493 def test_invalid_alphabetical_ordering(self, device_info):
494 lsblk = {"TYPE": "disk"}
495 device_info(devices=self.data,lsblk=lsblk)
496 sdb = device.Device("/dev/sdb")
497 sdd = device.Device("/dev/sdd")
498
499 assert sdb < sdd
500 assert sdd > sdb
501
502
503 class TestCephDiskDevice(object):
504
505 def test_partlabel_lsblk(self, device_info):
506 lsblk = {"TYPE": "disk", "PARTLABEL": ""}
507 device_info(lsblk=lsblk)
508 disk = device.CephDiskDevice(device.Device("/dev/sda"))
509
510 assert disk.partlabel == ''
511
512 def test_partlabel_blkid(self, device_info):
513 blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
514 device_info(blkid=blkid)
515 disk = device.CephDiskDevice(device.Device("/dev/sda"))
516
517 assert disk.partlabel == 'ceph data'
518
519 @pytest.mark.usefixtures("blkid_ceph_disk_member",
520 "disable_kernel_queries")
521 def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
522 disk = device.CephDiskDevice(device.Device("/dev/sda"))
523
524 assert disk.is_member is True
525
526 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
527 "disable_kernel_queries")
528 def test_is_member_lsblk(self, patch_bluestore_label, device_info):
529 lsblk = {"TYPE": "disk", "PARTLABEL": "ceph"}
530 device_info(lsblk=lsblk)
531 disk = device.CephDiskDevice(device.Device("/dev/sda"))
532
533 assert disk.is_member is True
534
535 def test_unknown_type(self, device_info):
536 lsblk = {"TYPE": "disk", "PARTLABEL": "gluster"}
537 device_info(lsblk=lsblk)
538 disk = device.CephDiskDevice(device.Device("/dev/sda"))
539
540 assert disk.type == 'unknown'
541
542 ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
543
544 @pytest.mark.usefixtures("blkid_ceph_disk_member",
545 "disable_kernel_queries")
546 def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
547 disk = device.CephDiskDevice(device.Device("/dev/sda"))
548
549 assert disk.type in self.ceph_types
550
551 @pytest.mark.usefixtures("blkid_ceph_disk_member",
552 "lsblk_ceph_disk_member",
553 "disable_kernel_queries")
554 def test_type_lsblk(self, device_info, ceph_partlabel):
555 disk = device.CephDiskDevice(device.Device("/dev/sda"))
556
557 assert disk.type in self.ceph_types