]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
import ceph 16.2.6
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
CommitLineData
91327a77 1import pytest
f6b5b4d7 2from copy import deepcopy
1adf2230
AA
3from ceph_volume.util import device
4from ceph_volume.api import lvm as api
522d829b 5from mock.mock import patch, mock_open
1adf2230
AA
6
7
class TestDevice(object):
    """Tests for the properties of ``device.Device``.

    Most tests feed fake system information through the ``device_info``
    fixture and then assert on a single ``Device`` attribute.
    """

    def test_sys_api(self, monkeypatch, device_info):
        """Data passed as ``devices`` shows up in ``Device.sys_api``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        # hand out a copy on every call so the test-owned list is not mutated
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        """``lvm_size.gb`` is 4 for a device whose reported size is 5GB."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        # 5GB in size
        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        """``lvm_size.gb`` is 4 for a device whose reported size is 5.5GB."""
        # 5.5GB in size
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        """A path backed by ``lv`` fixture data is reported as an LV."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, monkeypatch):
        """``Device.vgs`` is an empty list when ``get_pvs`` reports nothing."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})

        disk = device.Device("/dev/nvme0n1")
        assert disk.vgs == []

    def test_vgs_is_not_empty(self, device_info, monkeypatch):
        """``Device.vgs`` holds the single VG returned by ``get_device_vgs``."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info):
        """lsblk ``TYPE`` of ``device`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info):
        """sys_api ``rotational`` of ``"1"`` makes the device rotational."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info):
        """sys_api ``rotational`` of ``"0"`` makes the device non-rotational."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info):
        """lsblk ``ROTA`` of ``"1"`` is used when sys_api has no rotational key."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info):
        """Both sys_api and lsblk reporting ``"0"`` give a non-rotational device."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info):
        """``rotational`` is truthy when neither source carries the information."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info):
        """lsblk ``TYPE`` of ``disk`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info):
        """lsblk ``TYPE`` of ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    def test_is_not_acceptable_device(self, device_info):
        """lsblk ``TYPE`` of ``mpath`` is not treated as a device."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.is_device

    def test_is_not_lvm_memeber(self, device_info):
        """A plain partition is not reported as an LVM member."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info):
        """Currently asserts that a plain partition is NOT an LVM member."""
        # NOTE(review): despite its name this test has the same body as
        # test_is_not_lvm_memeber and checks the negative case. It looks like
        # the positive case was never written; confirm what fixture data makes
        # a device an LVM member before changing the assertion. The
        # "memeber" spelling is kept so existing test ids stay stable.
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A ``/dev/mapper/`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        """A ``/dev/dm-N`` path is reported as a mapper device."""
        lsblk = {"TYPE": "lvm"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """``/dev/sda`` with lsblk ``TYPE`` of ``disk`` is not a mapper device."""
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
        """With the lsblk ceph-disk fixture the device is a ceph-disk member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
        """With the blkid ceph-disk fixture the device is a ceph-disk member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (lsblk fixture) is rejected as unavailable."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (blkid fixture) is rejected as unavailable."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_reject_removable_device(self, device_info):
        """A device with ``removable`` set to 1 is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_reject_device_with_gpt_headers(self, device_info):
        """A device whose blkid ``PTTYPE`` is ``gpt`` is not available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        blkid = {"PTTYPE": "gpt"}
        device_info(
            devices=data,
            blkid=blkid,
            lsblk=lsblk,
        )
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        """A non-removable device of size 5368709120 is available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_not_acceptable_device(self, device_info):
        """A device with lsblk ``TYPE`` of ``mpath`` is not available."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    def test_reject_readonly_device(self, device_info):
        """A device with ``ro`` set to 1 is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_reject_smaller_than_5gb(self, device_info):
        """A device one byte short of 5368709120 is not available."""
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    def test_accept_non_readonly_device(self, device_info):
        """A writable device of size 5368709120 is available."""
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label, device_info):
        """A device reporting a BlueStore label is rejected with that reason."""
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    def test_reject_device_with_oserror(self, monkeypatch, patch_bluestore_label, device_info):
        """An ``OSError`` during the BlueStore label check rejects the device."""
        patch_bluestore_label.side_effect = OSError('test failure')
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Failed to determine if device is BlueStore" in disk.rejected_reasons

    @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
        """With the non-member fixture ``is_ceph_disk_member`` is False."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_existing_vg_available(self, monkeypatch, device_info):
        """A VG with 1536 free 4194304-sized extents is LVM-available only."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1536,
                             vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_existing_vg_too_small(self, monkeypatch, device_info):
        """A VG with 4 free 1073741824-sized extents is not available at all."""
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    def test_multiple_existing_vgs(self, monkeypatch, device_info):
        """Two VGs with 1000 and 536 free extents are LVM-available only."""
        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=1000,
                              vg_extent_size=4194304)
        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=536,
                              vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
        lsblk = {"TYPE": "disk"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info,
                          monkeypatch, ceph_type):
        """An LV tagged with ``ceph.type`` data/block marks the device as used."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type="+ceph_type}
        lv = api.Volume(**lv_data)
        volumes = [lv]
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, monkeypatch):
        """With this journal-tagged LV data the device is not used by ceph."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "PKNAME": "sda"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        """``_get_device_id`` joins vendor, model and serial with underscores."""
        # every udev key maps to its own name so the expected id is readable
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'

    def test_has_bluestore_label(self):
        """``has_bluestore_label`` depends on the bytes read from the device."""
        # patch device.Device __init__ function to do nothing since we want to only test the
        # low-level behavior of has_bluestore_label
        with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
            disk = device.Device("/dev/sda")
            disk.abspath = "/dev/sda"
            with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
                assert disk.has_bluestore_label
            with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
                assert not disk.has_bluestore_label
f64942e4
AA
370
371
class TestDeviceEncryption(object):
    """Tests for ``Device.is_encrypted`` across partitions, mappers and LVs."""

    def test_partition_is_not_encrypted_lsblk(self, device_info):
        """A partition with lsblk ``FSTYPE`` of ``xfs`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs', 'PKNAME': 'sda'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info):
        """A partition with lsblk ``FSTYPE`` of ``crypto_LUKS`` is encrypted."""
        device_info(
            lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'PKNAME': 'sda'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info):
        """A partition with blkid ``TYPE`` of ``ceph data`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'ceph data'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info):
        """A partition with blkid ``TYPE`` of ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'crypto_LUKS'})
        dev = device.Device("/dev/sda")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, monkeypatch):
        """A mapper whose encryption status type is ``LUKS1`` is encrypted."""
        luks_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, monkeypatch):
        """A mapper whose encryption status type is ``LUKS2`` is encrypted."""
        luks_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, monkeypatch):
        """A mapper whose encryption status type is ``PLAIN`` is encrypted."""
        plain_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: plain_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, monkeypatch):
        """A mapper with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/mapper/uuid")
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info):
        """An LV with blkid ``TYPE`` of ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'crypto_LUKS'})
        dev = device.Device("/dev/sda")
        dev.lv_api = {}
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info):
        """An LV with blkid ``TYPE`` of ``xfs`` and no LVM flag is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm'}, blkid={'TYPE': 'xfs'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=None)
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info):
        """An LV with lsblk ``FSTYPE`` of ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = {}
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info):
        """An LV with lsblk ``FSTYPE`` of ``xfs`` and no LVM flag is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=None)
        assert dev.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is True is encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=True)
        assert dev.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info):
        """An LV whose ``lv_api.encrypted`` is False is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        dev = device.Device("/dev/sda")
        dev.lv_api = factory(encrypted=False)
        assert dev.is_encrypted is False
482
91327a77
AA
483
class TestDeviceOrdering(object):
    """Tests for the ``<`` / ``>`` ordering of ``device.Device`` objects."""

    def setup_method(self):
        """Build fresh per-test device data.

        This uses pytest's xunit-style ``setup_method`` hook. The previous
        plain ``setup`` name is the nose-style hook, which pytest deprecated
        in 7.2 and no longer calls in 8.0; without it ``self.data`` would
        never be set.
        """
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """A non-removable device sorts before a removable one."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")

        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two non-removable devices sort by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")

        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two removable devices sort by path."""
        lsblk = {"TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")

        assert sdb < sdd
        assert sdd > sdb
520
521
91327a77
AA
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` wrapping a ``device.Device``."""

    # partition types accepted by the ``type`` assertions below
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk ``PARTLABEL`` is returned as an empty string."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": ""})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """The blkid ``PARTLABEL`` is exposed through ``partlabel``."""
        device_info(blkid={"TYPE": "disk", "PARTLABEL": "ceph data"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.partlabel == 'ceph data'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
        """With the blkid ceph-disk fixture ``is_member`` is True."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_is_member_lsblk(self, patch_bluestore_label, device_info):
        """With lsblk ``PARTLABEL`` of ``ceph`` ``is_member`` is True."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "ceph"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.is_member is True

    def test_unknown_type(self, device_info):
        """A ``PARTLABEL`` of ``gluster`` yields a type of ``unknown``."""
        device_info(lsblk={"TYPE": "disk", "PARTLABEL": "gluster"})
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type == 'unknown'

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, device_info, ceph_partlabel):
        """With the blkid fixture ``type`` is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_lsblk(self, device_info, ceph_partlabel):
        """With both member fixtures ``type`` is one of ``ceph_types``."""
        wrapped = device.Device("/dev/sda")
        ceph_disk = device.CephDiskDevice(wrapped)

        assert ceph_disk.type in self.ceph_types