]>
Commit | Line | Data |
---|---|---|
91327a77 | 1 | import pytest |
1adf2230 AA |
2 | from ceph_volume.util import device |
3 | from ceph_volume.api import lvm as api | |
4 | ||
5 | ||
6 | class TestDevice(object): | |
7 | ||
8 | def test_sys_api(self, device_info): | |
9 | data = {"/dev/sda": {"foo": "bar"}} | |
10 | device_info(devices=data) | |
11 | disk = device.Device("/dev/sda") | |
12 | assert disk.sys_api | |
13 | assert "foo" in disk.sys_api | |
14 | ||
15 | def test_is_lv(self, device_info): | |
91327a77 | 16 | data = {"lv_path": "vg/lv", "vg_name": "vg", "name": "lv"} |
1adf2230 AA |
17 | device_info(lv=data) |
18 | disk = device.Device("vg/lv") | |
19 | assert disk.is_lv | |
20 | ||
21 | def test_is_device(self, device_info): | |
22 | data = {"/dev/sda": {"foo": "bar"}} | |
23 | lsblk = {"TYPE": "device"} | |
24 | device_info(devices=data, lsblk=lsblk) | |
25 | disk = device.Device("/dev/sda") | |
26 | assert disk.is_device | |
27 | ||
28 | def test_is_partition(self, device_info, pvolumes): | |
29 | data = {"/dev/sda": {"foo": "bar"}} | |
30 | lsblk = {"TYPE": "part"} | |
31 | device_info(devices=data, lsblk=lsblk) | |
32 | disk = device.Device("/dev/sda") | |
33 | assert disk.is_partition | |
34 | ||
35 | def test_is_not_lvm_memeber(self, device_info, pvolumes): | |
36 | data = {"/dev/sda": {"foo": "bar"}} | |
37 | lsblk = {"TYPE": "part"} | |
38 | device_info(devices=data, lsblk=lsblk) | |
39 | disk = device.Device("/dev/sda") | |
40 | assert not disk.is_lvm_member | |
41 | ||
42 | def test_is_lvm_memeber(self, device_info, pvolumes): | |
43 | data = {"/dev/sda": {"foo": "bar"}} | |
44 | lsblk = {"TYPE": "part"} | |
45 | device_info(devices=data, lsblk=lsblk) | |
46 | disk = device.Device("/dev/sda") | |
47 | assert not disk.is_lvm_member | |
48 | ||
49 | def test_is_mapper_device(self, device_info): | |
50 | device_info() | |
51 | disk = device.Device("/dev/mapper/foo") | |
52 | assert disk.is_mapper | |
53 | ||
54 | def test_is_not_mapper_device(self, device_info): | |
55 | device_info() | |
56 | disk = device.Device("/dev/sda") | |
57 | assert not disk.is_mapper | |
58 | ||
91327a77 AA |
59 | def test_is_ceph_disk_member_lsblk(self, device_info): |
60 | lsblk = {"PARTLABEL": "ceph data"} | |
61 | device_info(lsblk=lsblk) | |
62 | disk = device.Device("/dev/sda") | |
63 | assert disk.is_ceph_disk_member | |
64 | ||
65 | def test_is_not_ceph_disk_member_lsblk(self, device_info): | |
66 | lsblk = {"PARTLABEL": "gluster partition"} | |
67 | device_info(lsblk=lsblk) | |
68 | disk = device.Device("/dev/sda") | |
69 | assert disk.is_ceph_disk_member is False | |
70 | ||
71 | def test_is_ceph_disk_member_blkid(self, device_info): | |
72 | # falls back to blkid | |
73 | lsblk = {"PARTLABEL": ""} | |
74 | blkid = {"PARTLABEL": "ceph data"} | |
75 | device_info(lsblk=lsblk, blkid=blkid) | |
76 | disk = device.Device("/dev/sda") | |
77 | assert disk.is_ceph_disk_member | |
78 | ||
79 | def test_is_not_ceph_disk_member_blkid(self, device_info): | |
80 | # falls back to blkid | |
81 | lsblk = {"PARTLABEL": ""} | |
82 | blkid = {"PARTLABEL": "gluster partition"} | |
83 | device_info(lsblk=lsblk, blkid=blkid) | |
84 | disk = device.Device("/dev/sda") | |
85 | assert disk.is_ceph_disk_member is False | |
86 | ||
1adf2230 | 87 | def test_pv_api(self, device_info, pvolumes, monkeypatch): |
91327a77 | 88 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") |
1adf2230 AA |
89 | pvolumes.append(FooPVolume) |
90 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
91 | data = {"/dev/sda": {"foo": "bar"}} | |
92 | lsblk = {"TYPE": "part"} | |
93 | device_info(devices=data, lsblk=lsblk) | |
94 | disk = device.Device("/dev/sda") | |
95 | assert disk.pvs_api | |
91327a77 AA |
96 | |
97 | @pytest.mark.parametrize("ceph_type", ["data", "block"]) | |
98 | def test_used_by_ceph(self, device_info, pvolumes, monkeypatch, ceph_type): | |
99 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") | |
100 | pvolumes.append(FooPVolume) | |
101 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
102 | data = {"/dev/sda": {"foo": "bar"}} | |
103 | lsblk = {"TYPE": "part"} | |
104 | lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": ceph_type}} | |
105 | device_info(devices=data, lsblk=lsblk, lv=lv_data) | |
106 | disk = device.Device("/dev/sda") | |
107 | assert disk.used_by_ceph | |
108 | ||
109 | def test_not_used_by_ceph(self, device_info, pvolumes, monkeypatch): | |
110 | FooPVolume = api.PVolume(pv_name='/dev/sda', pv_uuid="0000", lv_uuid="0000", pv_tags={}, vg_name="vg") | |
111 | pvolumes.append(FooPVolume) | |
112 | monkeypatch.setattr(api, 'PVolumes', lambda: pvolumes) | |
113 | data = {"/dev/sda": {"foo": "bar"}} | |
114 | lsblk = {"TYPE": "part"} | |
115 | lv_data = {"lv_path": "vg/lv", "vg_name": "vg", "lv_uuid": "0000", "tags": {"ceph.osd_id": 0, "ceph.type": "journal"}} | |
116 | device_info(devices=data, lsblk=lsblk, lv=lv_data) | |
117 | disk = device.Device("/dev/sda") | |
118 | assert not disk.used_by_ceph | |
119 | ||
120 | ||
121 | class TestDeviceOrdering(object): | |
122 | ||
123 | def setup(self): | |
124 | self.data = { | |
125 | "/dev/sda": {"removable": 0}, | |
126 | "/dev/sdb": {"removable": 1}, # invalid | |
127 | "/dev/sdc": {"removable": 0}, | |
128 | "/dev/sdd": {"removable": 1}, # invalid | |
129 | } | |
130 | ||
131 | def test_valid_before_invalid(self, device_info): | |
132 | device_info(devices=self.data) | |
133 | sda = device.Device("/dev/sda") | |
134 | sdb = device.Device("/dev/sdb") | |
135 | ||
136 | assert sda < sdb | |
137 | assert sdb > sda | |
138 | ||
139 | def test_valid_alphabetical_ordering(self, device_info): | |
140 | device_info(devices=self.data) | |
141 | sda = device.Device("/dev/sda") | |
142 | sdc = device.Device("/dev/sdc") | |
143 | ||
144 | assert sda < sdc | |
145 | assert sdc > sda | |
146 | ||
147 | def test_invalid_alphabetical_ordering(self, device_info): | |
148 | device_info(devices=self.data) | |
149 | sdb = device.Device("/dev/sdb") | |
150 | sdd = device.Device("/dev/sdd") | |
151 | ||
152 | assert sdb < sdd | |
153 | assert sdd > sdb | |
154 | ||
155 | ||
156 | ceph_partlabels = [ | |
157 | 'ceph data', 'ceph journal', 'ceph block', | |
158 | 'ceph block.wal', 'ceph block.db', 'ceph lockbox' | |
159 | ] | |
160 | ||
161 | ||
162 | class TestCephDiskDevice(object): | |
163 | ||
164 | def test_partlabel_lsblk(self, device_info): | |
165 | lsblk = {"PARTLABEL": ""} | |
166 | device_info(lsblk=lsblk) | |
167 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
168 | ||
169 | assert disk.partlabel == '' | |
170 | ||
171 | def test_partlabel_blkid(self, device_info): | |
172 | lsblk = {"PARTLABEL": ""} | |
173 | blkid = {"PARTLABEL": "ceph data"} | |
174 | device_info(lsblk=lsblk, blkid=blkid) | |
175 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
176 | ||
177 | assert disk.partlabel == 'ceph data' | |
178 | ||
179 | @pytest.mark.parametrize("label", ceph_partlabels) | |
180 | def test_is_member_blkid(self, device_info, label): | |
181 | lsblk = {"PARTLABEL": ""} | |
182 | blkid = {"PARTLABEL": label} | |
183 | device_info(lsblk=lsblk, blkid=blkid) | |
184 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
185 | ||
186 | assert disk.is_member is True | |
187 | ||
188 | def test_reject_removable_device(self, device_info): | |
189 | data = {"/dev/sdb": {"removable": 1}} | |
190 | device_info(devices=data) | |
191 | disk = device.Device("/dev/sdb") | |
192 | assert not disk.available | |
193 | ||
194 | def test_accept_non_removable_device(self, device_info): | |
195 | data = {"/dev/sdb": {"removable": 0}} | |
196 | device_info(devices=data) | |
197 | disk = device.Device("/dev/sdb") | |
198 | assert disk.available | |
199 | ||
200 | def test_reject_readonly_device(self, device_info): | |
201 | data = {"/dev/cdrom": {"ro": 1}} | |
202 | device_info(devices=data) | |
203 | disk = device.Device("/dev/cdrom") | |
204 | assert not disk.available | |
205 | ||
206 | def test_accept_non_readonly_device(self, device_info): | |
207 | data = {"/dev/sda": {"ro": 0}} | |
208 | device_info(devices=data) | |
209 | disk = device.Device("/dev/sda") | |
210 | assert disk.available | |
211 | ||
212 | @pytest.mark.parametrize("label", ceph_partlabels) | |
213 | def test_is_member_lsblk(self, device_info, label): | |
214 | lsblk = {"PARTLABEL": label} | |
215 | device_info(lsblk=lsblk) | |
216 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
217 | ||
218 | assert disk.is_member is True | |
219 | ||
220 | def test_unknown_type(self, device_info): | |
221 | lsblk = {"PARTLABEL": "gluster"} | |
222 | device_info(lsblk=lsblk) | |
223 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
224 | ||
225 | assert disk.type == 'unknown' | |
226 | ||
227 | @pytest.mark.parametrize("label", ceph_partlabels) | |
228 | def test_type_blkid(self, device_info, label): | |
229 | expected = label.split()[-1].split('.')[-1] | |
230 | lsblk = {"PARTLABEL": ""} | |
231 | blkid = {"PARTLABEL": label} | |
232 | device_info(lsblk=lsblk, blkid=blkid) | |
233 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
234 | ||
235 | assert disk.type == expected | |
236 | ||
237 | @pytest.mark.parametrize("label", ceph_partlabels) | |
238 | def test_type_lsblk(self, device_info, label): | |
239 | expected = label.split()[-1].split('.')[-1] | |
240 | lsblk = {"PARTLABEL": label} | |
241 | blkid = {"PARTLABEL": ''} | |
242 | device_info(lsblk=lsblk, blkid=blkid) | |
243 | disk = device.CephDiskDevice(device.Device("/dev/sda")) | |
244 | ||
245 | assert disk.type == expected |