]>
git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/tests/test_configuration.py
3 from cStringIO
import StringIO
4 except ImportError: # pragma: no cover
5 from io
import StringIO
# pragma: no cover
6 from textwrap
import dedent
8 from ceph_volume
import configuration
, exceptions
20 class TestConf(object):
23 self
.conf_file
= StringIO(dedent("""
28 def test_get_non_existing_list(self
):
29 cfg
= configuration
.Conf()
30 cfg
.is_valid
= lambda: True
31 cfg
.read_conf(self
.conf_file
)
32 assert cfg
.get_list('global', 'key') == []
34 def test_get_non_existing_list_get_default(self
):
35 cfg
= configuration
.Conf()
36 cfg
.is_valid
= lambda: True
37 cfg
.read_conf(self
.conf_file
)
38 assert cfg
.get_list('global', 'key', ['a']) == ['a']
40 def test_get_rid_of_comments(self
):
41 cfg
= configuration
.Conf()
42 cfg
.is_valid
= lambda: True
43 conf_file
= StringIO(dedent("""
45 default = 0 # this is a comment
48 cfg
.read_conf(conf_file
)
49 assert cfg
.get_list('foo', 'default') == ['0']
51 def test_gets_split_on_commas(self
):
52 cfg
= configuration
.Conf()
53 cfg
.is_valid
= lambda: True
54 conf_file
= StringIO(dedent("""
56 default = 0,1,2,3 # this is a comment
59 cfg
.read_conf(conf_file
)
60 assert cfg
.get_list('foo', 'default') == ['0', '1', '2', '3']
62 def test_spaces_and_tabs_are_ignored(self
):
63 cfg
= configuration
.Conf()
64 cfg
.is_valid
= lambda: True
65 conf_file
= StringIO(dedent("""
67 default = 0, 1, 2 ,3 # this is a comment
70 cfg
.read_conf(conf_file
)
71 assert cfg
.get_list('foo', 'default') == ['0', '1', '2', '3']
74 class TestLoad(object):
76 def test_load_from_path(self
, tmpdir
):
77 conf_path
= os
.path
.join(str(tmpdir
), 'ceph.conf')
78 with
open(conf_path
, 'w') as conf
:
79 conf
.write(tabbed_conf
)
80 result
= configuration
.load(conf_path
)
81 assert result
.get('global', 'default') == '0'
83 def test_load_with_colon_comments(self
, tmpdir
):
84 conf_path
= os
.path
.join(str(tmpdir
), 'ceph.conf')
85 with
open(conf_path
, 'w') as conf
:
86 conf
.write(tabbed_conf
)
87 result
= configuration
.load(conf_path
)
88 assert result
.get('global', 'other_c') == '1'
90 def test_load_with_hash_comments(self
, tmpdir
):
91 conf_path
= os
.path
.join(str(tmpdir
), 'ceph.conf')
92 with
open(conf_path
, 'w') as conf
:
93 conf
.write(tabbed_conf
)
94 result
= configuration
.load(conf_path
)
95 assert result
.get('global', 'other_h') == '1'
97 def test_path_does_not_exist(self
):
98 with pytest
.raises(exceptions
.ConfigurationError
):
99 conf
= configuration
.load('/path/does/not/exist/ceph.con')
102 def test_unable_to_read_configuration(self
, tmpdir
, capsys
):
103 ceph_conf
= os
.path
.join(str(tmpdir
), 'ceph.conf')
104 with
open(ceph_conf
, 'w') as config
:
105 config
.write(']broken] config\n[[')
106 with pytest
.raises(RuntimeError):
107 configuration
.load(ceph_conf
)
108 stdout
, stderr
= capsys
.readouterr()
109 assert 'File contains no section headers' in stderr
111 @pytest.mark
.parametrize('commented', ['colon','hash'])
112 def test_coment_as_a_value(self
, tmpdir
, commented
):
113 conf_path
= os
.path
.join(str(tmpdir
), 'ceph.conf')
114 with
open(conf_path
, 'w') as conf
:
115 conf
.write(tabbed_conf
)
116 result
= configuration
.load(conf_path
)
117 assert result
.get('global', commented
) == ''