# Origin: git.proxmox.com, ceph.git (blame view)
# File: ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
# Imported with: ceph nautilus 14.2.2
import pytest
from ceph_volume.util import device
from ceph_volume.api import lvm as api


class TestDevice(object):
    """Exercise ``device.Device`` attributes against faked system data.

    Each test feeds canned data through the ``device_info`` fixture (and,
    where needed, fake physical volumes through ``pvolumes``) before
    building the ``Device`` under test.
    """

    def test_sys_api(self, device_info):
        data = {"/dev/sda": {"foo": "bar"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_lvm_size(self, device_info):
        # 5GB in size; the reported lvm_size is expected to be 4GB
        data = {"/dev/sda": {"size": "5368709120"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_lvm_size_rounds_down(self, device_info):
        # 5.5GB in size
        data = {"/dev/sda": {"size": "5905580032"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, device_info):
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        device_info(lv=data)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_vgs_is_empty(self, device_info, pvolumes, monkeypatch):
        # The only known PV lives on /dev/sda, not on the device under test.
        pv = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", pv_tags={})
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert disk.vgs == []

    def test_vgs_is_not_empty(self, device_info, pvolumes, monkeypatch):
        pv = api.PVolume(
            vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1',
            pv_uuid="0000", pv_tags={},
        )
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        lsblk = {"TYPE": "disk"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    def test_device_is_device(self, device_info, pvolumes):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_device_is_rotational(self, device_info, pvolumes):
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational(self, device_info, pvolumes):
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_lsblk(self, device_info, pvolumes):
        # no "rotational" key in sys_api: the value must come from lsblk ROTA
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_device_is_not_rotational_lsblk(self, device_info, pvolumes):
        # sys_api deliberately carries no "rotational" key so that the result
        # can only come from lsblk ROTA; with "rotational": "0" in sys_api the
        # lsblk path this test is named after would never be consulted.
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    def test_device_is_rotational_defaults_true(self, device_info, pvolumes):
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    def test_disk_is_device(self, device_info, pvolumes):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    def test_is_partition(self, device_info, pvolumes):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_partition

    def test_is_not_lvm_memeber(self, device_info, pvolumes):
        # no PVs registered at all, so the partition cannot be an LVM member
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info, pvolumes, monkeypatch):
        # Positive counterpart of test_is_not_lvm_memeber: uses the same PV
        # setup as test_pv_api (a PV on /dev/sda that belongs to VG "vg").
        # NOTE(review): assumes Device derives is_lvm_member from the PVs
        # returned by api.PVolumes() -- confirm against util/device.py.
        pv = api.PVolume(
            pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
            pv_tags={}, vg_name="vg",
        )
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        device_info()
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_dm_is_mapper_device(self, device_info):
        device_info()
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        device_info()
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    def test_is_ceph_disk_member_lsblk(self, device_info):
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_ceph_disk_member_not_available(self, device_info):
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    def test_is_not_ceph_disk_member_lsblk(self, device_info):
        lsblk = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_is_ceph_disk_member_blkid(self, device_info):
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_not_ceph_disk_member_blkid(self, device_info):
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_pv_api(self, device_info, pvolumes, monkeypatch):
        pv = api.PVolume(
            pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
            pv_tags={}, vg_name="vg",
        )
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.pvs_api

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
        pv = api.PVolume(
            pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
            pv_tags={}, vg_name="vg",
        )
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {
            "lv_path": "vg/lv",
            "vg_name": "vg",
            "lv_uuid": "0000",
            "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type},
        }
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
        pv = api.PVolume(
            pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
            pv_tags={}, vg_name="vg",
        )
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        # an LV tagged as "journal" is expected not to count as used by ceph
        lv_data = {
            "lv_path": "vg/lv",
            "vg_name": "vg",
            "lv_uuid": "0000",
            "tags": {"ceph.osd_id": 0, "ceph.type": "journal"},
        }
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    def test_get_device_id(self, device_info):
        # every udev key maps to its own name, so the expected id is simply
        # the three key names joined with underscores
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        device_info(udevadm=udev)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'


class TestDeviceEncryption(object):
    """Check ``Device.is_encrypted`` for partitions, mapper devices and LVs.

    The tests fake lsblk/blkid output through ``device_info``; mapper tests
    additionally patch ``device.encryption_status`` and LV tests assign
    ``lv_api`` directly on the device.
    """

    def test_partition_is_not_encrypted_lsblk(self, device_info, pvolumes):
        lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_lsblk(self, device_info, pvolumes):
        lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_partition_is_not_encrypted_blkid(self, device_info, pvolumes):
        lsblk = {'TYPE': 'part'}
        blkid = {'TYPE': 'ceph data'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    def test_partition_is_encrypted_blkid(self, device_info, pvolumes):
        lsblk = {'TYPE': 'part'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks1(self, device_info, pvolumes, monkeypatch):
        status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_luks2(self, device_info, pvolumes, monkeypatch):
        status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    def test_mapper_is_not_encrypted_plain(self, device_info, pvolumes, monkeypatch):
        # an empty status dict (no 'type' key) must read as "not encrypted"
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_blkid(self, device_info, pvolumes):
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = {}
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_blkid(self, factory, device_info, pvolumes):
        lsblk = {'TYPE': 'lvm'}
        blkid = {'TYPE': 'xfs'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lsblk(self, device_info, pvolumes):
        lsblk = {'FSTYPE': 'crypto_LUKS', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = {}
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lsblk(self, factory, device_info, pvolumes):
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    def test_lv_is_encrypted_lvm_api(self, factory, device_info, pvolumes):
        # neither lsblk nor blkid reports LUKS; only lv_api says "encrypted"
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=True)
        assert disk.is_encrypted is True

    def test_lv_is_not_encrypted_lvm_api(self, factory, device_info, pvolumes):
        lsblk = {'FSTYPE': 'xfs', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=False)
        assert disk.is_encrypted is False


class TestDeviceOrdering(object):
    """Check the ordering of ``device.Device`` objects via ``<`` and ``>``.

    The tests expect non-removable devices to sort before removable ones
    (marked "invalid" below), and devices of the same kind to sort by path.
    """

    def setup_method(self):
        # pytest's own xunit-style hook; the nose-style ``setup`` name is no
        # longer invoked by pytest 8+, which would leave ``self.data`` unset.
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")

        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")

        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        device_info(devices=self.data)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")

        assert sdb < sdd
        assert sdd > sdb


# Partition labels used to parametrize the ceph-disk membership and type tests.
ceph_partlabels = [
    'ceph data', 'ceph journal', 'ceph block',
    'ceph block.wal', 'ceph block.db', 'ceph lockbox'
]


class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` (partlabel, membership, type).

    Also holds the ``Device.available`` checks for removable and read-only
    devices.
    """

    def test_partlabel_lsblk(self, device_info):
        lsblk = {"PARTLABEL": ""}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        # lsblk reports an empty label, so the blkid value is expected
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == 'ceph data'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_blkid(self, device_info, label):
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": label}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    def test_reject_removable_device(self, device_info):
        data = {"/dev/sdb": {"removable": 1}}
        device_info(devices=data)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    def test_accept_non_removable_device(self, device_info):
        data = {"/dev/sdb": {"removable": 0}}
        device_info(devices=data)
        disk = device.Device("/dev/sdb")
        assert disk.available

    def test_reject_readonly_device(self, device_info):
        data = {"/dev/cdrom": {"ro": 1}}
        device_info(devices=data)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    def test_accept_non_readonly_device(self, device_info):
        data = {"/dev/sda": {"ro": 0}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.available

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_lsblk(self, device_info, label):
        lsblk = {"PARTLABEL": label}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    def test_unknown_type(self, device_info):
        lsblk = {"PARTLABEL": "gluster"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == 'unknown'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_blkid(self, device_info, label):
        # expected type is the last word of the label, after any dot:
        # 'ceph block.wal' -> 'wal', 'ceph data' -> 'data'
        expected = label.split()[-1].split('.')[-1]
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": label}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == expected

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_lsblk(self, device_info, label):
        # expected type is the last word of the label, after any dot:
        # 'ceph block.wal' -> 'wal', 'ceph data' -> 'data'
        expected = label.split()[-1].split('.')[-1]
        lsblk = {"PARTLABEL": label}
        blkid = {"PARTLABEL": ''}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == expected