]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
import ceph 16.2.6
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
1 import pytest
2 from copy import deepcopy
3 from ceph_volume.util import device
4 from ceph_volume.api import lvm as api
5 from mock.mock import patch, mock_open
6
7
8 class TestDevice(object):
9
10 def test_sys_api(self, monkeypatch, device_info):
11 volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
12 lv_tags={}, lv_path='/dev/VolGroup/lv')
13 volumes = []
14 volumes.append(volume)
15 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
16 deepcopy(volumes))
17
18 data = {"/dev/sda": {"foo": "bar"}}
19 lsblk = {"TYPE": "disk"}
20 device_info(devices=data,lsblk=lsblk)
21 disk = device.Device("/dev/sda")
22 assert disk.sys_api
23 assert "foo" in disk.sys_api
24
25 def test_lvm_size(self, monkeypatch, device_info):
26 volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
27 lv_tags={}, lv_path='/dev/VolGroup/lv')
28 volumes = []
29 volumes.append(volume)
30 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
31 deepcopy(volumes))
32
33 # 5GB in size
34 data = {"/dev/sda": {"size": "5368709120"}}
35 lsblk = {"TYPE": "disk"}
36 device_info(devices=data,lsblk=lsblk)
37 disk = device.Device("/dev/sda")
38 assert disk.lvm_size.gb == 4
39
40 def test_lvm_size_rounds_down(self, device_info):
41 # 5.5GB in size
42 data = {"/dev/sda": {"size": "5905580032"}}
43 lsblk = {"TYPE": "disk"}
44 device_info(devices=data,lsblk=lsblk)
45 disk = device.Device("/dev/sda")
46 assert disk.lvm_size.gb == 4
47
48 def test_is_lv(self, device_info):
49 data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
50 lsblk = {"TYPE": "lvm"}
51 device_info(lv=data,lsblk=lsblk)
52 disk = device.Device("vg/lv")
53 assert disk.is_lv
54
55 def test_vgs_is_empty(self, device_info, monkeypatch):
56 BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
57 pv_tags={})
58 pvolumes = []
59 pvolumes.append(BarPVolume)
60 lsblk = {"TYPE": "disk"}
61 device_info(lsblk=lsblk)
62 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})
63
64 disk = device.Device("/dev/nvme0n1")
65 assert disk.vgs == []
66
67 def test_vgs_is_not_empty(self, device_info, monkeypatch):
68 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
69 vg_extent_size=1073741824)
70 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
71 lsblk = {"TYPE": "disk"}
72 device_info(lsblk=lsblk)
73 disk = device.Device("/dev/nvme0n1")
74 assert len(disk.vgs) == 1
75
76 def test_device_is_device(self, device_info):
77 data = {"/dev/sda": {"foo": "bar"}}
78 lsblk = {"TYPE": "device"}
79 device_info(devices=data, lsblk=lsblk)
80 disk = device.Device("/dev/sda")
81 assert disk.is_device is True
82
83 def test_device_is_rotational(self, device_info):
84 data = {"/dev/sda": {"rotational": "1"}}
85 lsblk = {"TYPE": "device"}
86 device_info(devices=data, lsblk=lsblk)
87 disk = device.Device("/dev/sda")
88 assert disk.rotational
89
90 def test_device_is_not_rotational(self, device_info):
91 data = {"/dev/sda": {"rotational": "0"}}
92 lsblk = {"TYPE": "device"}
93 device_info(devices=data, lsblk=lsblk)
94 disk = device.Device("/dev/sda")
95 assert not disk.rotational
96
97 def test_device_is_rotational_lsblk(self, device_info):
98 data = {"/dev/sda": {"foo": "bar"}}
99 lsblk = {"TYPE": "device", "ROTA": "1"}
100 device_info(devices=data, lsblk=lsblk)
101 disk = device.Device("/dev/sda")
102 assert disk.rotational
103
104 def test_device_is_not_rotational_lsblk(self, device_info):
105 data = {"/dev/sda": {"rotational": "0"}}
106 lsblk = {"TYPE": "device", "ROTA": "0"}
107 device_info(devices=data, lsblk=lsblk)
108 disk = device.Device("/dev/sda")
109 assert not disk.rotational
110
111 def test_device_is_rotational_defaults_true(self, device_info):
112 # rotational will default true if no info from sys_api or lsblk is found
113 data = {"/dev/sda": {"foo": "bar"}}
114 lsblk = {"TYPE": "device", "foo": "bar"}
115 device_info(devices=data, lsblk=lsblk)
116 disk = device.Device("/dev/sda")
117 assert disk.rotational
118
119 def test_disk_is_device(self, device_info):
120 data = {"/dev/sda": {"foo": "bar"}}
121 lsblk = {"TYPE": "disk"}
122 device_info(devices=data, lsblk=lsblk)
123 disk = device.Device("/dev/sda")
124 assert disk.is_device is True
125
126 def test_is_partition(self, device_info):
127 data = {"/dev/sda1": {"foo": "bar"}}
128 lsblk = {"TYPE": "part", "PKNAME": "sda"}
129 device_info(devices=data, lsblk=lsblk)
130 disk = device.Device("/dev/sda1")
131 assert disk.is_partition
132
133 def test_is_not_acceptable_device(self, device_info):
134 data = {"/dev/dm-0": {"foo": "bar"}}
135 lsblk = {"TYPE": "mpath"}
136 device_info(devices=data, lsblk=lsblk)
137 disk = device.Device("/dev/dm-0")
138 assert not disk.is_device
139
140 def test_is_not_lvm_memeber(self, device_info):
141 data = {"/dev/sda1": {"foo": "bar"}}
142 lsblk = {"TYPE": "part", "PKNAME": "sda"}
143 device_info(devices=data, lsblk=lsblk)
144 disk = device.Device("/dev/sda1")
145 assert not disk.is_lvm_member
146
147 def test_is_lvm_memeber(self, device_info):
148 data = {"/dev/sda1": {"foo": "bar"}}
149 lsblk = {"TYPE": "part", "PKNAME": "sda"}
150 device_info(devices=data, lsblk=lsblk)
151 disk = device.Device("/dev/sda1")
152 assert not disk.is_lvm_member
153
154 def test_is_mapper_device(self, device_info):
155 lsblk = {"TYPE": "lvm"}
156 device_info(lsblk=lsblk)
157 disk = device.Device("/dev/mapper/foo")
158 assert disk.is_mapper
159
160 def test_dm_is_mapper_device(self, device_info):
161 lsblk = {"TYPE": "lvm"}
162 device_info(lsblk=lsblk)
163 disk = device.Device("/dev/dm-4")
164 assert disk.is_mapper
165
166 def test_is_not_mapper_device(self, device_info):
167 lsblk = {"TYPE": "disk"}
168 device_info(lsblk=lsblk)
169 disk = device.Device("/dev/sda")
170 assert not disk.is_mapper
171
172 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
173 "disable_kernel_queries")
174 def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
175 disk = device.Device("/dev/sda")
176 assert disk.is_ceph_disk_member
177
178 @pytest.mark.usefixtures("blkid_ceph_disk_member",
179 "disable_kernel_queries")
180 def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
181 disk = device.Device("/dev/sda")
182 assert disk.is_ceph_disk_member
183
184 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
185 "disable_kernel_queries")
186 def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
187 disk = device.Device("/dev/sda")
188 assert disk.is_ceph_disk_member
189 assert not disk.available
190 assert "Used by ceph-disk" in disk.rejected_reasons
191
192 @pytest.mark.usefixtures("blkid_ceph_disk_member",
193 "disable_kernel_queries")
194 def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
195 disk = device.Device("/dev/sda")
196 assert disk.is_ceph_disk_member
197 assert not disk.available
198 assert "Used by ceph-disk" in disk.rejected_reasons
199
200 def test_reject_removable_device(self, device_info):
201 data = {"/dev/sdb": {"removable": 1}}
202 lsblk = {"TYPE": "disk"}
203 device_info(devices=data,lsblk=lsblk)
204 disk = device.Device("/dev/sdb")
205 assert not disk.available
206
207 def test_reject_device_with_gpt_headers(self, device_info):
208 data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
209 lsblk = {"TYPE": "disk"}
210 blkid= {"PTTYPE": "gpt"}
211 device_info(
212 devices=data,
213 blkid=blkid,
214 lsblk=lsblk,
215 )
216 disk = device.Device("/dev/sdb")
217 assert not disk.available
218
219 def test_accept_non_removable_device(self, device_info):
220 data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
221 lsblk = {"TYPE": "disk"}
222 device_info(devices=data,lsblk=lsblk)
223 disk = device.Device("/dev/sdb")
224 assert disk.available
225
226 def test_reject_not_acceptable_device(self, device_info):
227 data = {"/dev/dm-0": {"foo": "bar"}}
228 lsblk = {"TYPE": "mpath"}
229 device_info(devices=data, lsblk=lsblk)
230 disk = device.Device("/dev/dm-0")
231 assert not disk.available
232
233 def test_reject_readonly_device(self, device_info):
234 data = {"/dev/cdrom": {"ro": 1}}
235 lsblk = {"TYPE": "disk"}
236 device_info(devices=data,lsblk=lsblk)
237 disk = device.Device("/dev/cdrom")
238 assert not disk.available
239
240 def test_reject_smaller_than_5gb(self, device_info):
241 data = {"/dev/sda": {"size": 5368709119}}
242 lsblk = {"TYPE": "disk"}
243 device_info(devices=data,lsblk=lsblk)
244 disk = device.Device("/dev/sda")
245 assert not disk.available, 'too small device is available'
246
247 def test_accept_non_readonly_device(self, device_info):
248 data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
249 lsblk = {"TYPE": "disk"}
250 device_info(devices=data,lsblk=lsblk)
251 disk = device.Device("/dev/sda")
252 assert disk.available
253
254 def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label, device_info):
255 patch_bluestore_label.return_value = True
256 lsblk = {"TYPE": "disk"}
257 device_info(lsblk=lsblk)
258 disk = device.Device("/dev/sda")
259 assert not disk.available
260 assert "Has BlueStore device label" in disk.rejected_reasons
261
262 def test_reject_device_with_oserror(self, monkeypatch, patch_bluestore_label, device_info):
263 patch_bluestore_label.side_effect = OSError('test failure')
264 lsblk = {"TYPE": "disk"}
265 device_info(lsblk=lsblk)
266 disk = device.Device("/dev/sda")
267 assert not disk.available
268 assert "Failed to determine if device is BlueStore" in disk.rejected_reasons
269
270 @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
271 "disable_kernel_queries")
272 def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
273 disk = device.Device("/dev/sda")
274 assert disk.is_ceph_disk_member is False
275
276 def test_existing_vg_available(self, monkeypatch, device_info):
277 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
278 vg_extent_size=4194304)
279 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
280 lsblk = {"TYPE": "disk"}
281 data = {"/dev/nvme0n1": {"size": "6442450944"}}
282 device_info(devices=data, lsblk=lsblk)
283 disk = device.Device("/dev/nvme0n1")
284 assert disk.available_lvm
285 assert not disk.available
286 assert not disk.available_raw
287
288 def test_existing_vg_too_small(self, monkeypatch, device_info):
289 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
290 vg_extent_size=1073741824)
291 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
292 lsblk = {"TYPE": "disk"}
293 data = {"/dev/nvme0n1": {"size": "6442450944"}}
294 device_info(devices=data, lsblk=lsblk)
295 disk = device.Device("/dev/nvme0n1")
296 assert not disk.available_lvm
297 assert not disk.available
298 assert not disk.available_raw
299
300 def test_multiple_existing_vgs(self, monkeypatch, device_info):
301 vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
302 vg_extent_size=4194304)
303 vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
304 vg_extent_size=4194304)
305 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
306 lsblk = {"TYPE": "disk"}
307 data = {"/dev/nvme0n1": {"size": "6442450944"}}
308 device_info(devices=data, lsblk=lsblk)
309 disk = device.Device("/dev/nvme0n1")
310 assert disk.available_lvm
311 assert not disk.available
312 assert not disk.available_raw
313
314 @pytest.mark.parametrize("ceph_type", ["data", "block"])
315 def test_used_by_ceph(self, device_info,
316 monkeypatch, ceph_type):
317 data = {"/dev/sda": {"foo": "bar"}}
318 lsblk = {"TYPE": "part", "PKNAME": "sda"}
319 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
320 lv_uuid="0000", pv_tags={}, vg_name="vg")
321 pvolumes = []
322 pvolumes.append(FooPVolume)
323 lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
324 "lv_uuid": "0000", "lv_tags":
325 "ceph.osd_id=0,ceph.type="+ceph_type}
326 volumes = []
327 lv = api.Volume(**lv_data)
328 volumes.append(lv)
329 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
330 monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
331 deepcopy(volumes))
332
333 device_info(devices=data, lsblk=lsblk, lv=lv_data)
334 vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
335 vg_extent_size=1073741824)
336 monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
337 disk = device.Device("/dev/sda")
338 assert disk.used_by_ceph
339
340 def test_not_used_by_ceph(self, device_info, monkeypatch):
341 FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
342 pvolumes = []
343 pvolumes.append(FooPVolume)
344 data = {"/dev/sda": {"foo": "bar"}}
345 lsblk = {"TYPE": "part", "PKNAME": "sda"}
346 lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
347 monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
348
349 device_info(devices=data, lsblk=lsblk, lv=lv_data)
350 disk = device.Device("/dev/sda")
351 assert not disk.used_by_ceph
352
353 def test_get_device_id(self, device_info):
354 udev = {k:k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
355 lsblk = {"TYPE": "disk"}
356 device_info(udevadm=udev,lsblk=lsblk)
357 disk = device.Device("/dev/sda")
358 assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'
359
360 def test_has_bluestore_label(self):
361 # patch device.Device __init__ function to do nothing since we want to only test the
362 # low-level behavior of has_bluestore_label
363 with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
364 disk = device.Device("/dev/sda")
365 disk.abspath = "/dev/sda"
366 with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
367 assert disk.has_bluestore_label
368 with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
369 assert not disk.has_bluestore_label
370
371
372 class TestDeviceEncryption(object):
373
374 def test_partition_is_not_encrypted_lsblk(self, device_info):
375 lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'PKNAME': 'sda'}
376 device_info(lsblk=lsblk)
377 disk = device.Device("/dev/sda")
378 assert disk.is_encrypted is False
379
380 def test_partition_is_encrypted_lsblk(self, device_info):
381 lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'PKNAME': 'sda'}
382 device_info(lsblk=lsblk)
383 disk = device.Device("/dev/sda")
384 assert disk.is_encrypted is True
385
386 def test_partition_is_not_encrypted_blkid(self, device_info):
387 lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
388 blkid = {'TYPE': 'ceph data'}
389 device_info(lsblk=lsblk, blkid=blkid)
390 disk = device.Device("/dev/sda")
391 assert disk.is_encrypted is False
392
393 def test_partition_is_encrypted_blkid(self, device_info):
394 lsblk = {'TYPE': 'part', 'PKNAME': 'sda'}
395 blkid = {'TYPE': 'crypto_LUKS'}
396 device_info(lsblk=lsblk, blkid=blkid)
397 disk = device.Device("/dev/sda")
398 assert disk.is_encrypted is True
399
400 def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
401 status = {'type': 'LUKS1'}
402 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
403 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
404 blkid = {'TYPE': 'mapper'}
405 device_info(lsblk=lsblk, blkid=blkid)
406 disk = device.Device("/dev/mapper/uuid")
407 assert disk.is_encrypted is True
408
409 def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
410 status = {'type': 'LUKS2'}
411 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
412 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
413 blkid = {'TYPE': 'mapper'}
414 device_info(lsblk=lsblk, blkid=blkid)
415 disk = device.Device("/dev/mapper/uuid")
416 assert disk.is_encrypted is True
417
418 def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
419 status = {'type': 'PLAIN'}
420 monkeypatch.setattr(device, 'encryption_status', lambda x: status)
421 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
422 blkid = {'TYPE': 'mapper'}
423 device_info(lsblk=lsblk, blkid=blkid)
424 disk = device.Device("/dev/mapper/uuid")
425 assert disk.is_encrypted is True
426
427 def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
428 monkeypatch.setattr(device, 'encryption_status', lambda x: {})
429 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
430 blkid = {'TYPE': 'mapper'}
431 device_info(lsblk=lsblk, blkid=blkid)
432 disk = device.Device("/dev/mapper/uuid")
433 assert disk.is_encrypted is False
434
435 def test_lv_is_encrypted_blkid(self, device_info):
436 lsblk = {'TYPE': 'lvm'}
437 blkid = {'TYPE': 'crypto_LUKS'}
438 device_info(lsblk=lsblk, blkid=blkid)
439 disk = device.Device("/dev/sda")
440 disk.lv_api = {}
441 assert disk.is_encrypted is True
442
443 def test_lv_is_not_encrypted_blkid(self, factory, device_info):
444 lsblk = {'TYPE': 'lvm'}
445 blkid = {'TYPE': 'xfs'}
446 device_info(lsblk=lsblk, blkid=blkid)
447 disk = device.Device("/dev/sda")
448 disk.lv_api = factory(encrypted=None)
449 assert disk.is_encrypted is False
450
451 def test_lv_is_encrypted_lsblk(self, device_info):
452 lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
453 blkid = {'TYPE': 'mapper'}
454 device_info(lsblk=lsblk, blkid=blkid)
455 disk = device.Device("/dev/sda")
456 disk.lv_api = {}
457 assert disk.is_encrypted is True
458
459 def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
460 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
461 blkid = {'TYPE': 'mapper'}
462 device_info(lsblk=lsblk, blkid=blkid)
463 disk = device.Device("/dev/sda")
464 disk.lv_api = factory(encrypted=None)
465 assert disk.is_encrypted is False
466
467 def test_lv_is_encrypted_lvm_api(self, factory, device_info):
468 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
469 blkid = {'TYPE': 'mapper'}
470 device_info(lsblk=lsblk, blkid=blkid)
471 disk = device.Device("/dev/sda")
472 disk.lv_api = factory(encrypted=True)
473 assert disk.is_encrypted is True
474
475 def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
476 lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
477 blkid = {'TYPE': 'mapper'}
478 device_info(lsblk=lsblk, blkid=blkid)
479 disk = device.Device("/dev/sda")
480 disk.lv_api = factory(encrypted=False)
481 assert disk.is_encrypted is False
482
483
484 class TestDeviceOrdering(object):
485
486 def setup(self):
487 self.data = {
488 "/dev/sda": {"removable": 0},
489 "/dev/sdb": {"removable": 1}, # invalid
490 "/dev/sdc": {"removable": 0},
491 "/dev/sdd": {"removable": 1}, # invalid
492 }
493
494 def test_valid_before_invalid(self, device_info):
495 lsblk = {"TYPE": "disk"}
496 device_info(devices=self.data,lsblk=lsblk)
497 sda = device.Device("/dev/sda")
498 sdb = device.Device("/dev/sdb")
499
500 assert sda < sdb
501 assert sdb > sda
502
503 def test_valid_alphabetical_ordering(self, device_info):
504 lsblk = {"TYPE": "disk"}
505 device_info(devices=self.data,lsblk=lsblk)
506 sda = device.Device("/dev/sda")
507 sdc = device.Device("/dev/sdc")
508
509 assert sda < sdc
510 assert sdc > sda
511
512 def test_invalid_alphabetical_ordering(self, device_info):
513 lsblk = {"TYPE": "disk"}
514 device_info(devices=self.data,lsblk=lsblk)
515 sdb = device.Device("/dev/sdb")
516 sdd = device.Device("/dev/sdd")
517
518 assert sdb < sdd
519 assert sdd > sdb
520
521
522 class TestCephDiskDevice(object):
523
524 def test_partlabel_lsblk(self, device_info):
525 lsblk = {"TYPE": "disk", "PARTLABEL": ""}
526 device_info(lsblk=lsblk)
527 disk = device.CephDiskDevice(device.Device("/dev/sda"))
528
529 assert disk.partlabel == ''
530
531 def test_partlabel_blkid(self, device_info):
532 blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
533 device_info(blkid=blkid)
534 disk = device.CephDiskDevice(device.Device("/dev/sda"))
535
536 assert disk.partlabel == 'ceph data'
537
538 @pytest.mark.usefixtures("blkid_ceph_disk_member",
539 "disable_kernel_queries")
540 def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
541 disk = device.CephDiskDevice(device.Device("/dev/sda"))
542
543 assert disk.is_member is True
544
545 @pytest.mark.usefixtures("lsblk_ceph_disk_member",
546 "disable_kernel_queries")
547 def test_is_member_lsblk(self, patch_bluestore_label, device_info):
548 lsblk = {"TYPE": "disk", "PARTLABEL": "ceph"}
549 device_info(lsblk=lsblk)
550 disk = device.CephDiskDevice(device.Device("/dev/sda"))
551
552 assert disk.is_member is True
553
554 def test_unknown_type(self, device_info):
555 lsblk = {"TYPE": "disk", "PARTLABEL": "gluster"}
556 device_info(lsblk=lsblk)
557 disk = device.CephDiskDevice(device.Device("/dev/sda"))
558
559 assert disk.type == 'unknown'
560
561 ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']
562
563 @pytest.mark.usefixtures("blkid_ceph_disk_member",
564 "disable_kernel_queries")
565 def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
566 disk = device.CephDiskDevice(device.Device("/dev/sda"))
567
568 assert disk.type in self.ceph_types
569
570 @pytest.mark.usefixtures("blkid_ceph_disk_member",
571 "lsblk_ceph_disk_member",
572 "disable_kernel_queries")
573 def test_type_lsblk(self, device_info, ceph_partlabel):
574 disk = device.CephDiskDevice(device.Device("/dev/sda"))
575
576 assert disk.type in self.ceph_types