3 from mock
.mock
import patch
4 from ceph_volume
.util
import disk
class TestLsblkParser(object):
    """Checks for ``disk._lsblk_parser`` on single lines of KEY="value" pairs."""

    def test_parses_whitespace_values(self):
        # A quoted value containing a space must come back as one value.
        lsblk_line = 'NAME="sdaa5" PARTLABEL="ceph data" RM="0" SIZE="10M" RO="0" TYPE="part"'
        parsed = disk._lsblk_parser(lsblk_line)
        assert parsed['PARTLABEL'] == 'ceph data'

    def test_ignores_bogus_pairs(self):
        # PARTLABEL has no ="value" part here; the pairs after it must
        # still be readable.
        lsblk_line = 'NAME="sdaa5" PARTLABEL RM="0" SIZE="10M" RO="0" TYPE="part" MOUNTPOINT=""'
        parsed = disk._lsblk_parser(lsblk_line)
        assert parsed['SIZE'] == '10M'
class TestBlkidParser(object):
    """Checks for ``disk._blkid_parser`` against one fixed blkid output line."""

    # Every test below parses this same line and inspects a different aspect
    # of the result.
    _BLKID_LINE = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa

    def test_parses_whitespace_values(self):
        # The PART_ENTRY_NAME value (which contains a space) is expected
        # under the PARTLABEL key.
        parsed = disk._blkid_parser(self._BLKID_LINE)
        assert parsed['PARTLABEL'] == 'ceph data'

    def test_ignores_unmapped(self):
        # The line carries five pairs; only four keys are expected back.
        parsed = disk._blkid_parser(self._BLKID_LINE)
        assert len(parsed.keys()) == 4

    def test_translates_to_partuuid(self):
        # The PART_ENTRY_UUID value is expected under the PARTUUID key.
        parsed = disk._blkid_parser(self._BLKID_LINE)
        assert parsed['PARTUUID'] == 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'
