]> git.proxmox.com Git - ceph.git/blame - ceph/src/ceph-volume/ceph_volume/tests/util/test_disk.py
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_disk.py
CommitLineData
1adf2230
AA
1import os
2import pytest
b32b8144 3from ceph_volume.util import disk
aee94f69
TL
4from mock.mock import patch, MagicMock
5
6
class TestFunctions:
    """Checks for ``disk.is_device`` and ``disk.is_partition``.

    The names patched on ``ceph_volume.util.disk`` are replaced with canned
    ``MagicMock`` objects, so the tests never touch a real block device.
    """

    # Patch targets, spelled out once so the decorators stay readable.
    _EXISTS = 'ceph_volume.util.disk.os.path.exists'
    _LSBLK = 'ceph_volume.util.disk.lsblk'
    _ALLOW_LOOP = 'ceph_volume.util.disk.allow_loop_devices'
    _GET_PARTITIONS = 'ceph_volume.util.disk.get_partitions'

    @patch(_EXISTS, MagicMock(return_value=False))
    def test_is_device_path_does_not_exist(self):
        verdict = disk.is_device('/dev/foo')
        assert not verdict

    @patch(_EXISTS, MagicMock(return_value=True))
    def test_is_device_dev_doesnt_startswith_dev(self):
        verdict = disk.is_device('/foo')
        assert not verdict

    @patch(_ALLOW_LOOP, MagicMock(return_value=False))
    @patch(_EXISTS, MagicMock(return_value=True))
    def test_is_device_loop_not_allowed(self):
        verdict = disk.is_device('/dev/loop123')
        assert not verdict

    @patch(_LSBLK, MagicMock(return_value={'NAME': 'foo', 'TYPE': 'disk'}))
    @patch(_EXISTS, MagicMock(return_value=True))
    def test_is_device_type_disk(self):
        verdict = disk.is_device('/dev/foo')
        assert verdict

    @patch(_LSBLK, MagicMock(return_value={'NAME': 'foo', 'TYPE': 'mpath'}))
    @patch(_EXISTS, MagicMock(return_value=True))
    def test_is_device_type_mpath(self):
        verdict = disk.is_device('/dev/foo')
        assert verdict

    @patch(_LSBLK, MagicMock(return_value={'NAME': 'foo1', 'TYPE': 'part'}))
    @patch(_EXISTS, MagicMock(return_value=True))
    def test_is_device_type_part(self):
        verdict = disk.is_device('/dev/foo1')
        assert not verdict

    @patch(_EXISTS, MagicMock(return_value=True))
    @patch(_GET_PARTITIONS, MagicMock(return_value={"sda1": "sda"}))
    def test_is_partition(self):
        verdict = disk.is_partition('sda1')
        assert verdict
40
b32b8144
FG
41
class TestLsblkParser:
    """Checks for ``disk._lsblk_parser`` on single ``KEY="value"`` lines."""

    def test_parses_whitespace_values(self):
        # The PARTLABEL value contains a space inside its quotes.
        raw_line = 'NAME="sdaa5" PARTLABEL="ceph data" RM="0" SIZE="10M" RO="0" TYPE="part"'
        parsed = disk._lsblk_parser(raw_line)
        assert parsed['PARTLABEL'] == 'ceph data'

    def test_ignores_bogus_pairs(self):
        # PARTLABEL appears here without any ="value" part.
        raw_line = 'NAME="sdaa5" PARTLABEL RM="0" SIZE="10M" RO="0" TYPE="part" MOUNTPOINT=""'
        parsed = disk._lsblk_parser(raw_line)
        assert parsed['SIZE'] == '10M'
53
54
91327a77
AA
class TestBlkidParser:
    """Checks for ``disk._blkid_parser`` against one sample ``blkid`` line."""

    # Sample line fed to the parser by every test in this class.
    _SAMPLE = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"'''  # noqa

    def test_parses_whitespace_values(self):
        parsed = disk._blkid_parser(self._SAMPLE)
        assert parsed['PARTLABEL'] == 'ceph data'

    def test_ignores_unmapped(self):
        # The sample carries five KEY="value" pairs; four keys are expected.
        parsed = disk._blkid_parser(self._SAMPLE)
        key_count = len(parsed.keys())
        assert key_count == 4

    def test_translates_to_partuuid(self):
        parsed = disk._blkid_parser(self._SAMPLE)
        assert parsed['PARTUUID'] == 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'
