git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
1 import os
2 import pytest
3 from copy import deepcopy
4 from ceph_volume.util import device
5 from ceph_volume.api import lvm as api
6 from mock.mock import patch, mock_open
7
8
class TestDevice(object):
    """Tests for the properties exposed by ``device.Device``.

    Most tests feed fake data through the ``device_info`` fixture (defined
    outside this file) and then check one property of the resulting
    ``Device``.  Where present, the ``@patch(...has_bluestore_label...)``
    decorator replaces ``ceph_volume.util.disk.has_bluestore_label`` with a
    function that always returns ``False``.
    """

    def test_sys_api(self, monkeypatch, device_info):
        """The per-device data handed to ``device_info`` shows up in ``sys_api``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        # Hand out a fresh copy on every call so the test data is never
        # mutated by the code under test.
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        """A disk of 5368709120 bytes reports ``lvm_size.gb == 4``."""
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        volumes = [volume]
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        # 5GB in size
        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lvm_size_rounds_down(self, fake_call, device_info):
        """A disk of 5905580032 bytes still reports ``lvm_size.gb == 4``."""
        # 5.5GB in size
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, fake_call, device_info):
        """A device created from LV data with lsblk TYPE ``lvm`` is an LV."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_vgs_is_empty(self, fake_call, device_info, monkeypatch):
        """``vgs`` is an empty list when ``api.get_pvs`` yields nothing."""
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})

        disk = device.Device("/dev/nvme0n1")
        assert disk.vgs == []

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_vgs_is_not_empty(self, fake_call, device_info, monkeypatch):
        """``vgs`` holds the single VG returned by ``get_all_devices_vgs``."""
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar',
                             vg_free_count=6, vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_device(self, fake_call, device_info):
        """lsblk TYPE ``device`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_loop_device_is_not_device(self, fake_call, device_info):
        """lsblk TYPE ``loop`` makes ``is_device`` False by default."""
        data = {"/dev/loop0": {"foo": "bar"}}
        lsblk = {"TYPE": "loop"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/loop0")
        assert disk.is_device is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_loop_device_is_device(self, fake_call, device_info):
        """With CEPH_VOLUME_ALLOW_LOOP_DEVICES set, a loop device is a device."""
        data = {"/dev/loop0": {"foo": "bar"}}
        lsblk = {"TYPE": "loop"}
        # patch.dict restores os.environ on exit even when the assertion
        # fails, so the variable cannot leak into later tests.
        with patch.dict(os.environ, {"CEPH_VOLUME_ALLOW_LOOP_DEVICES": "1"}):
            device_info(devices=data, lsblk=lsblk)
            disk = device.Device("/dev/loop0")
            assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational(self, fake_call, device_info):
        """Device data ``rotational: "1"`` makes ``rotational`` truthy."""
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_not_rotational(self, fake_call, device_info):
        """Device data ``rotational: "0"`` makes ``rotational`` falsy."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational_lsblk(self, fake_call, device_info):
        """lsblk ``ROTA: "1"`` is used when device data has no rotational key."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_not_rotational_lsblk(self, fake_call, device_info):
        """``rotational: "0"`` plus lsblk ``ROTA: "0"`` gives a falsy result."""
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational_defaults_true(self, fake_call, device_info):
        """``rotational`` is truthy when neither data source mentions it."""
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_disk_is_device(self, fake_call, device_info):
        """lsblk TYPE ``disk`` makes ``is_device`` True."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_partition(self, fake_call, device_info):
        """lsblk TYPE ``part`` makes ``is_partition`` truthy."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mpath_device_is_device(self, fake_call, device_info):
        """lsblk TYPE ``mpath`` makes ``is_device`` True."""
        data = {"/dev/foo": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath", "NAME": "foo"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/foo")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_lvm_member(self, fake_call, device_info):
        """A plain partition is not reported as an LVM member."""
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_lvm_member(self, fake_call, device_info):
        """Same setup and assertion as ``test_is_not_lvm_member``."""
        # NOTE(review): despite its name this test asserts that the device
        # is NOT an LVM member and duplicates test_is_not_lvm_member; the
        # positive case appears to be untested - confirm intent.
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_mapper_device(self, fake_call, device_info):
        """A ``/dev/mapper/...`` path is a mapper device."""
        lsblk = {"TYPE": "lvm", "NAME": "foo"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_dm_is_mapper_device(self, fake_call, device_info):
        """A ``/dev/dm-N`` path is a mapper device."""
        lsblk = {"TYPE": "lvm", "NAME": "dm-4"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_mapper_device(self, fake_call, device_info):
        """A plain ``/dev/sda`` disk is not a mapper device."""
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
        """With the lsblk ceph-disk fixture active, the device is a member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
        """With blkid and lsblk ceph-disk fixtures active, the device is a member."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (lsblk) is unavailable with a matching reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
        """A ceph-disk member (blkid) is unavailable with a matching reason."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_removable_device(self, fake_call, device_info):
        """A device flagged ``removable: 1`` is not available."""
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_device_with_gpt_headers(self, fake_call, device_info):
        """A device whose blkid data reports ``PTTYPE: gpt`` is not available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        blkid = {"PTTYPE": "gpt"}
        device_info(
            devices=data,
            blkid=blkid,
            lsblk=lsblk,
        )
        disk = device.Device("/dev/sdb")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_accept_non_removable_device(self, fake_call, device_info):
        """A non-removable disk of 5368709120 bytes is available."""
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_not_acceptable_device(self, fake_call, device_info):
        """A ``/dev/dm-0`` device of lsblk TYPE ``mpath`` is not available."""
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @patch('ceph_volume.util.device.os.path.realpath')
    @patch('ceph_volume.util.device.os.path.islink')
    def test_accept_symlink_to_device(self,
                                      m_os_path_islink,
                                      m_os_path_realpath,
                                      device_info,
                                      fake_call):
        """A symlink whose real path is ``/dev/sdb`` is available."""
        # Mock arguments are injected bottom-up: islink first, realpath second.
        m_os_path_islink.return_value = True
        m_os_path_realpath.return_value = '/dev/sdb'
        data = {"/dev/sdb": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/test_symlink")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @patch('ceph_volume.util.device.os.readlink')
    @patch('ceph_volume.util.device.os.path.islink')
    def test_reject_symlink_to_device_mapper(self,
                                             m_os_path_islink,
                                             m_os_readlink,
                                             device_info,
                                             fake_call):
        """A ``/dev/mapper/mpatha`` symlink to ``/dev/dm-0`` is available."""
        # NOTE(review): the test name says "reject" but the assertion
        # expects the device to be available - confirm which is intended.
        m_os_path_islink.return_value = True
        m_os_readlink.return_value = '/dev/dm-0'
        data = {"/dev/mapper/mpatha": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/mapper/mpatha")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_readonly_device(self, fake_call, device_info):
        """A device flagged ``ro: 1`` is not available."""
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk", "NAME": "cdrom"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_smaller_than_5gb(self, fake_call, device_info):
        """A device one byte short of 5368709120 bytes is not available."""
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_accept_non_readonly_device(self, fake_call, device_info):
        """A writable disk of 5368709120 bytes is available."""
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
        """A device reported as carrying a BlueStore label is rejected."""
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
        """An ``OSError`` from the BlueStore label check rejects the device."""
        patch_bluestore_label.side_effect = OSError('test failure')
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Failed to determine if device is BlueStore" in disk.rejected_reasons

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
        """With the not-a-member fixture active, ``is_ceph_disk_member`` is False."""
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_existing_vg_available(self, fake_call, monkeypatch, device_info):
        """A VG with 1536 free extents of 4194304 is available for LVM only."""
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar',
                             vg_free_count=1536, vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_existing_vg_too_small(self, fake_call, monkeypatch, device_info):
        """A VG with 4 free extents of 1073741824 is not available at all."""
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar',
                             vg_free_count=4, vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_multiple_existing_vgs(self, fake_call, monkeypatch, device_info):
        """Two VGs with 1000 and 536 free extents make the device LVM-available."""
        vg1 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar',
                              vg_free_count=1000, vg_extent_size=4194304)
        vg2 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar',
                              vg_free_count=536, vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg1, vg2])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, fake_call, device_info,
                          monkeypatch, ceph_type):
        """An LV tagged with ``ceph.type`` data or block marks the device as used."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type="+ceph_type}
        lv = api.Volume(**lv_data)
        volumes = [lv]
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_not_used_by_ceph(self, fake_call, device_info, monkeypatch):
        """An LV whose ``ceph.type`` is journal does not mark the device as used."""
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        pvolumes = [FooPVolume]
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_get_device_id(self, fake_call, device_info):
        """The device id joins vendor, model and SCSI serial with underscores."""
        # Each udev key maps to its own name, so the expected id is simply
        # the three key names joined by "_".
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'

    def test_has_bluestore_label(self):
        """``has_bluestore_label`` depends on the bytes read from the device."""
        # patch device.Device __init__ function to do nothing since we want to only test the
        # low-level behavior of has_bluestore_label
        with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
            disk = device.Device("/dev/sda")
            disk.path = "/dev/sda"
            with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
                assert disk.has_bluestore_label
            with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
                assert not disk.has_bluestore_label
