]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_arg_validators.py
19aaaa3bd7e839f3cda5d02c88597e67db18399b
4 from ceph_volume
import exceptions
, process
5 from ceph_volume
.util
import arg_validators
6 from mock
.mock
import patch
, MagicMock
9 class TestOSDPath(object):
12 self
.validator
= arg_validators
.OSDPath()
14 def test_is_not_root(self
, monkeypatch
):
15 monkeypatch
.setattr(os
, 'getuid', lambda: 100)
16 with pytest
.raises(exceptions
.SuperUserError
):
19 def test_path_is_not_a_directory(self
, is_root
, tmpfile
, monkeypatch
):
20 monkeypatch
.setattr(arg_validators
.disk
, 'is_partition', lambda x
: False)
21 validator
= arg_validators
.OSDPath()
22 with pytest
.raises(argparse
.ArgumentError
):
25 def test_files_are_missing(self
, is_root
, tmpdir
, monkeypatch
):
27 monkeypatch
.setattr(arg_validators
.disk
, 'is_partition', lambda x
: False)
28 validator
= arg_validators
.OSDPath()
29 with pytest
.raises(argparse
.ArgumentError
) as error
:
31 assert 'Required file (ceph_fsid) was not found in OSD' in str(error
.value
)
34 class TestExcludeGroupOptions(object):
37 self
.parser
= argparse
.ArgumentParser()
39 def test_flags_in_one_group(self
):
40 argv
= ['<prog>', '--filestore', '--bar']
41 filestore_group
= self
.parser
.add_argument_group('filestore')
42 bluestore_group
= self
.parser
.add_argument_group('bluestore')
43 filestore_group
.add_argument('--filestore')
44 bluestore_group
.add_argument('--bluestore')
45 result
= arg_validators
.exclude_group_options(
47 ['filestore', 'bluestore'],
52 def test_flags_in_no_group(self
):
53 argv
= ['<prog>', '--foo', '--bar']
54 filestore_group
= self
.parser
.add_argument_group('filestore')
55 bluestore_group
= self
.parser
.add_argument_group('bluestore')
56 filestore_group
.add_argument('--filestore')
57 bluestore_group
.add_argument('--bluestore')
58 result
= arg_validators
.exclude_group_options(
60 ['filestore', 'bluestore'],
65 def test_flags_conflict(self
, capsys
):
66 argv
= ['<prog>', '--filestore', '--bluestore']
67 filestore_group
= self
.parser
.add_argument_group('filestore')
68 bluestore_group
= self
.parser
.add_argument_group('bluestore')
69 filestore_group
.add_argument('--filestore')
70 bluestore_group
.add_argument('--bluestore')
72 arg_validators
.exclude_group_options(
73 self
.parser
, ['filestore', 'bluestore'], argv
=argv
75 stdout
, stderr
= capsys
.readouterr()
76 assert 'Cannot use --filestore (filestore) with --bluestore (bluestore)' in stderr
79 class TestValidDevice(object):
82 self
.validator
= arg_validators
.ValidDevice()
@patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
def test_path_is_valid(self, m_has_bs_label, fake_call, patch_bluestore_label):
    """Check that validating '/' returns an object whose ``abspath`` is '/'.

    ``disk.has_bluestore_label`` is patched to return ``False`` for the
    duration of the test; ``m_has_bs_label`` is the mock that ``@patch``
    injects for it.

    ``fake_call`` and ``patch_bluestore_label`` are requested as pytest
    fixtures but never referenced in the body.
    NOTE(review): presumably they are needed only for their side effects;
    confirm against the project's conftest.
    """
    result = self.validator('/')
    assert result.abspath == '/'
@patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value=False)
def test_path_is_invalid(self, m_has_bs_label, fake_call, patch_bluestore_label):
    """Check that '/device/does/not/exist' is rejected with ``argparse.ArgumentError``.

    ``disk.has_bluestore_label`` is patched to return ``False``;
    ``m_has_bs_label`` is the mock that ``@patch`` injects for it.

    ``fake_call`` and ``patch_bluestore_label`` are requested as pytest
    fixtures but never referenced in the body.
    NOTE(review): presumably they are needed only for their side effects;
    confirm against the project's conftest.
    """
    with pytest.raises(argparse.ArgumentError):
        self.validator('/device/does/not/exist')
94 @patch('ceph_volume.util.arg_validators.Device')
95 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
96 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
97 def test_dev_has_partitions(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
98 mocked_device
.return_value
= MagicMock(
102 with pytest
.raises(RuntimeError):
103 self
.validator('/dev/foo')
105 class TestValidZapDevice(object):
107 self
.validator
= arg_validators
.ValidZapDevice()
109 @patch('ceph_volume.util.arg_validators.Device')
110 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
111 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
112 def test_device_has_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
):
113 mocked_device
.return_value
= MagicMock(
117 has_gpt_headers
=False,
120 self
.validator
.zap
= False
121 with pytest
.raises(RuntimeError):
122 assert self
.validator('/dev/foo')
124 @patch('ceph_volume.util.arg_validators.Device')
125 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
126 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
127 def test_device_has_no_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
):
128 mocked_device
.return_value
= MagicMock(
131 has_partitions
=False,
132 has_gpt_headers
=False,
135 self
.validator
.zap
= False
136 assert self
.validator('/dev/foo')
138 class TestValidDataDevice(object):
140 self
.validator
= arg_validators
.ValidDataDevice()
142 @patch('ceph_volume.util.arg_validators.Device')
143 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
144 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
145 def test_device_used_by_ceph(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
146 mocked_device
.return_value
= MagicMock(
149 has_partitions
=False,
150 has_gpt_headers
=False
152 with pytest
.raises(SystemExit):
153 self
.validator
.zap
= False
154 self
.validator('/dev/foo')
156 @patch('ceph_volume.util.arg_validators.Device')
157 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
158 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
159 def test_device_has_fs(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
160 mocked_device
.return_value
= MagicMock(
163 has_partitions
=False,
164 has_gpt_headers
=False,
167 with pytest
.raises(RuntimeError):
168 self
.validator
.zap
= False
169 self
.validator('/dev/foo')
171 @patch('ceph_volume.util.arg_validators.Device')
172 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=True)
173 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
174 def test_device_has_bs_signature(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
175 mocked_device
.return_value
= MagicMock(
178 has_partitions
=False,
179 has_gpt_headers
=False,
182 with pytest
.raises(RuntimeError):
183 self
.validator
.zap
= False
184 self
.validator('/dev/foo')
186 class TestValidRawDevice(object):
188 self
.validator
= arg_validators
.ValidRawDevice()
190 @patch('ceph_volume.util.arg_validators.Device')
191 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
192 @patch('ceph_volume.util.arg_validators.disk.blkid')
193 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
194 def test_dmcrypt_device_already_prepared(self
, m_get_single_lv
, m_blkid
, m_has_bs_label
, mocked_device
, fake_call
, monkeypatch
):
195 def mock_call(cmd
, **kw
):
197 monkeypatch
.setattr(process
, 'call', mock_call
)
198 m_blkid
.return_value
= {'UUID': '8fd92779-ad78-437c-a06f-275f7170fa74', 'TYPE': 'crypto_LUKS'}
199 mocked_device
.return_value
= MagicMock(
202 has_partitions
=False,
203 has_gpt_headers
=False,
206 with pytest
.raises(SystemExit):
207 self
.validator
.zap
= False
208 self
.validator('/dev/foo')
210 @patch('ceph_volume.util.arg_validators.Device')
211 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
212 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
213 def test_device_already_prepared(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
214 mocked_device
.return_value
= MagicMock(
217 has_partitions
=False,
218 has_gpt_headers
=False,
221 with pytest
.raises(SystemExit):
222 self
.validator
.zap
= False
223 self
.validator('/dev/foo')
225 @patch('ceph_volume.util.arg_validators.Device')
226 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
227 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
228 def test_device_not_prepared(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
, monkeypatch
):
229 def mock_call(cmd
, **kw
):
231 monkeypatch
.setattr(process
, 'call', mock_call
)
232 mocked_device
.return_value
= MagicMock(
235 has_partitions
=False,
236 has_gpt_headers
=False,
239 self
.validator
.zap
= False
240 assert self
.validator('/dev/foo')
242 @patch('ceph_volume.util.arg_validators.Device')
243 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
244 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
245 def test_device_has_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
, monkeypatch
):
246 def mock_call(cmd
, **kw
):
248 monkeypatch
.setattr(process
, 'call', mock_call
)
249 mocked_device
.return_value
= MagicMock(
253 has_gpt_headers
=False,
256 self
.validator
.zap
= False
257 with pytest
.raises(RuntimeError):
258 assert self
.validator('/dev/foo')
260 class TestValidBatchDevice(object):
262 self
.validator
= arg_validators
.ValidBatchDevice()
264 @patch('ceph_volume.util.arg_validators.Device')
265 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
266 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
267 def test_device_is_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
268 mocked_device
.return_value
= MagicMock(
271 has_partitions
=False,
272 has_gpt_headers
=False,
276 with pytest
.raises(argparse
.ArgumentError
):
277 self
.validator
.zap
= False
278 self
.validator('/dev/foo')
280 @patch('ceph_volume.util.arg_validators.Device')
281 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
282 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
283 def test_device_is_not_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
284 mocked_device
.return_value
= MagicMock(
287 has_partitions
=False,
288 has_gpt_headers
=False,
292 self
.validator
.zap
= False
293 assert self
.validator('/dev/foo')
295 class TestValidBatchDataDevice(object):
297 self
.validator
= arg_validators
.ValidBatchDataDevice()
299 @patch('ceph_volume.util.arg_validators.Device')
300 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
301 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
302 def test_device_is_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
303 mocked_device
.return_value
= MagicMock(
306 has_partitions
=False,
307 has_gpt_headers
=False,
311 with pytest
.raises(argparse
.ArgumentError
):
312 self
.validator
.zap
= False
313 assert self
.validator('/dev/foo')
315 @patch('ceph_volume.util.arg_validators.Device')
316 @patch('ceph_volume.util.arg_validators.disk.has_bluestore_label', return_value
=False)
317 @patch('ceph_volume.api.lvm.get_single_lv', return_value
=None)
318 def test_device_is_not_partition(self
, m_get_single_lv
, m_has_bs_label
, mocked_device
, fake_call
):
319 mocked_device
.return_value
= MagicMock(
322 has_partitions
=False,
323 has_gpt_headers
=False,
327 self
.validator
.zap
= False
328 assert self
.validator('/dev/foo')
331 class TestValidFraction(object):
334 self
.validator
= arg_validators
.ValidFraction()
336 def test_fraction_is_valid(self
, fake_call
):
337 result
= self
.validator('0.8')
def test_fraction_not_float(self, fake_call):
    """Check that the non-numeric string 'xyz' makes the validator raise ``ValueError``.

    Note the exception type: this case expects a plain ``ValueError``,
    whereas the range-check tests in this class expect
    ``argparse.ArgumentError``.

    ``fake_call`` is requested as a pytest fixture but not used in the body.
    """
    with pytest.raises(ValueError):
        self.validator('xyz')
def test_fraction_is_nan(self, fake_call):
    """Check that the string 'NaN' is rejected with ``argparse.ArgumentError``.

    ``fake_call`` is requested as a pytest fixture but not used in the body.
    """
    with pytest.raises(argparse.ArgumentError):
        self.validator('NaN')
def test_fraction_is_negative(self, fake_call):
    """Check that the negative value '-1.0' is rejected with ``argparse.ArgumentError``.

    ``fake_call`` is requested as a pytest fixture but not used in the body.
    """
    with pytest.raises(argparse.ArgumentError):
        self.validator('-1.0')
def test_fraction_is_zero(self, fake_call):
    """Check that '0.0' is rejected with ``argparse.ArgumentError``.

    ``fake_call`` is requested as a pytest fixture but not used in the body.
    """
    with pytest.raises(argparse.ArgumentError):
        self.validator('0.0')
def test_fraction_is_greater_one(self, fake_call):
    """Check that '1.1', a value above one, is rejected with ``argparse.ArgumentError``.

    ``fake_call`` is requested as a pytest fixture but not used in the body.
    """
    with pytest.raises(argparse.ArgumentError):
        self.validator('1.1')