]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
index 7477777bc8d57c74645fcf9870c8d852dc6d037a..cbae68782a032ebf9593eef6957b3727ebb28ae6 100644 (file)
@@ -41,10 +41,10 @@ class TestDevice(object):
         disk = device.Device("/dev/nvme0n1")
         assert disk.vgs == []
 
-    def test_vgs_is_not_empty(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
-        BarPVolume = api.PVolume(vg_name='foo', lv_uuid='111', pv_name='/dev/nvme0n1', pv_uuid="0000", pv_tags={})
-        pvolumes.append(BarPVolume)
-        monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
+    def test_vgs_is_not_empty(self, device_info, monkeypatch):
+        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+                             vg_extent_size=1073741824)
+        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
         lsblk = {"TYPE": "disk"}
         device_info(lsblk=lsblk)
         disk = device.Device("/dev/nvme0n1")
@@ -139,14 +139,14 @@ class TestDevice(object):
     @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_ceph_disk_lsblk(self, monkeypatch):
+    def test_is_ceph_disk_lsblk(self, monkeypatch, patch_bluestore_label):
         disk = device.Device("/dev/sda")
         assert disk.is_ceph_disk_member
 
     @pytest.mark.usefixtures("blkid_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_ceph_disk_blkid(self, monkeypatch):
+    def test_is_ceph_disk_blkid(self, monkeypatch, patch_bluestore_label):
         monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                             lambda path: {'PARTLABEL': ""})
         disk = device.Device("/dev/sda")
@@ -155,7 +155,7 @@ class TestDevice(object):
     @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch):
+    def test_is_ceph_disk_member_not_available_lsblk(self, monkeypatch, patch_bluestore_label):
         disk = device.Device("/dev/sda")
         assert disk.is_ceph_disk_member
         assert not disk.available
@@ -164,7 +164,7 @@ class TestDevice(object):
     @pytest.mark.usefixtures("blkid_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch):
+    def test_is_ceph_disk_member_not_available_blkid(self, monkeypatch, patch_bluestore_label):
         monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                             lambda path: {'PARTLABEL': ""})
         disk = device.Device("/dev/sda")
@@ -172,22 +172,86 @@ class TestDevice(object):
         assert not disk.available
         assert "Used by ceph-disk" in disk.rejected_reasons
 
+    def test_reject_removable_device(self, device_info):
+        data = {"/dev/sdb": {"removable": 1}}
+        device_info(devices=data)
+        disk = device.Device("/dev/sdb")
+        assert not disk.available
+
+    def test_accept_non_removable_device(self, device_info):
+        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
+        device_info(devices=data)
+        disk = device.Device("/dev/sdb")
+        assert disk.available
+
+    def test_reject_readonly_device(self, device_info):
+        data = {"/dev/cdrom": {"ro": 1}}
+        device_info(devices=data)
+        disk = device.Device("/dev/cdrom")
+        assert not disk.available
+
+    def test_reject_smaller_than_5gb(self, device_info):
+        data = {"/dev/sda": {"size": 5368709119}}
+        device_info(devices=data)
+        disk = device.Device("/dev/sda")
+        assert not disk.available, 'too small device is available'
+
+    def test_accept_non_readonly_device(self, device_info):
+        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
+        device_info(devices=data)
+        disk = device.Device("/dev/sda")
+        assert disk.available
+
+    def test_reject_bluestore_device(self, monkeypatch, patch_bluestore_label):
+        patch_bluestore_label.return_value = True
+        disk = device.Device("/dev/sda")
+        assert not disk.available
+        assert "Has BlueStore device label" in disk.rejected_reasons
+
     @pytest.mark.usefixtures("device_info_not_ceph_disk_member",
                              "disable_lvm_queries",
                              "disable_kernel_queries")
-    def test_is_not_ceph_disk_member_lsblk(self):
+    def test_is_not_ceph_disk_member_lsblk(self, patch_bluestore_label):
         disk = device.Device("/dev/sda")
         assert disk.is_ceph_disk_member is False
 
