]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_device.py
update sources to 12.2.10
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_device.py
CommitLineData
91327a77 1import pytest
1adf2230
AA
2from ceph_volume.util import device
3from ceph_volume.api import lvm as api
4
5
class TestDevice(object):
    """Tests for the properties exposed by ``device.Device``.

    Every test feeds canned data through the ``device_info`` fixture (defined
    outside this module) before constructing the ``Device`` under test.
    """

    @staticmethod
    def _add_sda_pv(pvolumes, monkeypatch):
        """Register a PV for /dev/sda in VG "vg" and make ``api.PVolumes()`` return it."""
        pv = api.PVolume(
            pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000",
            pv_tags={}, vg_name="vg")
        pvolumes.append(pv)
        monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes)

    def test_sys_api(self, device_info):
        """``sys_api`` carries the entry registered for the device path."""
        data = {"/dev/sda": {"foo": "bar"}}
        device_info(devices=data)
        disk = device.Device("/dev/sda")
        assert disk.sys_api
        assert "foo" in disk.sys_api

    def test_is_lv(self, device_info):
        """A path backed by LV data is reported as a logical volume."""
        data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"}
        device_info(lv=data)
        disk = device.Device("vg/lv")
        assert disk.is_lv

    def test_is_device(self, device_info):
        """lsblk TYPE "device" is reported through ``is_device``."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "device"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_device

    # NOTE(review): ``pvolumes`` is requested but never referenced in the two
    # tests below; it is kept because the fixture may perform setup on its own
    # - confirm before removing.
    def test_is_partition(self, device_info, pvolumes):
        """lsblk TYPE "part" is reported through ``is_partition``."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_partition

    def test_is_not_lvm_memeber(self, device_info, pvolumes):
        """Without any registered PV the partition is not an LVM member."""
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert not disk.is_lvm_member

    def test_is_lvm_memeber(self, device_info, pvolumes, monkeypatch):
        """With a PV that belongs to a VG the partition is an LVM member."""
        # This test used to be a verbatim copy of the negative case above, so
        # the positive path was never exercised. The PV setup mirrors the one
        # test_pv_api relies on.
        # NOTE(review): assumes Device flags membership whenever a PV with a
        # vg_name matches the path - confirm against util/device.py.
        self._add_sda_pv(pvolumes, monkeypatch)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_lvm_member

    def test_is_mapper_device(self, device_info):
        """A /dev/mapper path is reported through ``is_mapper``."""
        device_info()
        disk = device.Device("/dev/mapper/foo")
        assert disk.is_mapper

    def test_is_not_mapper_device(self, device_info):
        """A plain /dev/sdX path is not a mapper device."""
        device_info()
        disk = device.Device("/dev/sda")
        assert not disk.is_mapper

    def test_is_ceph_disk_member_lsblk(self, device_info):
        """A "ceph data" PARTLABEL from lsblk marks a ceph-disk member."""
        lsblk = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_not_ceph_disk_member_lsblk(self, device_info):
        """A non-ceph PARTLABEL from lsblk yields exactly ``False``."""
        lsblk = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_is_ceph_disk_member_blkid(self, device_info):
        """An empty lsblk PARTLABEL defers to the blkid PARTLABEL."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "ceph data"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member

    def test_is_not_ceph_disk_member_blkid(self, device_info):
        """A non-ceph blkid PARTLABEL yields exactly ``False``."""
        # falls back to blkid
        lsblk = {"PARTLABEL": ""}
        blkid = {"PARTLABEL": "gluster partition"}
        device_info(lsblk=lsblk, blkid=blkid)
        disk = device.Device("/dev/sda")
        assert disk.is_ceph_disk_member is False

    def test_pv_api(self, device_info, pvolumes, monkeypatch):
        """``pvs_api`` is populated when a PV matches the device path."""
        self._add_sda_pv(pvolumes, monkeypatch)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        device_info(devices=data, lsblk=lsblk)
        disk = device.Device("/dev/sda")
        assert disk.pvs_api

    @pytest.mark.parametrize("ceph_type", ["data", "block"])
    def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type):
        """An LV tagged with ceph.type "data" or "block" counts as used by ceph."""
        self._add_sda_pv(pvolumes, monkeypatch)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert disk.used_by_ceph

    def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch):
        """An LV tagged with ceph.type "journal" does not count as used by ceph."""
        self._add_sda_pv(pvolumes, monkeypatch)
        data = {"/dev/sda": {"foo": "bar"}}
        lsblk = {"TYPE": "part"}
        lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000",
                   "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}}
        device_info(devices=data, lsblk=lsblk, lv=lv_data)
        disk = device.Device("/dev/sda")
        assert not disk.used_by_ceph
119
120
class TestDeviceOrdering(object):
    """Tests for the ``<`` / ``>`` ordering between ``device.Device`` objects."""

    def setup_method(self):
        """Build the per-test device table; removable entries are the invalid ones."""
        # pytest's native per-test hook. The nose-style ``setup`` spelling is
        # deprecated and no longer run by recent pytest releases, which would
        # leave ``self.data`` unset for every test in this class.
        self.data = {
            "/dev/sda": {"removable": 0},
            "/dev/sdb": {"removable": 1},  # invalid
            "/dev/sdc": {"removable": 0},
            "/dev/sdd": {"removable": 1},  # invalid
        }

    def test_valid_before_invalid(self, device_info):
        """A valid device sorts ahead of an invalid one."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdb = device.Device("/dev/sdb")

        assert sda < sdb
        assert sdb > sda

    def test_valid_alphabetical_ordering(self, device_info):
        """Two valid devices are ordered by path."""
        device_info(devices=self.data)
        sda = device.Device("/dev/sda")
        sdc = device.Device("/dev/sdc")

        assert sda < sdc
        assert sdc > sda

    def test_invalid_alphabetical_ordering(self, device_info):
        """Two invalid devices are ordered by path."""
        device_info(devices=self.data)
        sdb = device.Device("/dev/sdb")
        sdd = device.Device("/dev/sdd")

        assert sdb < sdd
        assert sdd > sdb
