]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_arg_validators.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_arg_validators.py
CommitLineData
b5b8bbf5 1import argparse
3a9019d9 2import pytest
f91f0fd5 3import os
33c7a0ef 4from ceph_volume import exceptions, process
b5b8bbf5 5from ceph_volume.util import arg_validators
33c7a0ef 6from mock.mock import patch, MagicMock
b5b8bbf5
FG
7
8
3efd9988
FG
class TestOSDPath(object):
    """Tests for the ``arg_validators.OSDPath`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.OSDPath()

    def test_is_not_root(self, monkeypatch):
        """A non-zero uid (100 here) must be rejected with SuperUserError."""
        monkeypatch.setattr(os, 'getuid', lambda: 100)
        with pytest.raises(exceptions.SuperUserError):
            self.validator('')

    def test_path_is_not_a_directory(self, is_root, monkeypatch, fake_filesystem):
        """A regular file that is not a partition raises ArgumentError."""
        fake_file = fake_filesystem.create_file('/tmp/foo')
        monkeypatch.setattr(arg_validators.disk, 'is_partition', lambda x: False)
        # Built after patching, exactly as the original test did.
        validator = arg_validators.OSDPath()
        with pytest.raises(argparse.ArgumentError):
            validator(fake_file.path)

    def test_files_are_missing(self, is_root, tmpdir, monkeypatch):
        """An empty directory is rejected, naming the missing ceph_fsid file."""
        tmppath = str(tmpdir)
        monkeypatch.setattr(arg_validators.disk, 'is_partition', lambda x: False)
        validator = arg_validators.OSDPath()
        with pytest.raises(argparse.ArgumentError) as error:
            validator(tmppath)
        assert 'Required file (ceph_fsid) was not found in OSD' in str(error.value)
3a9019d9
FG
33
34
class TestExcludeGroupOptions(object):
    """Tests for ``arg_validators.exclude_group_options``."""

    # Argument groups registered on the parser by every test in this class.
    _GROUPS = ['filestore', 'bluestore']

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.parser = argparse.ArgumentParser()

    def _add_store_groups(self):
        """Register one argument group per store type, each with its own flag."""
        for name in self._GROUPS:
            group = self.parser.add_argument_group(name)
            group.add_argument('--' + name)

    def test_flags_in_one_group(self):
        """Flags from a single group (plus an unknown flag) return None."""
        argv = ['<prog>', '--filestore', '--bar']
        self._add_store_groups()
        result = arg_validators.exclude_group_options(
            self.parser,
            ['filestore', 'bluestore'],
            argv=argv
        )
        assert result is None

    def test_flags_in_no_group(self):
        """Flags that belong to neither group return None."""
        argv = ['<prog>', '--foo', '--bar']
        self._add_store_groups()
        result = arg_validators.exclude_group_options(
            self.parser,
            ['filestore', 'bluestore'],
            argv=argv
        )
        assert result is None

    def test_flags_conflict(self, capsys):
        """Flags from both groups produce a conflict message on stderr."""
        argv = ['<prog>', '--filestore', '--bluestore']
        self._add_store_groups()

        arg_validators.exclude_group_options(
            self.parser, ['filestore', 'bluestore'], argv=argv
        )
        _, stderr = capsys.readouterr()
        assert 'Cannot use --filestore (filestore) with --bluestore (bluestore)' in stderr
1adf2230
AA
78
79
class TestValidDevice(object):
    """Tests for the ``arg_validators.ValidDevice`` argument validator."""

    def setup_method(self, fake_filesystem):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        # NOTE(review): pytest passes the test method object as the optional
        # argument of ``setup_method`` and does not inject fixtures here, so
        # ``fake_filesystem`` is presumably not the fixture of that name --
        # confirm and rename or drop the parameter.
        self.validator = arg_validators.ValidDevice()

    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    def test_path_is_valid(self, m_has_bs_label,
                           fake_call, patch_bluestore_label,
                           device_info, monkeypatch):
        """An existing disk without a bluestore label validates; path is kept."""
        monkeypatch.setattr('ceph_volume.util.device.Device.exists', lambda: True)
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        result = self.validator('/dev/sda')
        assert result.path == '/dev/sda'

    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    def test_path_is_invalid(self, m_has_bs_label,
                             fake_call, patch_bluestore_label,
                             device_info):
        """A path that does not exist is rejected with ArgumentError."""
        lsblk = {"TYPE": "disk", "NAME": "sda"}
        device_info(lsblk=lsblk)
        with pytest.raises(argparse.ArgumentError):
            self.validator('/device/does/not/exist')

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order, ahead of the pytest fixtures.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_dev_has_partitions(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device reporting partitions is rejected with RuntimeError."""
        mocked_device.return_value = MagicMock(
            exists=True,
            has_partitions=True,
        )
        with pytest.raises(RuntimeError):
            self.validator('/dev/foo')