71
72
class TestBlkid:
    """Checks for ``disk.blkid`` with the command output stubbed out."""

    def test_parses_translated(self, stub_call):
        blkid_line = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"'''  # noqa
        # Stubbed call result: (stdout pieces, stderr, return code).
        stub_call((blkid_line.split(), [], 0))
        parsed = disk.blkid('/dev/sdb1')
        assert parsed['PARTUUID'] == 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'
        assert parsed['PARTLABEL'] == 'ceph data'
        assert parsed['UUID'] == '62416664-cbaf-40bd-9689-10bd337379c3'
        assert parsed['TYPE'] == 'xfs'
83
f64942e4
AA
class TestUdevadmProperty:
    """Checks for ``disk.udevadm_property`` with the command output stubbed."""

    # Well-formed KEY=value text shared by the two success-path tests.
    _PROPERTIES = """ID_MODEL=SK_hynix_SC311_SATA_512GB
ID_PART_TABLE_TYPE=gpt
ID_SERIAL_SHORT=MS83N71801150416A"""

    def test_good_output(self, stub_call):
        stub_call((self._PROPERTIES.split(), [], 0))
        props = disk.udevadm_property('dev/sda')
        assert props['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
        assert props['ID_PART_TABLE_TYPE'] == 'gpt'
        assert props['ID_SERIAL_SHORT'] == 'MS83N71801150416A'

    def test_property_filter(self, stub_call):
        stub_call((self._PROPERTIES.split(), [], 0))
        wanted = ['ID_MODEL', 'ID_SERIAL_SHORT']
        props = disk.udevadm_property('dev/sda', wanted)
        assert props['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
        # A property left out of the filter list must not be returned.
        assert 'ID_PART_TABLE_TYPE' not in props

    def test_fail_on_broken_output(self, stub_call):
        # ':' instead of '=' makes the line malformed.
        broken_lines = ["ID_MODEL:SK_hynix_SC311_SATA_512GB"]
        stub_call((broken_lines, [], 0))
        with pytest.raises(ValueError):
            disk.udevadm_property('dev/sda')
111
91327a77 112
b32b8144
FG
class TestDeviceFamily:
    """Checks for ``disk.device_family`` with the command output stubbed."""

    # Output lines shared by both tests: one parent device, three children.
    _LSBLK_LINES = (
        'NAME="sdaa5" PARLABEL="ceph lockbox"',
        'NAME="sdaa" RO="0"',
        'NAME="sdaa1" PARLABEL="ceph data"',
        'NAME="sdaa2" PARLABEL="ceph journal"',
    )

    def test_groups_multiple_devices(self, stub_call):
        # A fresh list per test so the shared tuple is never handed out.
        stub_call((list(self._LSBLK_LINES), '', 0))
        family = disk.device_family('sdaa5')
        assert len(family) == 4

    def test_parses_output_correctly(self, stub_call):
        expected_names = ['sdaa', 'sdaa5', 'sdaa1', 'sdaa2']
        stub_call((list(self._LSBLK_LINES), '', 0))
        family = disk.device_family('sdaa5')
        for member in family:
            assert member['NAME'] in expected_names
1adf2230
AA
138
139
1adf2230
AA
class TestHumanReadableSize:
    """Checks for ``disk.human_readable_size``, one test per unit suffix."""

    def test_bytes(self):
        assert disk.human_readable_size(800) == '800.00 B'

    def test_kilobytes(self):
        byte_count = 800 * 1024
        assert disk.human_readable_size(byte_count) == '800.00 KB'

    def test_megabytes(self):
        byte_count = 800 * 1024 ** 2
        assert disk.human_readable_size(byte_count) == '800.00 MB'

    def test_gigabytes(self):
        # Scaling a float by a power of two is exact, so this equals the
        # chained 8.19*1024*1024*1024 form.
        byte_count = 8.19 * 1024 ** 3
        assert disk.human_readable_size(byte_count) == '8.19 GB'

    def test_terabytes(self):
        byte_count = 81.2 * 1024 ** 4
        assert disk.human_readable_size(byte_count) == '81.20 TB'

    def test_petabytes(self):
        byte_count = 9.23 * 1024 ** 5
        assert disk.human_readable_size(byte_count) == '9.23 PB'
1adf2230 165
81eedcae
TL
class TestSizeFromHumanReadable:
    """Checks that ``disk.size_from_human_readable`` yields ``disk.Size``."""

    def test_bytes(self):
        assert disk.size_from_human_readable('2') == disk.Size(b=2)

    def test_kilobytes(self):
        assert disk.size_from_human_readable('2 K') == disk.Size(kb=2)

    def test_megabytes(self):
        assert disk.size_from_human_readable('2 M') == disk.Size(mb=2)

    def test_gigabytes(self):
        assert disk.size_from_human_readable('2 G') == disk.Size(gb=2)

    def test_terabytes(self):
        assert disk.size_from_human_readable('2 T') == disk.Size(tb=2)

    def test_petabytes(self):
        assert disk.size_from_human_readable('2 P') == disk.Size(pb=2)

    def test_case(self):
        # Lower-case unit letter.
        assert disk.size_from_human_readable('2 t') == disk.Size(tb=2)

    def test_space(self):
        # No space between the number and the unit.
        assert disk.size_from_human_readable('2T') == disk.Size(tb=2)

    def test_float(self):
        cases = (
            ('2.0', {'b': 2}),
            ('2.0T', {'tb': 2}),
            ('1.8T', {'tb': 1.8}),
        )
        for text, unit_kwargs in cases:
            parsed = disk.size_from_human_readable(text)
            assert parsed == disk.Size(**unit_kwargs)
207
208
92f5a8d4
TL
class TestSizeParse:
    """Checks that ``disk.Size.parse`` turns size strings into ``disk.Size``."""

    def test_bytes(self):
        assert disk.Size.parse('2') == disk.Size(b=2)

    def test_kilobytes(self):
        assert disk.Size.parse('2K') == disk.Size(kb=2)

    def test_megabytes(self):
        assert disk.Size.parse('2M') == disk.Size(mb=2)

    def test_gigabytes(self):
        assert disk.Size.parse('2G') == disk.Size(gb=2)

    def test_terabytes(self):
        assert disk.Size.parse('2T') == disk.Size(tb=2)

    def test_petabytes(self):
        assert disk.Size.parse('2P') == disk.Size(pb=2)

    def test_tb(self):
        # Two-letter unit suffix.
        assert disk.Size.parse('2Tb') == disk.Size(tb=2)

    def test_case(self):
        # Lower-case unit letter.
        assert disk.Size.parse('2t') == disk.Size(tb=2)

    def test_space(self):
        assert disk.Size.parse('2T') == disk.Size(tb=2)

    def test_float(self):
        cases = (
            ('2.0', {'b': 2}),
            ('2.0T', {'tb': 2}),
            ('1.8T', {'tb': 1.8}),
        )
        for text, unit_kwargs in cases:
            parsed = disk.Size.parse(text)
            assert parsed == disk.Size(**unit_kwargs)
254
255
1adf2230
AA
class TestGetDevices:
    """Checks for ``disk.get_devices``.

    The ``patched_get_block_devs_sysfs`` fixture supplies the device list and
    ``fake_filesystem`` supplies the sysfs files each test needs.
    """

    @staticmethod
    def _single_disk(path):
        """Return the one-entry device list handed to the sysfs mock."""
        return [[path, path, 'disk', path]]

    def test_no_devices_are_found(self, tmpdir, patched_get_block_devs_sysfs):
        patched_get_block_devs_sysfs.return_value = []
        devices = disk.get_devices(_sys_block_path=str(tmpdir))
        assert devices == {}

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_sda_block_is_found(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        devices = disk.get_devices()
        assert len(devices.keys()) == 1
        entry = devices[dev]
        assert entry['human_readable_size'] == '0.00 B'
        assert entry['model'] == ''
        assert entry['partitions'] == {}

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_sda_size(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        fake_filesystem.create_file('/sys/block/sda/size', contents='1024')
        devices = disk.get_devices()
        assert list(devices.keys()) == [dev]
        assert devices[dev]['human_readable_size'] == '512.00 KB'

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_sda_sectorsize_fallsback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        # Only queue/hw_sector_size exists, so its value is the one expected.
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        fake_filesystem.create_file('/sys/block/sda/queue/hw_sector_size', contents='1024')
        devices = disk.get_devices()
        assert list(devices.keys()) == [dev]
        assert devices[dev]['sectorsize'] == '1024'

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_sda_sectorsize_from_logical_block(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents='99')
        devices = disk.get_devices()
        assert devices[dev]['sectorsize'] == '99'

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_sda_sectorsize_does_not_fallback(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        # Both files exist; logical_block_size is the value expected.
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        fake_filesystem.create_file('/sys/block/sda/queue/logical_block_size', contents='99')
        fake_filesystem.create_file('/sys/block/sda/queue/hw_sector_size', contents='1024')
        devices = disk.get_devices()
        assert devices[dev]['sectorsize'] == '99'

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_is_rotational(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/sda'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        fake_filesystem.create_file('/sys/block/sda/queue/rotational', contents='1')
        devices = disk.get_devices()
        assert devices[dev]['rotational'] == '1'

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_is_ceph_rbd(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/rbd0'
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        devices = disk.get_devices()
        # The rbd device must be left out of the result.
        assert dev not in devices

    @patch('ceph_volume.util.disk.udevadm_property')
    def test_actuator_device(self, m_udev_adm_property, patched_get_block_devs_sysfs, fake_filesystem):
        dev = '/dev/sda'
        actuator_count = 2
        patched_get_block_devs_sysfs.return_value = self._single_disk(dev)
        # One independent_access_ranges sub-directory per fake actuator.
        for index in range(actuator_count):
            fake_filesystem.create_dir(f'/sys/block/sda/queue/independent_access_ranges/{index}')
        devices = disk.get_devices()
        assert devices[dev]['actuators'] == actuator_count
333
1adf2230
AA
334
class TestSizeCalculations:
    """Checks the unit attributes of ``disk.Size``."""

    # One quantity expressed in each unit; used both as input and expectation.
    _SAME_QUANTITY = [
        ('b', 857619069665.28),
        ('kb', 837518622.72),
        ('mb', 817889.28),
        ('gb', 798.72),
        ('tb', 0.78),
    ]

    @pytest.mark.parametrize('aliases', [
        ('b', 'bytes'),
        ('kb', 'kilobytes'),
        ('mb', 'megabytes'),
        ('gb', 'gigabytes'),
        ('tb', 'terabytes'),
    ])
    def test_aliases(self, aliases):
        short_name, long_name = aliases
        size = disk.Size(b=1)
        via_short = getattr(size, short_name)
        via_long = getattr(size, long_name)
        assert via_short == via_long

    @pytest.mark.parametrize('values', _SAME_QUANTITY)
    def test_terabytes(self, values):
        # Whichever unit builds the Size, every unit attribute must report
        # the same correlated value.
        unit, value = values
        size = disk.Size(**{unit: value})
        for attribute, expected in self._SAME_QUANTITY:
            assert getattr(size, attribute) == expected
368
369
class TestSizeOperators:
    """Checks the comparison operators of ``disk.Size``."""

    # One quantity expressed in each unit; shared by the (in)equality tests.
    _SAME_QUANTITY = [
        ('b', 857619069665.28),
        ('kb', 837518622.72),
        ('mb', 817889.28),
        ('gb', 798.72),
        ('tb', 0.78),
    ]

    @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001])
    def test_gigabytes_is_smaller(self, larger):
        one_gb = disk.Size(gb=1)
        assert one_gb < disk.Size(mb=larger)

    @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001])
    def test_gigabytes_is_larger(self, smaller):
        one_gb = disk.Size(gb=1)
        assert one_gb > disk.Size(mb=smaller)

    @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001, 1024])
    def test_gigabytes_is_smaller_or_equal(self, larger):
        one_gb = disk.Size(gb=1)
        assert one_gb <= disk.Size(mb=larger)

    @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001, 1024])
    def test_gigabytes_is_larger_or_equal(self, smaller):
        one_gb = disk.Size(gb=1)
        assert one_gb >= disk.Size(mb=smaller)

    @pytest.mark.parametrize('values', _SAME_QUANTITY)
    def test_equality(self, values):
        unit, value = values
        candidate = disk.Size(**{unit: value})
        # Compared against both the tb and the b spelling of the quantity.
        assert disk.Size(tb=0.78) == candidate
        assert disk.Size(b=857619069665.28) == candidate

    @pytest.mark.parametrize('values', _SAME_QUANTITY)
    def test_inequality(self, values):
        unit, value = values
        candidate = disk.Size(**{unit: value})
        # Compared against a different quantity in tb and in b.
        assert disk.Size(tb=1) != candidate
        assert disk.Size(b=100) != candidate