154
155
# PARTLABEL values fed to the parametrized ceph-disk tests in this module.
ceph_partlabels = [
    'ceph data',
    'ceph journal',
    'ceph block',
    'ceph block.wal',
    'ceph block.db',
    'ceph lockbox',
]
160
161
class TestCephDiskDevice(object):
    """Tests for ``device.CephDiskDevice`` wrapping a ``device.Device``.

    The four removable/read-only tests in this class check ``available`` on a
    plain ``device.Device`` rather than on the ceph-disk wrapper.
    """

    @staticmethod
    def _expected_type(label):
        """Return the part of ``label`` after its last space and last dot.

        For example 'ceph block.wal' gives 'wal' and 'ceph data' gives 'data'.
        """
        last_word = label.split()[-1]
        return last_word.rpartition('.')[-1]

    def test_partlabel_lsblk(self, device_info):
        """An empty lsblk PARTLABEL with no blkid data gives an empty partlabel."""
        device_info(lsblk={"PARTLABEL": ""})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == ''

    def test_partlabel_blkid(self, device_info):
        """The blkid PARTLABEL is used when lsblk reports an empty one."""
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": "ceph data"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.partlabel == 'ceph data'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_blkid(self, device_info, label):
        """Every known ceph label coming from blkid marks a member."""
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    def test_reject_removable_device(self, device_info):
        """A removable device is not available."""
        device_info(devices={"/dev/sdb": {"removable": 1}})
        target = device.Device("/dev/sdb")
        assert not target.available

    def test_accept_non_removable_device(self, device_info):
        """A non-removable device is available."""
        device_info(devices={"/dev/sdb": {"removable": 0}})
        target = device.Device("/dev/sdb")
        assert target.available

    def test_reject_readonly_device(self, device_info):
        """A read-only device is not available."""
        device_info(devices={"/dev/cdrom": {"ro": 1}})
        target = device.Device("/dev/cdrom")
        assert not target.available

    def test_accept_non_readonly_device(self, device_info):
        """A writable device is available."""
        device_info(devices={"/dev/sda": {"ro": 0}})
        target = device.Device("/dev/sda")
        assert target.available

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_is_member_lsblk(self, device_info, label):
        """Every known ceph label coming from lsblk marks a member."""
        device_info(lsblk={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.is_member is True

    def test_unknown_type(self, device_info):
        """A non-ceph PARTLABEL gives the type 'unknown'."""
        device_info(lsblk={"PARTLABEL": "gluster"})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == 'unknown'

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_blkid(self, device_info, label):
        """The type is derived from the blkid label when lsblk's is empty."""
        wanted = self._expected_type(label)
        device_info(lsblk={"PARTLABEL": ""}, blkid={"PARTLABEL": label})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == wanted

    @pytest.mark.parametrize("label", ceph_partlabels)
    def test_type_lsblk(self, device_info, label):
        """The type is derived from the lsblk label when blkid's is empty."""
        wanted = self._expected_type(label)
        device_info(lsblk={"PARTLABEL": label}, blkid={"PARTLABEL": ''})
        wrapped = device.CephDiskDevice(device.Device("/dev/sda"))

        assert wrapped.type == wanted