]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/util/test_disk.py
44f19e036fa47d87eb41dc6c35ccf72343e25444
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / util / test_disk.py
1 import os
2 import pytest
3 from mock.mock import patch
4 from ceph_volume.util import disk
5
6
7 class TestLsblkParser(object):
8
9 def test_parses_whitespace_values(self):
10 output = 'NAME="sdaa5" PARTLABEL="ceph data" RM="0" SIZE="10M" RO="0" TYPE="part"'
11 result = disk._lsblk_parser(output)
12 assert result['PARTLABEL'] == 'ceph data'
13
14 def test_ignores_bogus_pairs(self):
15 output = 'NAME="sdaa5" PARTLABEL RM="0" SIZE="10M" RO="0" TYPE="part" MOUNTPOINT=""'
16 result = disk._lsblk_parser(output)
17 assert result['SIZE'] == '10M'
18
19
20 class TestBlkidParser(object):
21
22 def test_parses_whitespace_values(self):
23 output = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa
24 result = disk._blkid_parser(output)
25 assert result['PARTLABEL'] == 'ceph data'
26
27 def test_ignores_unmapped(self):
28 output = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa
29 result = disk._blkid_parser(output)
30 assert len(result.keys()) == 4
31
32 def test_translates_to_partuuid(self):
33 output = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa
34 result = disk._blkid_parser(output)
35 assert result['PARTUUID'] == 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'
36
37
38 class TestBlkid(object):
39
40 def test_parses_translated(self, stub_call):
41 output = '''/dev/sdb1: UUID="62416664-cbaf-40bd-9689-10bd337379c3" TYPE="xfs" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="ceph data" PART_ENTRY_UUID="b89c03bc-bf58-4338-a8f8-a2f484852b4f"''' # noqa
42 stub_call((output.split(), [], 0))
43 result = disk.blkid('/dev/sdb1')
44 assert result['PARTUUID'] == 'b89c03bc-bf58-4338-a8f8-a2f484852b4f'
45 assert result['PARTLABEL'] == 'ceph data'
46 assert result['UUID'] == '62416664-cbaf-40bd-9689-10bd337379c3'
47 assert result['TYPE'] == 'xfs'
48
49 class TestUdevadmProperty(object):
50
51 def test_good_output(self, stub_call):
52 output = """ID_MODEL=SK_hynix_SC311_SATA_512GB
53 ID_PART_TABLE_TYPE=gpt
54 ID_SERIAL_SHORT=MS83N71801150416A""".split()
55 stub_call((output, [], 0))
56 result = disk.udevadm_property('dev/sda')
57 assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
58 assert result['ID_PART_TABLE_TYPE'] == 'gpt'
59 assert result['ID_SERIAL_SHORT'] == 'MS83N71801150416A'
60
61 def test_property_filter(self, stub_call):
62 output = """ID_MODEL=SK_hynix_SC311_SATA_512GB
63 ID_PART_TABLE_TYPE=gpt
64 ID_SERIAL_SHORT=MS83N71801150416A""".split()
65 stub_call((output, [], 0))
66 result = disk.udevadm_property('dev/sda', ['ID_MODEL',
67 'ID_SERIAL_SHORT'])
68 assert result['ID_MODEL'] == 'SK_hynix_SC311_SATA_512GB'
69 assert 'ID_PART_TABLE_TYPE' not in result
70
71 def test_fail_on_broken_output(self, stub_call):
72 output = ["ID_MODEL:SK_hynix_SC311_SATA_512GB"]
73 stub_call((output, [], 0))
74 with pytest.raises(ValueError):
75 disk.udevadm_property('dev/sda')
76
77
78 class TestDeviceFamily(object):
79
80 def test_groups_multiple_devices(self, stub_call):
81 out = [
82 'NAME="sdaa5" PARLABEL="ceph lockbox"',
83 'NAME="sdaa" RO="0"',
84 'NAME="sdaa1" PARLABEL="ceph data"',
85 'NAME="sdaa2" PARLABEL="ceph journal"',
86 ]
87 stub_call((out, '', 0))
88 result = disk.device_family('sdaa5')
89 assert len(result) == 4
90
91 def test_parses_output_correctly(self, stub_call):
92 names = ['sdaa', 'sdaa5', 'sdaa1', 'sdaa2']
93 out = [
94 'NAME="sdaa5" PARLABEL="ceph lockbox"',
95 'NAME="sdaa" RO="0"',
96 'NAME="sdaa1" PARLABEL="ceph data"',
97 'NAME="sdaa2" PARLABEL="ceph journal"',
98 ]
99 stub_call((out, '', 0))
100 result = disk.device_family('sdaa5')
101 for parsed in result:
102 assert parsed['NAME'] in names
103
104
105 class TestHumanReadableSize(object):
106
107 def test_bytes(self):
108 result = disk.human_readable_size(800)
109 assert result == '800.00 B'
110
111 def test_kilobytes(self):
112 result = disk.human_readable_size(800*1024)
113 assert result == '800.00 KB'
114
115 def test_megabytes(self):
116 result = disk.human_readable_size(800*1024*1024)
117 assert result == '800.00 MB'
118
119 def test_gigabytes(self):
120 result = disk.human_readable_size(8.19*1024*1024*1024)
121 assert result == '8.19 GB'
122
123 def test_terabytes(self):
124 result = disk.human_readable_size(81.2*1024*1024*1024*1024)
125 assert result == '81.20 TB'
126
127 def test_petabytes(self):
128 result = disk.human_readable_size(9.23*1024*1024*1024*1024*1024)
129 assert result == '9.23 PB'
130
131 class TestSizeFromHumanReadable(object):
132
133 def test_bytes(self):
134 result = disk.size_from_human_readable('2')
135 assert result == disk.Size(b=2)
136
137 def test_kilobytes(self):
138 result = disk.size_from_human_readable('2 K')
139 assert result == disk.Size(kb=2)
140
141 def test_megabytes(self):
142 result = disk.size_from_human_readable('2 M')
143 assert result == disk.Size(mb=2)
144
145 def test_gigabytes(self):
146 result = disk.size_from_human_readable('2 G')
147 assert result == disk.Size(gb=2)
148
149 def test_terabytes(self):
150 result = disk.size_from_human_readable('2 T')
151 assert result == disk.Size(tb=2)
152
153 def test_petabytes(self):
154 result = disk.size_from_human_readable('2 P')
155 assert result == disk.Size(pb=2)
156
157 def test_case(self):
158 result = disk.size_from_human_readable('2 t')
159 assert result == disk.Size(tb=2)
160
161 def test_space(self):
162 result = disk.size_from_human_readable('2T')
163 assert result == disk.Size(tb=2)
164
165 def test_float(self):
166 result = disk.size_from_human_readable('2.0')
167 assert result == disk.Size(b=2)
168 result = disk.size_from_human_readable('2.0T')
169 assert result == disk.Size(tb=2)
170 result = disk.size_from_human_readable('1.8T')
171 assert result == disk.Size(tb=1.8)
172
173
174 class TestSizeParse(object):
175
176 def test_bytes(self):
177 result = disk.Size.parse('2')
178 assert result == disk.Size(b=2)
179
180 def test_kilobytes(self):
181 result = disk.Size.parse('2K')
182 assert result == disk.Size(kb=2)
183
184 def test_megabytes(self):
185 result = disk.Size.parse('2M')
186 assert result == disk.Size(mb=2)
187
188 def test_gigabytes(self):
189 result = disk.Size.parse('2G')
190 assert result == disk.Size(gb=2)
191
192 def test_terabytes(self):
193 result = disk.Size.parse('2T')
194 assert result == disk.Size(tb=2)
195
196 def test_petabytes(self):
197 result = disk.Size.parse('2P')
198 assert result == disk.Size(pb=2)
199
200 def test_tb(self):
201 result = disk.Size.parse('2Tb')
202 assert result == disk.Size(tb=2)
203
204 def test_case(self):
205 result = disk.Size.parse('2t')
206 assert result == disk.Size(tb=2)
207
208 def test_space(self):
209 result = disk.Size.parse('2T')
210 assert result == disk.Size(tb=2)
211
212 def test_float(self):
213 result = disk.Size.parse('2.0')
214 assert result == disk.Size(b=2)
215 result = disk.Size.parse('2.0T')
216 assert result == disk.Size(tb=2)
217 result = disk.Size.parse('1.8T')
218 assert result == disk.Size(tb=1.8)
219
220
221 class TestGetBlockDevsLsblk(object):
222
223 @patch('ceph_volume.process.call')
224 def test_return_structure(self, patched_call):
225 lsblk_stdout = [
226 '/dev/dm-0 /dev/mapper/ceph--8b2684eb--56ff--49e4--8f28--522e04cbd6ab-osd--data--9fc29fbf--3b5b--4066--be10--61042569b5a7 lvm',
227 '/dev/vda /dev/vda disk',
228 '/dev/vda1 /dev/vda1 part',
229 '/dev/vdb /dev/vdb disk',]
230 patched_call.return_value = (lsblk_stdout, '', 0)
231 disks = disk.get_block_devs_lsblk()
232 assert len(disks) == len(lsblk_stdout)
233 assert len(disks[0]) == 3
234
235 @patch('ceph_volume.process.call')
236 def test_empty_lsblk(self, patched_call):
237 patched_call.return_value = ([], '', 0)
238 disks = disk.get_block_devs_lsblk()
239 assert len(disks) == 0
240
241 @patch('ceph_volume.process.call')
242 def test_raise_on_failure(self, patched_call):
243 patched_call.return_value = ([], 'error', 1)
244 with pytest.raises(OSError):
245 disk.get_block_devs_lsblk()
246
247
248 class TestGetDevices(object):
249
250 def setup_path(self, tmpdir):
251 path = os.path.join(str(tmpdir), 'block')
252 os.makedirs(path)
253 return path
254
255 def test_no_devices_are_found(self, tmpdir, patched_get_block_devs_lsblk):
256 patched_get_block_devs_lsblk.return_value = []
257 result = disk.get_devices(_sys_block_path=str(tmpdir))
258 assert result == {}
259
260 def test_sda_block_is_found(self, tmpdir, patched_get_block_devs_lsblk):
261 sda_path = '/dev/sda'
262 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
263 block_path = self.setup_path(tmpdir)
264 os.makedirs(os.path.join(block_path, 'sda'))
265 result = disk.get_devices(_sys_block_path=block_path)
266 assert len(result.keys()) == 1
267 assert result[sda_path]['human_readable_size'] == '0.00 B'
268 assert result[sda_path]['model'] == ''
269 assert result[sda_path]['partitions'] == {}
270
271
272 def test_sda_size(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
273 sda_path = '/dev/sda'
274 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
275 block_path = self.setup_path(tmpdir)
276 block_sda_path = os.path.join(block_path, 'sda')
277 os.makedirs(block_sda_path)
278 tmpfile('size', '1024', directory=block_sda_path)
279 result = disk.get_devices(_sys_block_path=block_path)
280 assert list(result.keys()) == [sda_path]
281 assert result[sda_path]['human_readable_size'] == '512.00 KB'
282
283 def test_sda_sectorsize_fallsback(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
284 # if no sectorsize, it will use queue/hw_sector_size
285 sda_path = '/dev/sda'
286 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
287 block_path = self.setup_path(tmpdir)
288 block_sda_path = os.path.join(block_path, 'sda')
289 sda_queue_path = os.path.join(block_sda_path, 'queue')
290 os.makedirs(block_sda_path)
291 os.makedirs(sda_queue_path)
292 tmpfile('hw_sector_size', contents='1024', directory=sda_queue_path)
293 result = disk.get_devices(_sys_block_path=block_path)
294 assert list(result.keys()) == [sda_path]
295 assert result[sda_path]['sectorsize'] == '1024'
296
297 def test_sda_sectorsize_from_logical_block(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
298 sda_path = '/dev/sda'
299 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
300 block_path = self.setup_path(tmpdir)
301 block_sda_path = os.path.join(block_path, 'sda')
302 sda_queue_path = os.path.join(block_sda_path, 'queue')
303 os.makedirs(block_sda_path)
304 os.makedirs(sda_queue_path)
305 tmpfile('logical_block_size', contents='99', directory=sda_queue_path)
306 result = disk.get_devices(_sys_block_path=block_path)
307 assert result[sda_path]['sectorsize'] == '99'
308
309 def test_sda_sectorsize_does_not_fallback(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
310 sda_path = '/dev/sda'
311 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
312 block_path = self.setup_path(tmpdir)
313 block_sda_path = os.path.join(block_path, 'sda')
314 sda_queue_path = os.path.join(block_sda_path, 'queue')
315 os.makedirs(block_sda_path)
316 os.makedirs(sda_queue_path)
317 tmpfile('logical_block_size', contents='99', directory=sda_queue_path)
318 tmpfile('hw_sector_size', contents='1024', directory=sda_queue_path)
319 result = disk.get_devices(_sys_block_path=block_path)
320 assert result[sda_path]['sectorsize'] == '99'
321
322 def test_is_rotational(self, tmpfile, tmpdir, patched_get_block_devs_lsblk):
323 sda_path = '/dev/sda'
324 patched_get_block_devs_lsblk.return_value = [[sda_path, sda_path, 'disk']]
325 block_path = self.setup_path(tmpdir)
326 block_sda_path = os.path.join(block_path, 'sda')
327 sda_queue_path = os.path.join(block_sda_path, 'queue')
328 os.makedirs(block_sda_path)
329 os.makedirs(sda_queue_path)
330 tmpfile('rotational', contents='1', directory=sda_queue_path)
331 result = disk.get_devices(_sys_block_path=block_path)
332 assert result[sda_path]['rotational'] == '1'
333
334
335 class TestSizeCalculations(object):
336
337 @pytest.mark.parametrize('aliases', [
338 ('b', 'bytes'),
339 ('kb', 'kilobytes'),
340 ('mb', 'megabytes'),
341 ('gb', 'gigabytes'),
342 ('tb', 'terabytes'),
343 ])
344 def test_aliases(self, aliases):
345 short_alias, long_alias = aliases
346 s = disk.Size(b=1)
347 short_alias = getattr(s, short_alias)
348 long_alias = getattr(s, long_alias)
349 assert short_alias == long_alias
350
351 @pytest.mark.parametrize('values', [
352 ('b', 857619069665.28),
353 ('kb', 837518622.72),
354 ('mb', 817889.28),
355 ('gb', 798.72),
356 ('tb', 0.78),
357 ])
358 def test_terabytes(self, values):
359 # regardless of the input value, all the other values correlate to each
360 # other the same, every time
361 unit, value = values
362 s = disk.Size(**{unit: value})
363 assert s.b == 857619069665.28
364 assert s.kb == 837518622.72
365 assert s.mb == 817889.28
366 assert s.gb == 798.72
367 assert s.tb == 0.78
368
369
370 class TestSizeOperators(object):
371
372 @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001])
373 def test_gigabytes_is_smaller(self, larger):
374 assert disk.Size(gb=1) < disk.Size(mb=larger)
375
376 @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001])
377 def test_gigabytes_is_larger(self, smaller):
378 assert disk.Size(gb=1) > disk.Size(mb=smaller)
379
380 @pytest.mark.parametrize('larger', [1025, 1024.1, 1024.001, 1024])
381 def test_gigabytes_is_smaller_or_equal(self, larger):
382 assert disk.Size(gb=1) <= disk.Size(mb=larger)
383
384 @pytest.mark.parametrize('smaller', [1023, 1023.9, 1023.001, 1024])
385 def test_gigabytes_is_larger_or_equal(self, smaller):
386 assert disk.Size(gb=1) >= disk.Size(mb=smaller)
387
388 @pytest.mark.parametrize('values', [
389 ('b', 857619069665.28),
390 ('kb', 837518622.72),
391 ('mb', 817889.28),
392 ('gb', 798.72),
393 ('tb', 0.78),
394 ])
395 def test_equality(self, values):
396 unit, value = values
397 s = disk.Size(**{unit: value})
398 # both tb and b, since b is always calculated regardless, and is useful
399 # when testing tb
400 assert disk.Size(tb=0.78) == s
401 assert disk.Size(b=857619069665.28) == s
402
403 @pytest.mark.parametrize('values', [
404 ('b', 857619069665.28),
405 ('kb', 837518622.72),
406 ('mb', 817889.28),
407 ('gb', 798.72),
408 ('tb', 0.78),
409 ])
410 def test_inequality(self, values):
411 unit, value = values
412 s = disk.Size(**{unit: value})
413 # both tb and b, since b is always calculated regardless, and is useful
414 # when testing tb
415 assert disk.Size(tb=1) != s
416 assert disk.Size(b=100) != s
417
418
419 class TestSizeOperations(object):
420
421 def test_assignment_addition_with_size_objects(self):
422 result = disk.Size(mb=256) + disk.Size(gb=1)
423 assert result.gb == 1.25
424 assert result.gb.as_int() == 1
425 assert result.gb.as_float() == 1.25
426
427 def test_self_addition_with_size_objects(self):
428 base = disk.Size(mb=256)
429 base += disk.Size(gb=1)
430 assert base.gb == 1.25
431
432 def test_self_addition_does_not_alter_state(self):
433 base = disk.Size(mb=256)
434 base + disk.Size(gb=1)
435 assert base.mb == 256
436
437 def test_addition_with_non_size_objects(self):
438 with pytest.raises(TypeError):
439 disk.Size(mb=100) + 4
440
441 def test_assignment_subtraction_with_size_objects(self):
442 base = disk.Size(gb=1)
443 base -= disk.Size(mb=256)
444 assert base.mb == 768
445
446 def test_self_subtraction_does_not_alter_state(self):
447 base = disk.Size(gb=1)
448 base - disk.Size(mb=256)
449 assert base.gb == 1
450
451 def test_subtraction_with_size_objects(self):
452 result = disk.Size(gb=1) - disk.Size(mb=256)
453 assert result.mb == 768
454
455 def test_subtraction_with_non_size_objects(self):
456 with pytest.raises(TypeError):
457 disk.Size(mb=100) - 4
458
459 def test_multiplication_with_size_objects(self):
460 with pytest.raises(TypeError):
461 disk.Size(mb=100) * disk.Size(mb=1)
462
463 def test_multiplication_with_non_size_objects(self):
464 base = disk.Size(gb=1)
465 result = base * 2
466 assert result.gb == 2
467 assert result.gb.as_int() == 2
468
469 def test_division_with_size_objects(self):
470 result = disk.Size(gb=1) / disk.Size(mb=1)
471 assert int(result) == 1024
472
473 def test_division_with_non_size_objects(self):
474 base = disk.Size(gb=1)
475 result = base / 2
476 assert result.mb == 512
477 assert result.mb.as_int() == 512
478
479 def test_division_with_non_size_objects_without_state(self):
480 base = disk.Size(gb=1)
481 base / 2
482 assert base.gb == 1
483 assert base.gb.as_int() == 1
484
485
486 class TestSizeAttributes(object):
487
488 def test_attribute_does_not_exist(self):
489 with pytest.raises(AttributeError):
490 disk.Size(mb=1).exabytes
491
492
493 class TestSizeFormatting(object):
494
495 def test_default_formatting_tb_to_b(self):
496 size = disk.Size(tb=0.0000000001)
497 result = "%s" % size
498 assert result == "109.95 B"
499
500 def test_default_formatting_tb_to_kb(self):
501 size = disk.Size(tb=0.00000001)
502 result = "%s" % size
503 assert result == "10.74 KB"
504
505 def test_default_formatting_tb_to_mb(self):
506 size = disk.Size(tb=0.000001)
507 result = "%s" % size
508 assert result == "1.05 MB"
509
510 def test_default_formatting_tb_to_gb(self):
511 size = disk.Size(tb=0.001)
512 result = "%s" % size
513 assert result == "1.02 GB"
514
515 def test_default_formatting_tb_to_tb(self):
516 size = disk.Size(tb=10)
517 result = "%s" % size
518 assert result == "10.00 TB"
519
520
521 class TestSizeSpecificFormatting(object):
522
523 def test_formatting_b(self):
524 size = disk.Size(b=2048)
525 result = "%s" % size.b
526 assert "%s" % size.b == "%s" % size.bytes
527 assert result == "2048.00 B"
528
529 def test_formatting_kb(self):
530 size = disk.Size(kb=5700)
531 result = "%s" % size.kb
532 assert "%s" % size.kb == "%s" % size.kilobytes
533 assert result == "5700.00 KB"
534
535 def test_formatting_mb(self):
536 size = disk.Size(mb=4000)
537 result = "%s" % size.mb
538 assert "%s" % size.mb == "%s" % size.megabytes
539 assert result == "4000.00 MB"
540
541 def test_formatting_gb(self):
542 size = disk.Size(gb=77777)
543 result = "%s" % size.gb
544 assert "%s" % size.gb == "%s" % size.gigabytes
545 assert result == "77777.00 GB"
546
547 def test_formatting_tb(self):
548 size = disk.Size(tb=1027)
549 result = "%s" % size.tb
550 assert "%s" % size.tb == "%s" % size.terabytes
551 assert result == "1027.00 TB"