-    def test_pv_api(self, device_info, pvolumes, pvolumes_empty, monkeypatch):
-        FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg")
-        pvolumes.append(FooPVolume)
-        monkeypatch.setattr(api, 'PVolumes', lambda populate=True: pvolumes if populate else pvolumes_empty)
-        data = {"/dev/sda": {"foo": "bar"}}
-        lsblk = {"TYPE": "part"}
+    def test_existing_vg_available(self, monkeypatch, device_info):
+        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+                             vg_extent_size=1073741824)
+        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+        lsblk = {"TYPE": "disk"}
+        data = {"/dev/nvme0n1": {"size": "6442450944"}}
         device_info(devices=data, lsblk=lsblk)
-        disk = device.Device("/dev/sda")
-        assert disk.pvs_api
+        disk = device.Device("/dev/nvme0n1")
+        assert disk.available_lvm
+        assert not disk.available
+        assert not disk.available_raw
+
+    def test_existing_vg_too_small(self, monkeypatch, device_info):
+        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+                             vg_extent_size=1073741824)
+        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
+        lsblk = {"TYPE": "disk"}
+        data = {"/dev/nvme0n1": {"size": "6442450944"}}
+        device_info(devices=data, lsblk=lsblk)
+        disk = device.Device("/dev/nvme0n1")
+        assert not disk.available_lvm
+        assert not disk.available
+        assert not disk.available_raw
+
+    def test_multiple_existing_vgs(self, monkeypatch, device_info):
+        vg1 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=4,
+                             vg_extent_size=1073741824)
+        vg2 = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+                             vg_extent_size=1073741824)
+        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg1, vg2])
+        lsblk = {"TYPE": "disk"}
+        data = {"/dev/nvme0n1": {"size": "6442450944"}}
+        device_info(devices=data, lsblk=lsblk)
+        disk = device.Device("/dev/nvme0n1")
+        assert disk.available_lvm
+        assert not disk.available
+        assert not disk.available_raw
 
     @pytest.mark.parametrize("ceph_type", ["data", "block"])
     def test_used_by_ceph(self, device_info, pvolumes, pvolumes_empty, monkeypatch, ceph_type):
@@ -198,6 +262,9 @@ class TestDevice(object):
         lsblk = {"TYPE": "part"}
         lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
         device_info(devices=data, lsblk=lsblk, lv=lv_data)
+        vg = api.VolumeGroup(vg_name='foo/bar', vg_free_count=6,
+                             vg_extent_size=1073741824)
+        monkeypatch.setattr(api, 'get_device_vgs', lambda x: [vg])
         disk = device.Device("/dev/sda")
         assert disk.used_by_ceph
 
@@ -387,7 +454,7 @@ class TestCephDiskDevice(object):
     @pytest.mark.usefixtures("blkid_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_member_blkid(self, monkeypatch):
+    def test_is_member_blkid(self, monkeypatch, patch_bluestore_label):
         monkeypatch.setattr("ceph_volume.util.device.disk.lsblk",
                             lambda path: {'PARTLABEL': ""})
         disk = device.CephDiskDevice(device.Device("/dev/sda"))
@@ -401,7 +468,7 @@ class TestCephDiskDevice(object):
         assert not disk.available
 
     def test_accept_non_removable_device(self, device_info):
-        data = {"/dev/sdb": {"removable": 0}}
+        data = {"/dev/sdb": {"removable": 0, "size": 5368709120}}
         device_info(devices=data)
         disk = device.Device("/dev/sdb")
         assert disk.available
@@ -412,8 +479,14 @@ class TestCephDiskDevice(object):
         disk = device.Device("/dev/cdrom")
         assert not disk.available
 
+    def test_reject_smaller_than_5gb(self, device_info):
+        data = {"/dev/sda": {"size": 5368709119}}
+        device_info(devices=data)
+        disk = device.Device("/dev/sda")
+        assert not disk.available, 'too small device is available'
+
     def test_accept_non_readonly_device(self, device_info):
-        data = {"/dev/sda": {"ro": 0}}
+        data = {"/dev/sda": {"ro": 0, "size": 5368709120}}
         device_info(devices=data)
         disk = device.Device("/dev/sda")
         assert disk.available
@@ -421,7 +494,7 @@ class TestCephDiskDevice(object):
     @pytest.mark.usefixtures("lsblk_ceph_disk_member",
                              "disable_kernel_queries",
                              "disable_lvm_queries")
-    def test_is_member_lsblk(self):
+    def test_is_member_lsblk(self, patch_bluestore_label):
         disk = device.CephDiskDevice(device.Device("/dev/sda"))
 
         assert disk.is_member is True