114
class TestValidZapDevice(object):
    """Tests for the ``arg_validators.ValidZapDevice`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidZapDevice()

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_has_partition(self, m_get_single_lv, m_has_bs_label, mocked_device):
        """A device reporting partitions is rejected with RuntimeError."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=True,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        with pytest.raises(RuntimeError):
            # No ``assert`` here: the call is expected to raise, so an
            # assertion on its return value could never be evaluated.
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_has_no_partition(self, m_get_single_lv, m_has_bs_label, mocked_device):
        """A device without partitions validates and returns a truthy value."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        assert self.validator('/dev/foo')
147
class TestValidDataDevice(object):
    """Tests for the ``arg_validators.ValidDataDevice`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidDataDevice()

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order, ahead of the pytest fixtures.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_used_by_ceph(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device flagged ``used_by_ceph`` makes the validator exit."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=True,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False
        )
        # Arrange outside the ``raises`` block so only the call is guarded.
        self.validator.zap = False
        with pytest.raises(SystemExit):
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_has_fs(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device that carries a filesystem is rejected with RuntimeError."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=True
        )
        self.validator.zap = False
        with pytest.raises(RuntimeError):
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=True)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_has_bs_signature(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device with a bluestore label is rejected with RuntimeError."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        with pytest.raises(RuntimeError):
            self.validator('/dev/foo')
195
class TestValidRawDevice(object):
    """Tests for the ``arg_validators.ValidRawDevice`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidRawDevice()

    @staticmethod
    def _call_rc1(cmd, **kw):
        """Stand-in for ``process.call`` that always returns ``('', '', 1)``.

        NOTE(review): presumably the tuple is (stdout, stderr, returncode),
        i.e. a failing command -- confirm against ``process.call``.
        """
        return ('', '', 1)

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order, ahead of the pytest fixtures.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.util.arg_validators.disk.blkid')
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_dmcrypt_device_already_prepared(self, m_get_single_lv, m_blkid, m_has_bs_label, mocked_device, fake_call, monkeypatch):
        """A device that blkid reports as ``crypto_LUKS`` makes the validator exit."""
        monkeypatch.setattr(process, 'call', self._call_rc1)
        m_blkid.return_value = {'UUID': '8fd92779-ad78-437c-a06f-275f7170fa74', 'TYPE': 'crypto_LUKS'}
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False
        )
        # Arrange outside the ``raises`` block so only the call is guarded.
        self.validator.zap = False
        with pytest.raises(SystemExit):
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_already_prepared(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """With ``process.call`` left to the ``fake_call`` fixture, the validator exits."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        with pytest.raises(SystemExit):
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_not_prepared(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call, monkeypatch):
        """With ``process.call`` returning code 1, a clean device validates."""
        monkeypatch.setattr(process, 'call', self._call_rc1)
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        assert self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_has_partition(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call, monkeypatch):
        """A device reporting partitions is rejected with RuntimeError."""
        monkeypatch.setattr(process, 'call', self._call_rc1)
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=True,
            has_gpt_headers=False,
            has_fs=False
        )
        self.validator.zap = False
        with pytest.raises(RuntimeError):
            # No ``assert`` here: the call is expected to raise, so an
            # assertion on its return value could never be evaluated.
            self.validator('/dev/foo')
269
class TestValidBatchDevice(object):
    """Tests for the ``arg_validators.ValidBatchDevice`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidBatchDevice()

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order, ahead of the pytest fixtures.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_is_partition(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device flagged ``is_partition`` is rejected with ArgumentError."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False,
            is_partition=True
        )
        # Arrange outside the ``raises`` block so only the call is guarded.
        self.validator.zap = False
        with pytest.raises(argparse.ArgumentError):
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_is_not_partition(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device that is not a partition validates and returns a truthy value."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False,
            is_partition=False
        )
        self.validator.zap = False
        assert self.validator('/dev/foo')
304
class TestValidBatchDataDevice(object):
    """Tests for the ``arg_validators.ValidBatchDataDevice`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidBatchDataDevice()

    # Stacked ``patch`` decorators apply bottom-up, so the mock arguments
    # arrive in reverse decorator order, ahead of the pytest fixtures.
    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_is_partition(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device flagged ``is_partition`` is rejected with ArgumentError."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False,
            is_partition=True
        )
        # Arrange outside the ``raises`` block so only the call is guarded.
        self.validator.zap = False
        with pytest.raises(argparse.ArgumentError):
            # No ``assert`` here: the call is expected to raise, so an
            # assertion on its return value could never be evaluated.
            self.validator('/dev/foo')

    @patch('ceph_volume.util.arg_validators.Device')
    @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
    @patch('ceph_volume.api.lvm.get_single_lv', return_value=None)
    def test_device_is_not_partition(self, m_get_single_lv, m_has_bs_label, mocked_device, fake_call):
        """A device that is not a partition validates and returns a truthy value."""
        mocked_device.return_value = MagicMock(
            used_by_ceph=False,
            exists=True,
            has_partitions=False,
            has_gpt_headers=False,
            has_fs=False,
            is_partition=False
        )
        self.validator.zap = False
        assert self.validator('/dev/foo')
339
20effc67
TL
340
class TestValidFraction(object):
    """Tests for the ``arg_validators.ValidFraction`` argument validator."""

    def setup_method(self):
        # ``setup_method`` is pytest's native per-test hook; the nose-style
        # ``setup`` name is no longer honoured by recent pytest releases.
        self.validator = arg_validators.ValidFraction()

    def test_fraction_is_valid(self, fake_call):
        """The string '0.8' is accepted and returned as the float 0.8."""
        result = self.validator('0.8')
        assert result == 0.8

    def test_fraction_not_float(self, fake_call):
        """A non-numeric string raises ValueError (not ArgumentError)."""
        with pytest.raises(ValueError):
            self.validator('xyz')

    def test_fraction_is_nan(self, fake_call):
        """'NaN' is rejected with ArgumentError."""
        with pytest.raises(argparse.ArgumentError):
            self.validator('NaN')

    def test_fraction_is_negative(self, fake_call):
        """A negative value is rejected with ArgumentError."""
        with pytest.raises(argparse.ArgumentError):
            self.validator('-1.0')

    def test_fraction_is_zero(self, fake_call):
        """Zero is rejected with ArgumentError."""
        with pytest.raises(argparse.ArgumentError):
            self.validator('0.0')

    def test_fraction_is_greater_one(self, fake_call):
        """A value above 1 is rejected with ArgumentError."""
        with pytest.raises(argparse.ArgumentError):
            self.validator('1.1')