+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_removable_device(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 1}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sdb")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_device_with_gpt_headers(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ blkid= {"PTTYPE": "gpt"}
+ device_info(
+ devices=data,
+ blkid=blkid,
+ lsblk=lsblk,
+ )
+ disk = device.Device("/dev/sdb")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_accept_non_removable_device(self, fake_call, device_info):
+ data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sdb"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sdb")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_not_acceptable_device(self, fake_call, device_info):
+ data = {"/dev/dm-0": {"foo": "bar"}}
+ lsblk = {"TYPE": "mpath", "NAME": "dm-0"}
+ device_info(devices=data, lsblk=lsblk)
+ disk = device.Device("/dev/dm-0")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @patch('ceph_volume.util.device.os.path.realpath')
+ @patch('ceph_volume.util.device.os.path.islink')
+ def test_accept_symlink_to_device(self,
+ m_os_path_islink,
+ m_os_path_realpath,
+ device_info,
+ fake_call):
+ m_os_path_islink.return_value = True
+ m_os_path_realpath.return_value = '/dev/sdb'
+ data = {"/dev/sdb": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/test_symlink")
+ print(disk)
+ print(disk.sys_api)
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ @patch('ceph_volume.util.device.os.readlink')
+ @patch('ceph_volume.util.device.os.path.islink')
+ def test_reject_symlink_to_device_mapper(self,
+ m_os_path_islink,
+ m_os_readlink,
+ device_info,
+ fake_call):
+ m_os_path_islink.return_value = True
+ m_os_readlink.return_value = '/dev/dm-0'
+ data = {"/dev/mapper/mpatha": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/mapper/mpatha")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_readonly_device(self, fake_call, device_info):
+ data = {"/dev/cdrom": {"ro": 1}}
+ lsblk = {"TYPE": "disk", "NAME": "cdrom"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/cdrom")
+ assert not disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_smaller_than_5gb(self, fake_call, device_info):
+ data = {"/dev/sda": {"size": 5368709119}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available, 'too small device is available'
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_accept_non_readonly_device(self, fake_call, device_info):
+ data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(devices=data,lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert disk.available
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_bluestore_device(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
+ patch_bluestore_label.return_value = True
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available
+ assert "Has BlueStore device label" in disk.rejected_reasons
+
+ @patch("ceph_volume.util.disk.has_bluestore_label", lambda x: False)
+ def test_reject_device_with_oserror(self, fake_call, monkeypatch, patch_bluestore_label, device_info):
+ patch_bluestore_label.side_effect = OSError('test failure')
+ lsblk = {"TYPE": "disk", "NAME": "sda"}
+ device_info(lsblk=lsblk)
+ disk = device.Device("/dev/sda")
+ assert not disk.available
+ assert "Failed to determine if device is BlueStore" in disk.rejected_reasons
+
+ @pytest.mark.usefixtures("lsblk_ceph_disk_member",
+ "device_info_not_ceph_disk_member",