417
418
class TestSizeOperations:
    """Checks the arithmetic operators of ``disk.Size``."""

    def test_assignment_addition_with_size_objects(self):
        total = disk.Size(mb=256) + disk.Size(gb=1)
        gigabytes = total.gb
        assert gigabytes == 1.25
        assert total.gb.as_int() == 1
        assert total.gb.as_float() == 1.25

    def test_self_addition_with_size_objects(self):
        running = disk.Size(mb=256)
        running += disk.Size(gb=1)
        assert running.gb == 1.25

    def test_self_addition_does_not_alter_state(self):
        untouched = disk.Size(mb=256)
        # The sum is deliberately discarded; only the left operand is checked.
        _ = untouched + disk.Size(gb=1)
        assert untouched.mb == 256

    def test_addition_with_non_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) + 4

    def test_assignment_subtraction_with_size_objects(self):
        running = disk.Size(gb=1)
        running -= disk.Size(mb=256)
        assert running.mb == 768

    def test_self_subtraction_does_not_alter_state(self):
        untouched = disk.Size(gb=1)
        # The difference is deliberately discarded.
        _ = untouched - disk.Size(mb=256)
        assert untouched.gb == 1

    def test_subtraction_with_size_objects(self):
        difference = disk.Size(gb=1) - disk.Size(mb=256)
        assert difference.mb == 768

    def test_subtraction_with_non_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) - 4

    def test_multiplication_with_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) * disk.Size(mb=1)

    def test_multiplication_with_non_size_objects(self):
        one_gb = disk.Size(gb=1)
        doubled = one_gb * 2
        assert doubled.gb == 2
        assert doubled.gb.as_int() == 2

    def test_division_with_size_objects(self):
        ratio = disk.Size(gb=1) / disk.Size(mb=1)
        assert int(ratio) == 1024

    def test_division_with_non_size_objects(self):
        one_gb = disk.Size(gb=1)
        halved = one_gb / 2
        assert halved.mb == 512
        assert halved.mb.as_int() == 512

    def test_division_with_non_size_objects_without_state(self):
        untouched = disk.Size(gb=1)
        # The quotient is deliberately discarded.
        _ = untouched / 2
        assert untouched.gb == 1
        assert untouched.gb.as_int() == 1
