# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
2 from copy
import deepcopy
3 from ceph_volume
.util
import device
4 from ceph_volume
.api
import lvm
as api
5 from mock
.mock
import patch
, mock_open
class TestDevice(object):
    """Tests for ``device.Device`` properties and availability checks.

    Most tests seed the ``device_info`` fixture with fake ``devices``,
    ``lsblk``, ``blkid``, ``udevadm`` or ``lv`` data and then build a
    ``device.Device`` for a path to inspect one of its attributes.
    """

    def test_sys_api(self, monkeypatch, device_info):
        """Data registered for the device path shows up in ``sys_api``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        # The list has to exist before it is appended to, and the patched
        # get_lvs needs a body; handing out a deep copy keeps this fake
        # inventory untouched if the caller mutates what it receives.
        volumes = []
        volumes.append(volume)
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        """A size of 5 * 1024**3 bytes yields ``lvm_size.gb == 4``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        # Same fake LV inventory as in test_sys_api.
        volumes = []
        volumes.append(volume)
        monkeypatch.setattr(api, 'get_lvs',
                            lambda **kwargs: deepcopy(volumes))
        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """A size of 5.5 * 1024**3 bytes still yields ``lvm_size.gb == 4``."""
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """A path backed by ``lv`` data and lsblk TYPE "lvm" is an LV."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")
        # NOTE(review): the test previously asserted nothing; this check is
        # derived from the test name -- confirm ``Device.is_lv`` is the
        # intended attribute.
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, monkeypatch):
        """No volume groups are reported when ``get_pvs`` returns nothing."""
        # Keyword arguments mirror the PVolume built in test_used_by_ceph.
        BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = []
        pvolumes.append(BarPVolume)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
        disk = device.Device("/dev/nvme0n1")
        # Counterpart of the ``len(disk.vgs) == 1`` check in
        # test_vgs_is_not_empty.
        assert len(disk.vgs) == 0

    def test_vgs_is_not_empty(self, device_info, monkeypatch):
        """The VG returned by the patched ``get_device_vgs`` is reported."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info):
        """lsblk TYPE "device" makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info):
        """``rotational`` is truthy when the device data says "1"."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info):
        """``rotational`` is falsy when the device data says "0"."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info):
        """lsblk ROTA "1" is used when the device data has no rotational key."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info):
        """``rotational`` is falsy when both sources report "0"."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info):
        """``rotational`` is truthy when neither source carries the flag."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info):
        """lsblk TYPE "disk" makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info):
        """lsblk TYPE "part" makes ``is_partition`` truthy."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    def test_is_not_acceptable_device(self, device_info):
        """lsblk TYPE "mpath" is not treated as a device."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.is_device

    def test_is_not_lvm_memeber(self, device_info):
        """A plain partition is not reported as an LVM member."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info):
        """Same setup and negative assertion as ``test_is_not_lvm_memeber``."""
        # NOTE(review): despite its name this test asserts the device is NOT
        # an LVM member and duplicates the test above -- confirm whether a
        # positive case with LVM data was intended.
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-N`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A plain ``/dev/sda`` disk is not a mapper device."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
        """Membership is detected with the lsblk ceph-disk fixture active."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
        """Membership is detected with the blkid ceph-disk fixture active."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch,
                                                     patch_bluestore_label):
        """A ceph-disk member (lsblk fixture) is rejected with a reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch,
                                                     patch_bluestore_label):
        """A ceph-disk member (blkid fixture) is rejected with a reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_reject_removable_device(self, device_info):
        """A device flagged ``removable: 1`` is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_reject_device_with_gpt_headers(self, device_info):
        """A device whose blkid PTTYPE is "gpt" is not available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        blkid = {"PTTYPE": "gpt"}
        # Without this call none of the prepared data reaches the fixture and
        # the test would not exercise the GPT case at all.
        device_info(devices=data, blkid=blkid, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        """A non-removable 5 * 1024**3 byte disk is available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_not_acceptable_device(self, device_info):
        """lsblk TYPE "mpath" makes the device unavailable."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    def test_reject_readonly_device(self, device_info):
        """A device flagged ``ro: 1`` is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_reject_smaller_than_5gb(self, device_info):
        """One byte short of 5 * 1024**3 is not available."""
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    def test_accept_non_readonly_device(self, device_info):
        """A writable (``ro: 0``) 5 * 1024**3 byte disk is available."""
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label,
                                     device_info):
        """A positive BlueStore label check rejects the device."""
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    def test_reject_device_with_oserror(self, monkeypatch,
                                        patch_bluestore_label, device_info):
        """An ``OSError`` from the BlueStore label check rejects the device."""
        patch_bluestore_label.side_effect = OSError('test failure')
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert ("Failed to determine if device is BlueStore"
                in disk.rejected_reasons)

    @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
        """``is_ceph_disk_member`` is False with the non-member fixture."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_existing_vg_available(self, monkeypatch, device_info):
        """A VG with 1536 free 4 MiB extents is LVM-available only."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
                             vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_existing_vg_too_small(self, monkeypatch, device_info):
        """A VG with only 4 free 1 GiB extents is not available at all."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_multiple_existing_vgs(self, monkeypatch, device_info):
        """Two VGs with 1000 and 536 free 4 MiB extents are LVM-available."""
        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
                              vg_extent_size=4194304)
        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
                              vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info,
                          monkeypatch, ceph_type):
        """An LV tagged with ceph type "data" or "block" marks the device used."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = []
        pvolumes.append(FooPVolume)
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type="+ceph_type}
        lv = api.Volume(**lv_data)
        # The patched get_lvs must return the LV built above; a deep copy
        # keeps the original object untouched if the caller mutates it.
        lvs = [lv]
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs: deepcopy(lvs))
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, monkeypatch):
        """LV data carrying ceph type "journal" does not mark the device used."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = []
        pvolumes.append(FooPVolume)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """The device id joins vendor, model and serial with underscores."""
        # Each udev key maps to its own name so the expected id is readable.
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'

    def test_has_bluestore_label(self):
        """``has_bluestore_label`` depends on the bytes read from the device."""
        # patch device.Device __init__ function to do nothing since we want to only test the
        # low-level behavior of has_bluestore_label
        with patch.object(device.Device, "__init__",
                          lambda self, path, with_lsm=False: None):
            disk = device.Device("/dev/sda")
            disk.abspath = "/dev/sda"
            with patch('builtins.open',
                       mock_open(read_data=b'bluestore block device\n')):
                assert disk.has_bluestore_label
            with patch('builtins.open',
                       mock_open(read_data=b'not a bluestore block device\n')):
                assert not disk.has_bluestore_label
class TestDeviceEncryption(object):
    """Tests for ``device.Device.is_encrypted``.

    Each test feeds fake ``lsblk`` / ``blkid`` data through the
    ``device_info`` fixture; the mapper tests also patch
    ``device.encryption_status`` and some LV tests set ``lv_api`` directly.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info):
        """A partition with lsblk FSTYPE 'xfs' is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs', 'PKNAME': 'sda'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info):
        """A partition with lsblk FSTYPE 'crypto_LUKS' is encrypted."""
        device_info(
            lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'PKNAME': 'sda'}
        )
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info):
        """A partition with blkid TYPE 'ceph data' is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'ceph data'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info):
        """A partition with blkid TYPE 'crypto_LUKS' is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'crypto_LUKS'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
        """A mapper device with encryption status type 'LUKS1' is encrypted."""
        luks_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
        """A mapper device with encryption status type 'LUKS2' is encrypted."""
        luks_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with encryption status type 'PLAIN' is encrypted."""
        plain_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status',
                            lambda x: plain_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
        """A mapper device with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info):
        """An LV with blkid TYPE 'crypto_LUKS' is encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'crypto_LUKS'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info):
        """An LV with blkid TYPE 'xfs' and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'xfs'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=None)
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info):
        """An LV with lsblk FSTYPE 'crypto_LUKS' is encrypted."""
        device_info(lsblk={'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
        """An LV with lsblk FSTYPE 'xfs' and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=None)
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api`` has ``encrypted=True`` is encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=True)
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api`` has ``encrypted=False`` is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=False)
        assert dev.is_encrypted is False
class TestDeviceOrdering(object):
    """Tests for the relative ordering of ``device.Device`` objects.

    ``/dev/sda`` and ``/dev/sdc`` are non-removable, while ``/dev/sdb`` and
    ``/dev/sdd`` are removable and marked invalid in the fixture data.
    """

    def setup_method(self):
        """Build the fake device table that every test reads as ``self.data``."""
        # The entries previously had no enclosing assignment even though the
        # tests use ``self.data``; a per-test setup hook provides it.
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """A valid device sorts before an invalid one."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")
        # NOTE(review): the test previously asserted nothing; these checks are
        # derived from the test name and assume Device supports ``<`` / ``>``
        # -- confirm against ceph_volume.util.device.
        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two valid devices sort alphabetically by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")
        # NOTE(review): assertions derived from the test name -- confirm.
        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two invalid devices sort alphabetically by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")
        # NOTE(review): assertions derived from the test name -- confirm.
        assert sdb < sdd
        assert sdd > sdb
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` wrapped around a ``device.Device``.

    Covers the ``partlabel``, ``is_member`` and ``type`` attributes using
    fake ``lsblk`` / ``blkid`` data or the ceph-disk member fixtures.
    """

    # Values accepted by the ``type`` checks in the parametrized tests below.
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk PARTLABEL is reported as an empty string."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": ""})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """The blkid PARTLABEL is reported when only blkid data is given."""
        device_info(blkid={"TYPE": "disk", "PARTLABEL": "ceph data"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.partlabel == 'ceph data'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
        """``is_member`` is True with the blkid ceph-disk fixture active."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_lsblk(self, patch_bluestore_label, device_info):
        """``is_member`` is True with lsblk PARTLABEL set to "ceph"."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "ceph"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.is_member is True

    def test_unknown_type(self, device_info):
        """A PARTLABEL of "gluster" yields the type 'unknown'."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "gluster"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.type == 'unknown'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
        """With the blkid fixture the type is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_lsblk(self, device_info, ceph_partlabel):
        """With both member fixtures the type is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)
        assert ceph_disk.type in self.ceph_types