git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/test_configuration.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / ceph-volume / ceph_volume / tests / test_configuration.py
1 import os
2 try:
3 from cStringIO import StringIO
4 except ImportError: # pragma: no cover
5 from io import StringIO # pragma: no cover
6 from textwrap import dedent
7 import pytest
8 from ceph_volume import configuration, exceptions
9
# Sample ceph.conf text written to disk by the TestLoad tests: a single
# [global] section holding a plain option, options followed by '#' and ';'
# comments, and options whose whole value is a comment character.
# NOTE(review): the name suggests the option lines are indented with
# tabs/spaces; confirm the leading whitespace inside this literal.
tabbed_conf = """
[global]
default = 0
other_h = 1 # comment
other_c = 1 ; comment
colon = ;
hash = #
"""
18
19
class TestConf(object):
    """Tests for ``configuration.Conf.get_list`` fed from in-memory text.

    Every test stubs ``is_valid`` to return True and loads its configuration
    from a ``StringIO`` object, so no file on disk is involved.
    """

    def setup_method(self):
        """Build the default ``[foo]`` configuration stream before each test."""
        # ``setup_method`` is pytest's native per-test hook. The nose-style
        # name ``setup`` is deprecated since pytest 7.2 and is no longer
        # called by pytest 8, which would leave ``self.conf_file`` unset.
        self.conf_file = StringIO(dedent("""
        [foo]
        default = 0
        """))

    def test_get_non_existing_list(self):
        """A missing section/key yields an empty list."""
        cfg = configuration.Conf()
        cfg.is_valid = lambda: True
        # NOTE(review): ``readfp`` appears to be inherited from the standard
        # ConfigParser, where it was removed in Python 3.12 -- confirm before
        # running this suite on newer interpreters.
        cfg.readfp(self.conf_file)
        # The stream only defines [foo]; [global] does not exist.
        assert cfg.get_list('global', 'key') == []

    def test_get_non_existing_list_get_default(self):
        """A missing section/key yields the caller-supplied default."""
        cfg = configuration.Conf()
        cfg.is_valid = lambda: True
        cfg.readfp(self.conf_file)
        assert cfg.get_list('global', 'key', ['a']) == ['a']

    def test_get_rid_of_comments(self):
        """A trailing ``# ...`` comment is not part of the returned value."""
        cfg = configuration.Conf()
        cfg.is_valid = lambda: True
        conf_file = StringIO(dedent("""
        [foo]
        default = 0 # this is a comment
        """))

        cfg.readfp(conf_file)
        assert cfg.get_list('foo', 'default') == ['0']

    def test_gets_split_on_commas(self):
        """A comma-separated value comes back as one item per element."""
        cfg = configuration.Conf()
        cfg.is_valid = lambda: True
        conf_file = StringIO(dedent("""
        [foo]
        default = 0,1,2,3 # this is a comment
        """))

        cfg.readfp(conf_file)
        assert cfg.get_list('foo', 'default') == ['0', '1', '2', '3']

    def test_spaces_and_tabs_are_ignored(self):
        """Whitespace around the comma-separated items is stripped."""
        cfg = configuration.Conf()
        cfg.is_valid = lambda: True
        # NOTE(review): confirm this literal still carries the mix of spaces
        # and tabs that the test name refers to.
        conf_file = StringIO(dedent("""
        [foo]
        default = 0, 1, 2 ,3 # this is a comment
        """))

        cfg.readfp(conf_file)
        assert cfg.get_list('foo', 'default') == ['0', '1', '2', '3']
72
73
class TestLoad(object):
    """Exercise ``configuration.load`` against files created under ``tmpdir``."""

    @staticmethod
    def _load_tabbed_conf(tmpdir):
        """Write ``tabbed_conf`` to ``<tmpdir>/ceph.conf`` and load it.

        Returns whatever ``configuration.load`` returns for that path.
        """
        target = os.path.join(str(tmpdir), 'ceph.conf')
        with open(target, 'w') as handle:
            handle.write(tabbed_conf)
        # Load only after the file has been closed so the content is on disk.
        return configuration.load(target)

    def test_load_from_path(self, tmpdir):
        """A plain option is readable after loading from a path."""
        loaded = self._load_tabbed_conf(tmpdir)
        assert loaded.get('global', 'default') == '0'

    def test_load_with_colon_comments(self, tmpdir):
        """A trailing ``; comment`` is not part of the option value."""
        loaded = self._load_tabbed_conf(tmpdir)
        assert loaded.get('global', 'other_c') == '1'

    def test_load_with_hash_comments(self, tmpdir):
        """A trailing ``# comment`` is not part of the option value."""
        loaded = self._load_tabbed_conf(tmpdir)
        assert loaded.get('global', 'other_h') == '1'

    def test_path_does_not_exist(self):
        """Loading a non-existent path ends in ``ConfigurationError``."""
        missing_path = '/path/does/not/exist/ceph.con'
        with pytest.raises(exceptions.ConfigurationError):
            loaded = configuration.load(missing_path)
            loaded.is_valid()

    def test_unable_to_read_configuration(self, tmpdir, capsys):
        """A malformed file raises ``RuntimeError`` and reports on stderr."""
        broken_path = os.path.join(str(tmpdir), 'ceph.conf')
        with open(broken_path, 'w') as handle:
            handle.write(']broken] config\n[[')
        with pytest.raises(RuntimeError):
            configuration.load(broken_path)
        # Read the captured output once the raises-block has exited; anything
        # placed after the raising call inside that block would never run.
        captured_out, captured_err = capsys.readouterr()
        assert 'File contains no section headers' in captured_err

    @pytest.mark.parametrize('commented', ['colon','hash'])
    def test_coment_as_a_value(self, tmpdir, commented):
        """An option whose whole value is a comment character loads as ''."""
        loaded = self._load_tabbed_conf(tmpdir)
        assert loaded.get('global', commented) == ''