1adf2230
AA
484
485
class TestSizeAttributes:
    """Checks attribute lookup on ``disk.Size``."""

    def test_attribute_does_not_exist(self):
        size = disk.Size(mb=1)
        # 'exabytes' is expected to be rejected with AttributeError.
        with pytest.raises(AttributeError):
            size.exabytes
491
492
class TestSizeFormatting:
    """Checks the default string form of ``disk.Size``.

    Each test builds a ``Size`` from a terabyte value and checks the unit
    chosen for the rendered text.
    """

    def test_default_formatting_tb_to_b(self):
        rendered = "%s" % disk.Size(tb=0.0000000001)
        assert rendered == "109.95 B"

    def test_default_formatting_tb_to_kb(self):
        rendered = "%s" % disk.Size(tb=0.00000001)
        assert rendered == "10.74 KB"

    def test_default_formatting_tb_to_mb(self):
        rendered = "%s" % disk.Size(tb=0.000001)
        assert rendered == "1.05 MB"

    def test_default_formatting_tb_to_gb(self):
        rendered = "%s" % disk.Size(tb=0.001)
        assert rendered == "1.02 GB"

    def test_default_formatting_tb_to_tb(self):
        rendered = "%s" % disk.Size(tb=10)
        assert rendered == "10.00 TB"