class TestBlkid(object):
    """Checks ``disk.blkid`` using the ``stub_call`` fixture for command output."""

    def test_parses_translated(self, stub_call):
        blkid_line = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa
        # The stub receives the line pre-split on whitespace; the tuple is
        # presumably (stdout, stderr, returncode) -- confirm against the
        # stub_call fixture.
        stub_call((blkid_line.split(), [], 0))
        parsed = disk.blkid('/dev/sdb1')

        # Checked in a fixed order so the first failing key is reported.
        expected_pairs = (
            ('PARTUUID', 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'),
            ('PARTLABEL', 'ceph data'),
            ('UUID', '62416664-cbaf-40bd-9689-10bd337379c3'),
            ('TYPE', 'xfs'),
        )
        for key, expected in expected_pairs:
            assert parsed[key] == expected
class TestUdevadmProperty(object):
    """Checks ``disk.udevadm_property`` using ``stub_call`` for command output."""

    def test_good_output(self, stub_call):
        # One KEY=value entry per output line.
        output = [
            'ID_MODEL=SK_hynix_SC311_SATA_512GB',
            'ID_PART_TABLE_TYPE=gpt',
            'ID_SERIAL_SHORT=MS83N71801150416A',
        ]
        stub_call((output, [], 0))
        result = disk.udevadm_property('dev/sda')
        assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
        assert result['ID_PART_TABLE_TYPE'] == 'gpt'
        assert result['ID_SERIAL_SHORT'] == 'MS83N71801150416A'

    def test_property_filter(self, stub_call):
        output = [
            'ID_MODEL=SK_hynix_SC311_SATA_512GB',
            'ID_PART_TABLE_TYPE=gpt',
            'ID_SERIAL_SHORT=MS83N71801150416A',
        ]
        stub_call((output, [], 0))
        # NOTE(review): the filter list was truncated after 'ID_MODEL',.
        # 'ID_SERIAL_SHORT' is the only remaining key that the assertions
        # below do not require to be absent -- confirm against history.
        result = disk.udevadm_property('dev/sda', ['ID_MODEL',
                                                   'ID_SERIAL_SHORT'])
        assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
        # A property left out of the filter must not be returned.
        assert 'ID_PART_TABLE_TYPE' not in result

    def test_fail_on_broken_output(self, stub_call):
        # ':' instead of '=' as the separator is expected to be rejected.
        output = ["ID_MODEL:SK_hynix_SC311_SATA_512GB"]
        stub_call((output, [], 0))
        with pytest.raises(ValueError):
            disk.udevadm_property('dev/sda')
class TestDeviceFamily(object):
    """Checks ``disk.device_family`` using ``stub_call`` for command output."""

    # NOTE(review): ``out`` was referenced but never assigned in either test.
    # The three partition rows are verbatim; the 'sdaa' row is inferred from
    # the expected count of 4 and from ``names`` in the second test -- its
    # exact fields should be confirmed against history.

    def test_groups_multiple_devices(self, stub_call):
        out = [
            'NAME="sdaa5" PARLABEL="ceph lockbox"',
            'NAME="sdaa" RO="0"',
            'NAME="sdaa1" PARLABEL="ceph data"',
            'NAME="sdaa2" PARLABEL="ceph journal"',
        ]
        stub_call((out, '', 0))
        result = disk.device_family('sdaa5')
        # One entry per stubbed output line.
        assert len(result) == 4

    def test_parses_output_correctly(self, stub_call):
        names = ['sdaa', 'sdaa5', 'sdaa1', 'sdaa2']
        out = [
            'NAME="sdaa5" PARLABEL="ceph lockbox"',
            'NAME="sdaa" RO="0"',
            'NAME="sdaa1" PARLABEL="ceph data"',
            'NAME="sdaa2" PARLABEL="ceph journal"',
        ]
        stub_call((out, '', 0))
        result = disk.device_family('sdaa5')
        # Every parsed entry must carry one of the stubbed device names.
        for parsed in result:
            assert parsed['NAME'] in names
class TestHumanReadableSize(object):
    """Checks the unit and two-decimal rendering of ``disk.human_readable_size``.

    Inputs are byte counts built from powers of 1024.
    """

    def test_bytes(self):
        assert disk.human_readable_size(800) == '800.00 B'

    def test_kilobytes(self):
        rendered = disk.human_readable_size(800 * 1024)
        assert rendered == '800.00 KB'

    def test_megabytes(self):
        rendered = disk.human_readable_size(800 * 1024 * 1024)
        assert rendered == '800.00 MB'

    def test_gigabytes(self):
        rendered = disk.human_readable_size(8.19 * 1024 * 1024 * 1024)
        assert rendered == '8.19 GB'

    def test_terabytes(self):
        rendered = disk.human_readable_size(81.2 * 1024 * 1024 * 1024 * 1024)
        assert rendered == '81.20 TB'

    def test_petabytes(self):
        rendered = disk.human_readable_size(9.23 * 1024 * 1024 * 1024 * 1024 * 1024)
        assert rendered == '9.23 PB'
class TestSizeFromHumanReadable(object):
    """Checks that ``disk.size_from_human_readable`` returns the matching ``disk.Size``."""

    def test_bytes(self):
        # A bare number carries no unit suffix and is compared against b=2.
        result = disk.size_from_human_readable('2')
        assert result == disk.Size(b=2)

    def test_kilobytes(self):
        result = disk.size_from_human_readable('2 K')
        assert result == disk.Size(kb=2)

    def test_megabytes(self):
        result = disk.size_from_human_readable('2 M')
        assert result == disk.Size(mb=2)

    def test_gigabytes(self):
        result = disk.size_from_human_readable('2 G')
        assert result == disk.Size(gb=2)

    def test_terabytes(self):
        result = disk.size_from_human_readable('2 T')
        assert result == disk.Size(tb=2)

    def test_petabytes(self):
        result = disk.size_from_human_readable('2 P')
        assert result == disk.Size(pb=2)

    def test_case(self):
        # Kept separate from test_petabytes so a lowercase-unit failure is
        # reported under its own name.
        result = disk.size_from_human_readable('2 t')
        assert result == disk.Size(tb=2)

    def test_space(self):
        # The tests above put a space between number and unit; this one
        # passes the two joined together.
        result = disk.size_from_human_readable('2T')
        assert result == disk.Size(tb=2)

    def test_float(self):
        result = disk.size_from_human_readable('2.0')
        assert result == disk.Size(b=2)
        result = disk.size_from_human_readable('2.0T')
        assert result == disk.Size(tb=2)
        result = disk.size_from_human_readable('1.8T')
        assert result == disk.Size(tb=1.8)
class TestSizeParse(object):
    """Checks that ``disk.Size.parse`` returns the matching ``disk.Size``."""

    def test_bytes(self):
        # A bare number carries no unit suffix and is compared against b=2.
        result = disk.Size.parse('2')
        assert result == disk.Size(b=2)

    def test_kilobytes(self):
        result = disk.Size.parse('2K')
        assert result == disk.Size(kb=2)

    def test_megabytes(self):
        result = disk.Size.parse('2M')
        assert result == disk.Size(mb=2)

    def test_gigabytes(self):
        result = disk.Size.parse('2G')
        assert result == disk.Size(gb=2)

    def test_terabytes(self):
        result = disk.Size.parse('2T')
        assert result == disk.Size(tb=2)

    def test_petabytes(self):
        result = disk.Size.parse('2P')
        assert result == disk.Size(pb=2)

    def test_long_form(self):
        # Two-letter suffix; kept out of test_petabytes so a failure here is
        # reported under its own name.
        result = disk.Size.parse('2Tb')
        assert result == disk.Size(tb=2)

    def test_case(self):
        # Lowercase suffix; likewise split out for failure isolation.
        result = disk.Size.parse('2t')
        assert result == disk.Size(tb=2)

    def test_space(self):
        result = disk.Size.parse('2T')
        assert result == disk.Size(tb=2)

    def test_float(self):
        result = disk.Size.parse('2.0')
        assert result == disk.Size(b=2)
        result = disk.Size.parse('2.0T')
        assert result == disk.Size(tb=2)
        result = disk.Size.parse('1.8T')
        assert result == disk.Size(tb=1.8)
class TestGetBlockDevsLsblk(object):
    """Checks ``disk.get_block_devs_lsblk`` with ``ceph_volume.process.call`` patched."""

    @patch('ceph_volume.process.call')
    def test_return_structure(self, patched_call):
        # NOTE(review): the opening of this list was missing; the four rows
        # below are verbatim. Confirm no leading row was dropped (the
        # assertions hold either way).
        lsblk_stdout = [
            '/dev/dm-0 /dev/mapper/ceph--8b2684eb--56ff--49e4--8f28--522e04cbd6ab-osd--data--9fc29fbf--3b5b--4066--be10--61042569b5a7 lvm',
            '/dev/vda /dev/vda disk',
            '/dev/vda1 /dev/vda1 part',
            '/dev/vdb /dev/vdb disk',]
        patched_call.return_value = (lsblk_stdout, '', 0)
        disks = disk.get_block_devs_lsblk()
        # One entry per output row, each made of the row's three fields.
        assert len(disks) == len(lsblk_stdout)
        assert len(disks[0]) == 3

    @patch('ceph_volume.process.call')
    def test_empty_lsblk(self, patched_call):
        patched_call.return_value = ([], '', 0)
        disks = disk.get_block_devs_lsblk()
        assert len(disks) == 0

    @patch('ceph_volume.process.call')
    def test_raise_on_failure(self, patched_call):
        # A non-zero third element together with an error message must
        # surface as OSError.
        patched_call.return_value = ([], 'error', 1)
        with pytest.raises(OSError):
            disk.get_block_devs_lsblk()
class TestGetDevices(object):
    """Checks ``disk.get_devices`` against a fake block-device tree.

    Each test builds a ``block/sda`` directory under ``tmpdir``, optionally
    drops attribute files into it, and points ``get_devices`` at the tree via
    ``_sys_block_path``. ``patched_get_block_devs_lsblk`` is a fixture whose
    ``return_value`` supplies the device rows.
    """

    def setup_path(self, tmpdir):
        """Return the path of the ``block`` directory under ``tmpdir``.

        The directory itself is not created here; callers create their
        device sub-directories with ``os.makedirs``, which also creates
        missing parents.
        """
        path = os.path.join(str(tmpdir), 'block')
        return path

    def _make_sda(self, tmpdir, with_queue=False):
        """Create ``block/sda`` (and ``block/sda/queue`` when asked).

        Returns a ``(block_path, sda_dir, queue_dir)`` tuple; ``queue_dir``
        is only created on disk when ``with_queue`` is true.
        """
        block_path = self.setup_path(tmpdir)
        sda_dir = os.path.join(block_path, 'sda')
        queue_dir = os.path.join(sda_dir, 'queue')
        os.makedirs(sda_dir)
        if with_queue:
            os.makedirs(queue_dir)
        return block_path, sda_dir, queue_dir

    def test_no_devices_are_found(self, tmpdir, patched_get_block_devs_lsblk):
        patched_get_block_devs_lsblk.return_value = []
        result = disk.get_devices(_sys_block_path=str(tmpdir))
        # NOTE(review): no assertion was visible in this test; an empty
        # mapping is the expectation implied by the test name -- confirm.
        assert result == {}

    def test_sda_block_is_found(self, tmpdir, patched_get_block_devs_lsblk):
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, _, _ = self._make_sda(tmpdir)
        result = disk.get_devices(_sys_block_path=block_path)
        assert len(result.keys()) == 1
        # With no attribute files present, these are the expected values.
        assert result[sda_path]['human_readable_size'] == '0.00 B'
        assert result[sda_path]['model'] == ''
        assert result[sda_path]['partitions'] == {}

    def test_sda_size(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, block_sda_path, _ = self._make_sda(tmpdir)
        # A 'size' file containing 1024 is expected to be reported as
        # 512.00 KB.
        tmpfile('size', '1024', directory=block_sda_path)
        result = disk.get_devices(_sys_block_path=block_path)
        assert list(result.keys()) == [sda_path]
        assert result[sda_path]['human_readable_size'] == '512.00 KB'

    def test_sda_sectorsize_fallsback(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
        # if no sectorsize, it will use queue/hw_sector_size
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, _, sda_queue_path = self._make_sda(tmpdir, with_queue=True)
        tmpfile('hw_sector_size', contents='1024', directory=sda_queue_path)
        result = disk.get_devices(_sys_block_path=block_path)
        assert list(result.keys()) == [sda_path]
        assert result[sda_path]['sectorsize'] == '1024'

    def test_sda_sectorsize_from_logical_block(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, _, sda_queue_path = self._make_sda(tmpdir, with_queue=True)
        tmpfile('logical_block_size', contents='99', directory=sda_queue_path)
        result = disk.get_devices(_sys_block_path=block_path)
        assert result[sda_path]['sectorsize'] == '99'

    def test_sda_sectorsize_does_not_fallback(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, _, sda_queue_path = self._make_sda(tmpdir, with_queue=True)
        # With both files present, the logical_block_size value must win.
        tmpfile('logical_block_size', contents='99', directory=sda_queue_path)
        tmpfile('hw_sector_size', contents='1024', directory=sda_queue_path)
        result = disk.get_devices(_sys_block_path=block_path)
        assert result[sda_path]['sectorsize'] == '99'

    def test_is_rotational(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
        sda_path = '/dev/sda'
        patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
        block_path, _, sda_queue_path = self._make_sda(tmpdir, with_queue=True)
        tmpfile('rotational', contents='1', directory=sda_queue_path)
        result = disk.get_devices(_sys_block_path=block_path)
        assert result[sda_path]['rotational'] == '1'
class TestSizeCalculations(object):
    """Checks unit aliases and cross-unit consistency of ``disk.Size``."""

    # NOTE(review): the alias pairs were missing from the parametrize list.
    # These are the short/long pairs exercised elsewhere in this module
    # (b/bytes ... tb/terabytes) -- confirm the list is complete.
    @pytest.mark.parametrize('aliases', [
        ('b', 'bytes'),
        ('kb', 'kilobytes'),
        ('mb', 'megabytes'),
        ('gb', 'gigabytes'),
        ('tb', 'terabytes'),
    ])
    def test_aliases(self, aliases):
        short_alias, long_alias = aliases
        # NOTE(review): the construction of ``s`` was missing; any Size
        # works for comparing two attribute names on the same object.
        s = disk.Size(mb=1)
        short_alias = getattr(s, short_alias)
        long_alias = getattr(s, long_alias)
        assert short_alias == long_alias

    # The 'b' and 'kb' rows are verbatim. The remaining rows restate the same
    # quantity using the values asserted in the test body and the 0.78 TB
    # figure used by the equality tests in this module.
    @pytest.mark.parametrize('values', [
        ('b', 857619069665.28),
        ('kb', 837518622.72),
        ('mb', 817889.28),
        ('gb', 798.72),
        ('tb', 0.78),
    ])
    def test_terabytes(self, values):
        # regardless of the input value, all the other values correlate to each
        # other the same, every time
        unit, value = values
        s = disk.Size(**{unit: value})
        assert s.b == 857619069665.28
        assert s.kb == 837518622.72
        assert s.mb == 817889.28
        assert s.gb == 798.72
class TestSizeOperators(object):
    """Checks the comparison operators of ``disk.Size``."""

    # One quantity expressed in each unit. The 'b' and 'kb' rows are
    # verbatim; the others were missing and are restated from values used
    # elsewhere in this module (817889.28 MB, 798.72 GB, 0.78 TB).
    # NOTE(review): confirm against history.
    _SAME_SIZE = [
        ('b', 857619069665.28),
        ('kb', 837518622.72),
        ('mb', 817889.28),
        ('gb', 798.72),
        ('tb', 0.78),
    ]

    @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001])
    def test_gigabytes_is_smaller(self, larger):
        assert disk.Size(gb=1) < disk.Size(mb=larger)

    @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001])
    def test_gigabytes_is_larger(self, smaller):
        assert disk.Size(gb=1) > disk.Size(mb=smaller)

    @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001, 1024])
    def test_gigabytes_is_smaller_or_equal(self, larger):
        assert disk.Size(gb=1) <= disk.Size(mb=larger)

    @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001, 1024])
    def test_gigabytes_is_larger_or_equal(self, smaller):
        assert disk.Size(gb=1) >= disk.Size(mb=smaller)

    @pytest.mark.parametrize('values', _SAME_SIZE)
    def test_equality(self, values):
        unit, value = values
        s = disk.Size(**{unit: value})
        # both tb and b, since b is always calculated regardless, and is useful
        # when testing tb
        assert disk.Size(tb=0.78) == s
        assert disk.Size(b=857619069665.28) == s

    @pytest.mark.parametrize('values', _SAME_SIZE)
    def test_inequality(self, values):
        unit, value = values
        s = disk.Size(**{unit: value})
        # both tb and b, since b is always calculated regardless, and is useful
        # when testing tb
        assert disk.Size(tb=1) != s
        assert disk.Size(b=100) != s