465
466
class TestDeviceEncryption(object):
    """Checks of ``Device.is_encrypted`` for partitions, mapper devices and LVs.

    Every test replaces ``ceph_volume.util.disk.has_bluestore_label`` with a
    function returning ``False`` and feeds fake lsblk/blkid data through the
    ``device_info`` fixture (defined outside this file).
    """

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info):
        """A partition with lsblk FSTYPE ``xfs`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'xfs',
                           'NAME': 'sda', 'PKNAME': 'sda'})
        partition = device.Device("/dev/sda")
        assert partition.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_encrypted_lsblk(self, fake_call, device_info):
        """A partition with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'FSTYPE': 'crypto_LUKS',
                           'NAME': 'sda', 'PKNAME': 'sda'})
        partition = device.Device("/dev/sda")
        assert partition.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_not_encrypted_blkid(self, fake_call, device_info):
        """A partition with blkid TYPE ``ceph data`` is not encrypted."""
        device_info(lsblk={'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'ceph data'})
        partition = device.Device("/dev/sda")
        assert partition.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_encrypted_blkid(self, fake_call, device_info):
        """A partition with blkid TYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'},
                    blkid={'TYPE': 'crypto_LUKS'})
        partition = device.Device("/dev/sda")
        assert partition.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch):
        """A mapper device whose encryption status type is LUKS1 is encrypted."""
        luks_status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch):
        """A mapper device whose encryption status type is LUKS2 is encrypted."""
        luks_status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: luks_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch):
        """A mapper device whose encryption status type is PLAIN is encrypted."""
        plain_status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: plain_status)
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch):
        """A mapper device with an empty encryption status is not encrypted."""
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        mapper = device.Device("/dev/mapper/uuid")
        assert mapper.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_blkid(self, fake_call, device_info):
        """An LV with blkid TYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'TYPE': 'lvm', 'NAME': 'sda'},
                    blkid={'TYPE': 'crypto_LUKS'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = {}
        assert logical_volume.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
        """An LV with blkid TYPE ``xfs`` and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'TYPE': 'lvm', 'NAME': 'sda'},
                    blkid={'TYPE': 'xfs'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = factory(encrypted=None)
        assert logical_volume.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
        """An LV with lsblk FSTYPE ``crypto_LUKS`` is encrypted."""
        device_info(lsblk={'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = {}
        assert logical_volume.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
        """An LV with lsblk FSTYPE ``xfs`` and ``encrypted=None`` is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = factory(encrypted=None)
        assert logical_volume.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
        """An LV whose ``lv_api`` reports ``encrypted=True`` is encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = factory(encrypted=True)
        assert logical_volume.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
        """An LV whose ``lv_api`` reports ``encrypted=False`` is not encrypted."""
        device_info(lsblk={'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'},
                    blkid={'TYPE': 'mapper'})
        logical_volume = device.Device("/dev/sda")
        logical_volume.lv_api = factory(encrypted=False)
        assert logical_volume.is_encrypted is False
591
592
class TestDeviceOrdering(object):
    """Checks of the ``<`` / ``>`` ordering between ``Device`` objects."""

    def setup_method(self):
        """Build the shared fake device data used by every ordering test."""
        self.data = dict((
            ("/dev/sda", {"removable": 0}),
            ("/dev/sdb", {"removable": 1}),  # invalid
            ("/dev/sdc", {"removable": 0}),
            ("/dev/sdd", {"removable": 1}),  # invalid
        ))

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_valid_before_invalid(self, fake_call, device_info):
        """The non-removable sda sorts before the removable (invalid) sdb."""
        device_info(devices=self.data, lsblk={"NAME": "sda", "TYPE": "disk"})
        valid = device.Device("/dev/sda")
        device_info(devices=self.data, lsblk={"NAME": "sdb", "TYPE": "disk"})
        invalid = device.Device("/dev/sdb")

        assert valid < invalid
        assert invalid > valid

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_valid_alphabetical_ordering(self, fake_call, device_info):
        """Two non-removable devices compare by name: sda before sdc."""
        device_info(devices=self.data, lsblk={"NAME": "sda", "TYPE": "disk"})
        earlier = device.Device("/dev/sda")
        device_info(devices=self.data, lsblk={"NAME": "sdc", "TYPE": "disk"})
        later = device.Device("/dev/sdc")

        assert earlier < later
        assert later > earlier

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_invalid_alphabetical_ordering(self, fake_call, device_info):
        """Two removable (invalid) devices compare by name: sdb before sdd."""
        device_info(devices=self.data, lsblk={"NAME": "sdb", "TYPE": "disk"})
        earlier = device.Device("/dev/sdb")
        device_info(devices=self.data, lsblk={"NAME": "sdd", "TYPE": "disk"})
        later = device.Device("/dev/sdd")

        assert earlier < later
        assert later > earlier
638
639
class TestCephDiskDevice(object):
    """Checks of ``device.CephDiskDevice`` wrapping a ``device.Device``."""

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partlabel_lsblk(self, fake_call, device_info):
        """An empty lsblk PARTLABEL yields an empty ``partlabel``."""
        device_info(lsblk={"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == ''

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partlabel_blkid(self, fake_call, device_info):
        """A ``ceph data`` PARTLABEL in blkid and lsblk is exposed unchanged."""
        device_info(blkid={"TYPE": "disk", "PARTLABEL": "ceph data"},
                    lsblk={"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == 'ceph data'

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "blkid_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_member_blkid(self, fake_call, monkeypatch):
        """With lsblk and blkid ceph-disk fixtures active, ``is_member`` is True."""
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info):
        """An lsblk PARTLABEL of ``ceph`` makes ``is_member`` True."""
        device_info(lsblk={"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_unknown_type(self, fake_call, device_info):
        """A PARTLABEL of ``gluster`` gives the type ``unknown``."""
        device_info(lsblk={"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == 'unknown'

    # Types accepted by the two parametrized type tests below.
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, fake_call, device_info, ceph_partlabel):
        """For each ``ceph_partlabel`` value the type is one of ``ceph_types``."""
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_type_lsblk(self, fake_call, device_info, ceph_partlabel):
        """For each ``ceph_partlabel`` value the type is one of ``ceph_types``."""
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type in self.ceph_types