519
520
class TestSizeSpecificFormatting:
    """Checks the string form of each unit attribute of ``disk.Size``.

    Every test also checks that the short and long attribute names render
    the same text.
    """

    def test_formatting_b(self):
        size = disk.Size(b=2048)
        rendered = "%s" % size.b
        assert "%s" % size.b == "%s" % size.bytes
        assert rendered == "2048.00 B"

    def test_formatting_kb(self):
        size = disk.Size(kb=5700)
        rendered = "%s" % size.kb
        assert "%s" % size.kb == "%s" % size.kilobytes
        assert rendered == "5700.00 KB"

    def test_formatting_mb(self):
        size = disk.Size(mb=4000)
        rendered = "%s" % size.mb
        assert "%s" % size.mb == "%s" % size.megabytes
        assert rendered == "4000.00 MB"

    def test_formatting_gb(self):
        size = disk.Size(gb=77777)
        rendered = "%s" % size.gb
        assert "%s" % size.gb == "%s" % size.gigabytes
        assert rendered == "77777.00 GB"

    def test_formatting_tb(self):
        size = disk.Size(tb=1027)
        rendered = "%s" % size.tb
        assert "%s" % size.tb == "%s" % size.terabytes
        assert rendered == "1027.00 TB"
2a845540
TL
552
553
class TestAllowLoopDevsWarning(object):
    """Checks the warning emitted by ``disk.allow_loop_devices``.

    The test changes process-wide state: the ``CEPH_VOLUME_ALLOW_LOOP_DEVICES``
    environment variable and the class-level flags on
    ``disk.AllowLoopDevices``. That state is reset both before and after each
    test so it cannot leak into tests that run later.
    """

    _ENV_VAR = 'CEPH_VOLUME_ALLOW_LOOP_DEVICES'

    def _reset_state(self):
        """Put the flags and the environment back to the 'not allowed' state."""
        disk.AllowLoopDevices.allow = False
        disk.AllowLoopDevices.warned = False
        # pop() with a default also removes a variable set to an empty
        # string, which a truthiness check on os.environ.get() would skip.
        os.environ.pop(self._ENV_VAR, None)

    def setup_method(self):
        self._reset_state()

    def teardown_method(self):
        # The test sets the environment variable; undo it here so the
        # setting does not stay in os.environ for the rest of the session.
        self._reset_state()

    def test_loop_dev_warning(self, fake_call, caplog):
        # Without the environment variable nothing is allowed or logged.
        assert disk.allow_loop_devices() is False
        assert not caplog.records
        os.environ[self._ENV_VAR] = "y"
        assert disk.allow_loop_devices() is True
        log = caplog.records[0]
        assert log.levelname == "WARNING"
        assert "will never be supported in production" in log.message
570
571
class TestHasBlueStoreLabel:
    """Checks for ``disk.has_bluestore_label``."""

    def test_device_path_is_a_path(self, fake_filesystem):
        # The path is created as a directory in the fake filesystem.
        osd_dir = '/var/lib/ceph/osd/ceph-0'
        fake_filesystem.create_dir(osd_dir)
        labelled = disk.has_bluestore_label(osd_dir)
        assert not labelled