class TestSizeOperations(object):
    """Checks arithmetic on ``disk.Size`` objects.

    NOTE(review): several statements were missing from this class (the
    ``result = ...`` lines in the multiplication and division tests and the
    assertion in the subtraction state test). They are restored from the
    surrounding assertions; confirm against history.
    """

    def test_assignment_addition_with_size_objects(self):
        result = disk.Size(mb=256) + disk.Size(gb=1)
        assert result.gb == 1.25
        assert result.gb.as_int() == 1
        assert result.gb.as_float() == 1.25

    def test_self_addition_with_size_objects(self):
        base = disk.Size(mb=256)
        base += disk.Size(gb=1)
        assert base.gb == 1.25

    def test_self_addition_does_not_alter_state(self):
        base = disk.Size(mb=256)
        # Result deliberately discarded: only the operand is inspected.
        base + disk.Size(gb=1)
        assert base.mb == 256

    def test_addition_with_non_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) + 4

    def test_assignment_subtraction_with_size_objects(self):
        base = disk.Size(gb=1)
        base -= disk.Size(mb=256)
        assert base.mb == 768

    def test_self_subtraction_does_not_alter_state(self):
        base = disk.Size(gb=1)
        # Result deliberately discarded: only the operand is inspected.
        base - disk.Size(mb=256)
        # Mirrors the addition state test: the operand keeps its value.
        assert base.gb == 1

    def test_subtraction_with_size_objects(self):
        result = disk.Size(gb=1) - disk.Size(mb=256)
        assert result.mb == 768

    def test_subtraction_with_non_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) - 4

    def test_multiplication_with_size_objects(self):
        with pytest.raises(TypeError):
            disk.Size(mb=100) * disk.Size(mb=1)

    def test_multiplication_with_non_size_objects(self):
        base = disk.Size(gb=1)
        # 1 GB scaled by 2 matches the 2 GB expected below.
        result = base * 2
        assert result.gb == 2
        assert result.gb.as_int() == 2

    def test_division_with_size_objects(self):
        result = disk.Size(gb=1) / disk.Size(mb=1)
        assert int(result) == 1024

    def test_division_with_non_size_objects(self):
        base = disk.Size(gb=1)
        # 1 GB halved matches the 512 MB expected below.
        result = base / 2
        assert result.mb == 512
        assert result.mb.as_int() == 512

    def test_division_with_non_size_objects_without_state(self):
        base = disk.Size(gb=1)
        # Result deliberately discarded: only the operand is inspected.
        base / 2
        assert base.gb.as_int() == 1
