import os
import pytest
from ceph_volume.util import disk
-from mock.mock import patch
+from mock.mock import patch, MagicMock
+
+
+class TestFunctions:
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=False))
+ def test_is_device_path_does_not_exist(self):
+ assert not disk.is_device('/dev/foo')
+
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=True))
+ def test_is_device_dev_doesnt_startswith_dev(self):
+ assert not disk.is_device('/foo')
+
+ @patch('ceph_volume.util.disk.allow_loop_devices', MagicMock(return_value=False))
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=True))
+ def test_is_device_loop_not_allowed(self):
+ assert not disk.is_device('/dev/loop123')
+
+ @patch('ceph_volume.util.disk.lsblk', MagicMock(return_value={'NAME': 'foo', 'TYPE': 'disk'}))
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=True))
+ def test_is_device_type_disk(self):
+ assert disk.is_device('/dev/foo')
+
+ @patch('ceph_volume.util.disk.lsblk', MagicMock(return_value={'NAME': 'foo', 'TYPE': 'mpath'}))
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=True))
+ def test_is_device_type_mpath(self):
+ assert disk.is_device('/dev/foo')
+
+ @patch('ceph_volume.util.disk.lsblk', MagicMock(return_value={'NAME': 'foo1', 'TYPE': 'part'}))
+ @patch('ceph_volume.util.disk.os.path.exists', MagicMock(return_value=True))
+ def test_is_device_type_part(self):
+ assert not disk.is_device('/dev/foo1')
class TestLsblkParser(object):
result = disk.get_devices(_sys_block_path=str(tmpdir))
assert result == {}
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_sda_block_is_found(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
assert result[sda_path]['model'] == ''
assert result[sda_path]['partitions'] == {}
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_sda_size(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
assert list(result.keys()) == [sda_path]
assert result[sda_path]['human_readable_size'] == '512.00 KB'
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_sda_sectorsize_fallsback(self, patched_get_block_devs_sysfs, fake_filesystem):
# if no sectorsize, it will use queue/hw_sector_size
sda_path = '/dev/sda'
assert list(result.keys()) == [sda_path]
assert result[sda_path]['sectorsize'] == '1024'
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_sda_sectorsize_from_logical_block(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_sda_sectorsize_does_not_fallback(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
result = disk.get_devices()
assert result[sda_path]['sectorsize'] == '99'
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_is_rotational(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
patched_get_block_devs_sysfs.return_value = [[sda_path, sda_path, 'disk']]
result = disk.get_devices()
assert result[sda_path]['rotational'] == '1'
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_is_ceph_rbd(self, patched_get_block_devs_sysfs, fake_filesystem):
rbd_path = '/dev/rbd0'
patched_get_block_devs_sysfs.return_value = [[rbd_path, rbd_path, 'disk']]
result = disk.get_devices()
assert rbd_path not in result
- @patch('ceph_volume.util.disk.is_locked_raw_device', lambda x: False)
def test_actuator_device(self, patched_get_block_devs_sysfs, fake_filesystem):
sda_path = '/dev/sda'
fake_actuator_nb = 2