# ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
# (recovered from the git.proxmox.com ceph.git blob view)
import os
from copy import deepcopy

import pytest
from mock.mock import patch, mock_open

from ceph_volume.api import lvm as api
from ceph_volume.util import device


class TestDevice(object):
    """Tests for ``device.Device`` properties and availability checks.

    Most tests hand fake device data to the ``device_info`` fixture and then
    inspect a freshly built ``device.Device``.

    Lines tagged ``NOTE(review): reconstructed`` were absent from the
    extracted listing this class was recovered from and were re-created from
    the surrounding code; confirm them against the upstream file.
    """

    def test_sys_api(self, monkeypatch, device_info):
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        # NOTE(review): reconstructed (list initialisation)
        volumes = [volume]
        # NOTE(review): reconstructed (lambda body)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert "foo" in disk.sys_api

    def test_lvm_size(self, monkeypatch, device_info):
        volume = api.Volume(lv_name='lv', lv_uuid='y', vg_name='vg',
                            lv_tags={}, lv_path='/dev/VolGroup/lv')
        # NOTE(review): reconstructed (list initialisation)
        volumes = [volume]
        # NOTE(review): reconstructed (lambda body)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        # 5368709120 bytes == 5 GiB
        data = {"/dev/sda": {"size": "5368709120"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lvm_size_rounds_down(self, fake_call, device_info):
        # 5905580032 bytes == 5.5 GiB
        data = {"/dev/sda": {"size": "5905580032"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.lvm_size.gb == 4

    def test_is_lv(self, fake_call, device_info):
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        lsblk = {"TYPE": "lvm", "NAME": "vg-lv"}
        device_info(lv=data, lsblk=lsblk)
        disk = device.Device("vg/lv")
        # NOTE(review): reconstructed (assertion)
        assert disk.is_lv

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_vgs_is_empty(self, fake_call, device_info, monkeypatch):
        # NOTE(review): reconstructed (closing ``pv_tags={}`` argument and the
        # list initialisation)
        BarPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 pv_tags={})
        pvolumes = []
        pvolumes.append(BarPVolume)
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: {})

        disk = device.Device("/dev/nvme0n1")
        # NOTE(review): reconstructed (assertion)
        assert disk.vgs == []

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_vgs_is_not_empty(self, fake_call, device_info, monkeypatch):
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/nvme0n1")
        assert len(disk.vgs) == 1

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_device(self, fake_call, device_info):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_loop_device_is_not_device(self, fake_call, device_info):
        data = {"/dev/loop0": {"foo": "bar"}}
        lsblk = {"TYPE": "loop"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/loop0")
        assert disk.is_device is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_loop_device_is_device(self, fake_call, device_info):
        data = {"/dev/loop0": {"foo": "bar"}}
        lsblk = {"TYPE": "loop"}
        # patch.dict restores os.environ on exit even when the assertion
        # fails, so the override cannot leak into other tests.
        with patch.dict(os.environ, {"CEPH_VOLUME_ALLOW_LOOP_DEVICES": "1"}):
            device_info(devices=data, lsblk=lsblk)
            disk = device.Device("/dev/loop0")
            assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational(self, fake_call, device_info):
        data = {"/dev/sda": {"rotational": "1"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_not_rotational(self, fake_call, device_info):
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational_lsblk(self, fake_call, device_info):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "ROTA": "1", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_not_rotational_lsblk(self, fake_call, device_info):
        data = {"/dev/sda": {"rotational": "0"}}
        lsblk = {"TYPE": "device", "ROTA": "0", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_device_is_rotational_defaults_true(self, fake_call, device_info):
        # rotational will default true if no info from sys_api or lsblk is found
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device", "foo": "bar", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.rotational

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_disk_is_device(self, fake_call, device_info):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_partition(self, fake_call, device_info):
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert disk.is_partition

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mpath_device_is_device(self, fake_call, device_info):
        data = {"/dev/foo": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath", "NAME": "foo"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/foo")
        assert disk.is_device is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_lvm_member(self, fake_call, device_info):
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_lvm_member(self, fake_call, device_info):
        # NOTE(review): despite its name this test uses the same data and the
        # same negative assertion as test_is_not_lvm_member; the positive case
        # appears to be untested -- confirm the intent.
        data = {"/dev/sda1": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda1", "PKNAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda1")
        assert not disk.is_lvm_member

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_mapper_device(self, fake_call, device_info):
        lsblk = {"TYPE": "lvm", "NAME": "foo"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_dm_is_mapper_device(self, fake_call, device_info):
        lsblk = {"TYPE": "lvm", "NAME": "dm-4"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/dm-4")
        assert disk.is_mapper

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_mapper_device(self, fake_call, device_info):
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_member_not_available_lsblk(self, fake_call, monkeypatch, patch_bluestore_label):
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_ceph_disk_member_not_available_blkid(self, fake_call, monkeypatch, patch_bluestore_label):
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member
        assert not disk.available
        assert "Used by ceph-disk" in disk.rejected_reasons

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_removable_device(self, fake_call, device_info):
        data = {"/dev/sdb": {"removable": 1}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_device_with_gpt_headers(self, fake_call, device_info):
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        blkid = {"PTTYPE": "gpt"}
        # NOTE(review): reconstructed (the whole device_info call); confirm
        # the exact keyword arguments.
        device_info(
            devices=data,
            blkid=blkid,
            lsblk=lsblk,
        )
        disk = device.Device("/dev/sdb")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_accept_non_removable_device(self, fake_call, device_info):
        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sdb"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sdb")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_not_acceptable_device(self, fake_call, device_info):
        data = {"/dev/dm-0": {"foo": "bar"}}
        lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/dm-0")
        assert not disk.available

    # NOTE(review): reconstructed (parameter list after ``self``).  Mock
    # arguments are injected bottom-up from the @patch decorators that do not
    # supply a replacement object.
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @patch('ceph_volume.util.device.os.path.realpath')
    @patch('ceph_volume.util.device.os.path.islink')
    def test_accept_symlink_to_device(self,
                                      m_os_path_islink,
                                      m_os_path_realpath,
                                      device_info,
                                      fake_call):
        m_os_path_islink.return_value = True
        m_os_path_realpath.return_value = '/dev/sdb'
        data = {"/dev/sdb": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/test_symlink")
        assert disk.available

    # NOTE(review): reconstructed (parameter list after ``self``).
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @patch('ceph_volume.util.device.os.readlink')
    @patch('ceph_volume.util.device.os.path.islink')
    def test_reject_symlink_to_device_mapper(self,
                                             m_os_path_islink,
                                             m_os_readlink,
                                             device_info,
                                             fake_call):
        m_os_path_islink.return_value = True
        m_os_readlink.return_value = '/dev/dm-0'
        data = {"/dev/mapper/mpatha": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/mapper/mpatha")
        # NOTE(review): the test name says "reject" but the assertion expects
        # the device to be available -- confirm which one is intended.
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_readonly_device(self, fake_call, device_info):
        data = {"/dev/cdrom": {"ro": 1}}
        lsblk = {"TYPE": "disk", "NAME": "cdrom"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/cdrom")
        assert not disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_smaller_than_5gb(self, fake_call, device_info):
        # one byte short of 5 GiB (5368709120)
        data = {"/dev/sda": {"size": 5368709119}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available, 'too small device is available'

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_accept_non_readonly_device(self, fake_call, device_info):
        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.available

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
        patch_bluestore_label.return_value = True
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Has BlueStore device label" in disk.rejected_reasons

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
        patch_bluestore_label.side_effect = OSError('test failure')
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.available
        assert "Failed to determine if device is BlueStore" in disk.rejected_reasons

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "device_info_not_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_not_ceph_disk_member_lsblk(self, fake_call, patch_bluestore_label):
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_existing_vg_available(self, fake_call, monkeypatch, device_info):
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1536,
                             vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_existing_vg_too_small(self, fake_call, monkeypatch, device_info):
        vg = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=4,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert not disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_multiple_existing_vgs(self, fake_call, monkeypatch, device_info):
        vg1 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=1000,
                              vg_extent_size=4194304)
        vg2 = api.VolumeGroup(pv_name='/dev/nvme0n1', vg_name='foo/bar', vg_free_count=536,
                              vg_extent_size=4194304)
        monkeypatch.setattr(api, 'get_all_devices_vgs', lambda: [vg1, vg2])
        lsblk = {"TYPE": "disk", "NAME": "nvme0n1"}
        data = {"/dev/nvme0n1": {"size": "6442450944"}}
        lv = {"tags": {"ceph.osd_id": "1"}}
        device_info(devices=data, lsblk=lsblk, lv=lv)
        disk = device.Device("/dev/nvme0n1")
        assert disk.available_lvm
        assert not disk.available
        assert not disk.available_raw

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, fake_call, device_info,
                          monkeypatch, ceph_type):
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000",
                                 lv_uuid="0000", pv_tags={}, vg_name="vg")
        # NOTE(review): reconstructed (list initialisation)
        pvolumes = [FooPVolume]
        lv_data = {"lv_name": "lv", "lv_path": "vg/lv", "vg_name": "vg",
                   "lv_uuid": "0000", "lv_tags":
                   "ceph.osd_id=0,ceph.type="+ceph_type}
        lv = api.Volume(**lv_data)
        # NOTE(review): reconstructed (``volumes`` list and lambda body)
        volumes = [lv]
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)
        monkeypatch.setattr(api, 'get_lvs', lambda **kwargs:
                            deepcopy(volumes))

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
                             vg_extent_size=1073741824)
        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_not_used_by_ceph(self, fake_call, device_info, monkeypatch):
        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
        # NOTE(review): reconstructed (list initialisation)
        pvolumes = [FooPVolume]
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part", "NAME": "sda", "PKNAME": "sda"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        monkeypatch.setattr(api, 'get_pvs', lambda **kwargs: pvolumes)

        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_get_device_id(self, fake_call, device_info):
        # every udev key maps to its own name, so the expected id is simply
        # the three key names joined with underscores
        udev = {k: k for k in ['ID_VENDOR', 'ID_MODEL', 'ID_SCSI_SERIAL']}
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(udevadm=udev, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk._get_device_id() == 'ID_VENDOR_ID_MODEL_ID_SCSI_SERIAL'

    def test_has_bluestore_label(self):
        # patch device.Device __init__ function to do nothing since we want to only test the
        # low-level behavior of has_bluestore_label
        with patch.object(device.Device, "__init__", lambda self, path, with_lsm=False: None):
            disk = device.Device("/dev/sda")
            disk.path = "/dev/sda"
            with patch('builtins.open', mock_open(read_data=b'bluestore block device\n')):
                assert disk.has_bluestore_label
            with patch('builtins.open', mock_open(read_data=b'not a bluestore block device\n')):
                assert not disk.has_bluestore_label


class TestDeviceEncryption(object):
    """Tests for ``device.Device.is_encrypted``.

    The tests vary the fake ``lsblk``/``blkid`` data, the patched
    ``device.encryption_status`` and the ``lv_api`` attribute, then check the
    resulting ``is_encrypted`` value.

    Lines tagged ``NOTE(review): reconstructed`` were absent from the
    extracted listing this class was recovered from and were re-created from
    the surrounding code; confirm them against the upstream file.
    """

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_not_encrypted_lsblk(self, fake_call, device_info):
        lsblk = {'TYPE': 'part', 'FSTYPE': 'xfs', 'NAME': 'sda', 'PKNAME': 'sda'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_encrypted_lsblk(self, fake_call, device_info):
        lsblk = {'TYPE': 'part', 'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'PKNAME': 'sda'}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_not_encrypted_blkid(self, fake_call, device_info):
        lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
        blkid = {'TYPE': 'ceph data'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partition_is_encrypted_blkid(self, fake_call, device_info):
        lsblk = {'TYPE': 'part', 'NAME': 'sda', 'PKNAME': 'sda'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_luks1(self, fake_call, device_info, monkeypatch):
        status = {'type': 'LUKS1'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_luks2(self, fake_call, device_info, monkeypatch):
        status = {'type': 'LUKS2'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_encrypted_plain(self, fake_call, device_info, monkeypatch):
        status = {'type': 'PLAIN'}
        monkeypatch.setattr(device, 'encryption_status', lambda x: status)
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_mapper_is_not_encrypted_plain(self, fake_call, device_info, monkeypatch):
        # an empty status dict stands in for "no encryption info available"
        monkeypatch.setattr(device, 'encryption_status', lambda x: {})
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'uuid', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/mapper/uuid")
        assert disk.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_blkid(self, fake_call, device_info):
        lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
        blkid = {'TYPE': 'crypto_LUKS'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        # NOTE(review): reconstructed -- one line was missing here; presumably
        # it reset ``lv_api`` as the sibling LV tests do. Confirm.
        disk.lv_api = {}
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_blkid(self, fake_call, factory, device_info):
        lsblk = {'TYPE': 'lvm', 'NAME': 'sda'}
        blkid = {'TYPE': 'xfs'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_lsblk(self, fake_call, device_info):
        lsblk = {'FSTYPE': 'crypto_LUKS', 'NAME': 'sda', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        # NOTE(review): reconstructed -- one line was missing here; presumably
        # it reset ``lv_api`` as the sibling LV tests do. Confirm.
        disk.lv_api = {}
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_lsblk(self, fake_call, factory, device_info):
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=None)
        assert disk.is_encrypted is False

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_encrypted_lvm_api(self, fake_call, factory, device_info):
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=True)
        assert disk.is_encrypted is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_lv_is_not_encrypted_lvm_api(self, fake_call, factory, device_info):
        lsblk = {'FSTYPE': 'xfs', 'NAME': 'sda', 'TYPE': 'lvm'}
        blkid = {'TYPE': 'mapper'}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        disk.lv_api = factory(encrypted=False)
        assert disk.is_encrypted is False


class TestDeviceOrdering(object):
    """Tests for the relative ordering of ``device.Device`` instances.

    ``setup_method`` provides four fake devices; the entries marked
    ``# invalid`` are the removable ones.

    Lines tagged ``NOTE(review): reconstructed`` were absent from the
    extracted listing this class was recovered from and were re-created from
    the surrounding code and the test names; confirm them against the
    upstream file.
    """

    def setup_method(self):
        # NOTE(review): reconstructed (the ``self.data = {`` / ``}`` wrapper);
        # the tests below read ``self.data``.
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_valid_before_invalid(self, fake_call, device_info):
        lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
        lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk_sda)
        sda = device.Device("/dev/sda")
        device_info(devices=self.data, lsblk=lsblk_sdb)
        sdb = device.Device("/dev/sdb")

        # NOTE(review): reconstructed (assertions)
        assert sda < sdb
        assert sdb > sda

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_valid_alphabetical_ordering(self, fake_call, device_info):
        lsblk_sda = {"NAME": "sda", "TYPE": "disk"}
        lsblk_sdc = {"NAME": "sdc", "TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk_sda)
        sda = device.Device("/dev/sda")
        device_info(devices=self.data, lsblk=lsblk_sdc)
        sdc = device.Device("/dev/sdc")

        # NOTE(review): reconstructed (assertions)
        assert sda < sdc
        assert sdc > sda

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_invalid_alphabetical_ordering(self, fake_call, device_info):
        lsblk_sdb = {"NAME": "sdb", "TYPE": "disk"}
        lsblk_sdd = {"NAME": "sdd", "TYPE": "disk"}
        device_info(devices=self.data, lsblk=lsblk_sdb)
        sdb = device.Device("/dev/sdb")
        device_info(devices=self.data, lsblk=lsblk_sdd)
        sdd = device.Device("/dev/sdd")

        # NOTE(review): reconstructed (assertions)
        assert sdb < sdd
        assert sdd > sdb


class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice``, a wrapper built from a ``device.Device``.

    Covers the ``partlabel``, ``is_member`` and ``type`` attributes using
    fake ``lsblk``/``blkid`` data.
    """

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partlabel_lsblk(self, fake_call, device_info):
        lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": ""}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == ''

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_partlabel_blkid(self, fake_call, device_info):
        lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph data"}
        blkid = {"TYPE": "disk", "PARTLABEL": "ceph data"}
        device_info(blkid=blkid, lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.partlabel == 'ceph data'

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "blkid_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_member_blkid(self, fake_call, monkeypatch):
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_is_member_lsblk(self, fake_call, patch_bluestore_label, device_info):
        lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "ceph"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.is_member is True

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_unknown_type(self, fake_call, device_info):
        lsblk = {"TYPE": "disk", "NAME": "sda", "PARTLABEL": "gluster"}
        device_info(lsblk=lsblk)
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type == 'unknown'

    # values the ``type`` attribute is allowed to take in the tests below
    ceph_types = ['data', 'wal', 'db', 'lockbox', 'journal', 'block']

    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                             "blkid_ceph_disk_member",
                             "disable_kernel_queries")
    def test_type_blkid(self, monkeypatch, fake_call, device_info, ceph_partlabel):
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type in self.ceph_types

    @pytest.mark.usefixtures("blkid_ceph_disk_member",
                             "lsblk_ceph_disk_member",
                             "disable_kernel_queries")
    @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
    def test_type_lsblk(self, fake_call, device_info, ceph_partlabel):
        disk = device.CephDiskDevice(device.Device("/dev/sda"))

        assert disk.type in self.ceph_types