class TestSizeAttributes(object):
    """Checks attribute access on ``disk.Size``."""

    def test_attribute_does_not_exist(self):
        # 'exabytes' is expected to raise AttributeError when looked up.
        with pytest.raises(AttributeError):
            getattr(disk.Size(mb=1), 'exabytes')
class TestSizeFormatting(object):
    """Checks the default string rendering of a ``disk.Size``.

    Each size is given in terabytes and is expected to render in the unit
    named by the test, with two decimals.

    NOTE(review): the ``result = ...`` line was missing from every test.
    ``"%s" % size`` follows the formatting idiom used by the other
    formatting tests in this module -- confirm against history.
    """

    def test_default_formatting_tb_to_b(self):
        size = disk.Size(tb=0.0000000001)
        result = "%s" % size
        assert result == "109.95 B"

    def test_default_formatting_tb_to_kb(self):
        size = disk.Size(tb=0.00000001)
        result = "%s" % size
        assert result == "10.74 KB"

    def test_default_formatting_tb_to_mb(self):
        size = disk.Size(tb=0.000001)
        result = "%s" % size
        assert result == "1.05 MB"

    def test_default_formatting_tb_to_gb(self):
        size = disk.Size(tb=0.001)
        result = "%s" % size
        assert result == "1.02 GB"

    def test_default_formatting_tb_to_tb(self):
        size = disk.Size(tb=10)
        result = "%s" % size
        assert result == "10.00 TB"
class TestSizeSpecificFormatting(object):
    """Checks the string rendering of individual unit attributes of ``disk.Size``.

    Every test formats the short attribute (e.g. ``kb``) and its long alias
    (e.g. ``kilobytes``), requires both renderings to match, and pins the
    exact text.
    """

    def test_formatting_b(self):
        size = disk.Size(b=2048)
        short_form = "%s" % size.b
        assert "%s" % size.b == "%s" % size.bytes
        assert short_form == "2048.00 B"

    def test_formatting_kb(self):
        size = disk.Size(kb=5700)
        short_form = "%s" % size.kb
        assert "%s" % size.kb == "%s" % size.kilobytes
        assert short_form == "5700.00 KB"

    def test_formatting_mb(self):
        size = disk.Size(mb=4000)
        short_form = "%s" % size.mb
        assert "%s" % size.mb == "%s" % size.megabytes
        assert short_form == "4000.00 MB"

    def test_formatting_gb(self):
        size = disk.Size(gb=77777)
        short_form = "%s" % size.gb
        assert "%s" % size.gb == "%s" % size.gigabytes
        assert short_form == "77777.00 GB"

    def test_formatting_tb(self):
        size = disk.Size(tb=1027)
        short_form = "%s" % size.tb
        assert "%s" % size.tb == "%s" % size.terabytes
        assert short_form == "1027.00 TB"