git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/test_agent.py
buildsys: change download over to reef release
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / test_agent.py
1 from unittest.mock import MagicMock
2 from cephadm.agent import Root
3
4
class FakeDaemonDescription:
    """Test double holding the handful of daemon fields set in ``__init__``."""

    def __init__(self, ip, ports, hostname, service_name='', daemon_type=''):
        """Record the daemon's address, ports, host, service name and type.

        ``service_name`` is stored privately and exposed through the
        ``service_name()`` method rather than as a plain attribute.
        """
        self.hostname = hostname
        self.ip = ip
        self.ports = ports
        self.daemon_type = daemon_type
        self._service_name = service_name

    def service_name(self):
        """Return the service name supplied at construction time."""
        return self._service_name
15
16
class FakeCache:
    """Canned daemon lookups for two hosts, ``node0`` and ``node1``."""

    def get_daemons_by_service(self, service_type):
        """Return one fake daemon per host for ``service_type``.

        ``'ceph-exporter'`` yields both daemons on port 9926; any other
        service type yields port 9100 on node0 and 9200 on node1.
        """
        node0_port, node1_port = (
            (9926, 9926) if service_type == 'ceph-exporter' else (9100, 9200)
        )
        return [
            FakeDaemonDescription('1.2.3.4', [node0_port], 'node0'),
            FakeDaemonDescription('1.2.3.5', [node1_port], 'node1'),
        ]

    def get_daemons_by_type(self, daemon_type):
        """Return two ``haproxy`` daemons belonging to the ``ingress`` service.

        The ``daemon_type`` argument is ignored: the same pair is returned
        for every call.
        """
        hosts = (('1.2.3.4', 9100, 'node0'), ('1.2.3.5', 9200, 'node1'))
        return [
            FakeDaemonDescription(addr, [port], host, 'ingress', 'haproxy')
            for addr, port, host in hosts
        ]
29
30
class FakeInventory:
    """Inventory stub that resolves every host name to one fixed address."""

    # The single address handed out for any host.
    _ADDR = '1.2.3.4'

    def get_addr(self, name: str) -> str:
        """Return ``'1.2.3.4'`` regardless of ``name``."""
        return self._ADDR
34
35
class FakeServiceSpec:
    """Spec stub exposing only a ``monitor_port`` attribute."""

    def __init__(self, port):
        """Store ``port`` unchanged as ``monitor_port``."""
        self.monitor_port = port
39
40
class FakeSpecDescription:
    """Wrapper stub whose ``spec`` attribute is a ``FakeServiceSpec``."""

    def __init__(self, port):
        """Build the wrapped spec with ``port`` as its monitor port."""
        wrapped = FakeServiceSpec(port)
        self.spec = wrapped
44
45
class FakeSpecStore:
    """Spec store stub preloaded with a single ``'ingress'`` entry.

    Supports the ``in`` operator and subscripting by service name.
    """

    def __init__(self, mgr):
        """Keep a reference to ``mgr`` and register the ingress spec.

        The ingress spec is created with monitor port 9049.
        """
        self.mgr = mgr
        self._specs = {'ingress': FakeSpecDescription(9049)}

    def __contains__(self, name):
        """Return True if a spec is registered under ``name``."""
        return name in self._specs

    def __getitem__(self, name):
        """Return the spec registered under ``name``.

        Raises:
            KeyError: if no spec is registered under ``name``.
        """
        # Look up by the requested name so subscripting agrees with
        # __contains__; previously any name returned the ingress spec.
        return self._specs[name]
56
57
class FakeMgr:
    """Manager stub wiring together the fake cache, inventory and spec store.

    Mon commands are routed through ``MagicMock`` wrappers around
    ``_check_mon_command``, which keeps a single ``config`` value in memory.
    """

    def __init__(self):
        """Create the collaborators and start with an empty ``config``."""
        self.config = ''
        self.log = MagicMock()
        self.template = MagicMock()
        # Both command entry points share the same in-memory handler.
        self.mon_command = MagicMock(side_effect=self._check_mon_command)
        self.check_mon_command = MagicMock(side_effect=self._check_mon_command)
        self.cache = FakeCache()
        self.inventory = FakeInventory()
        self.spec_store = FakeSpecStore(self)

    def list_servers(self):
        """Return two server records (node0, node1), each running mgr and mon."""
        return [
            {
                'hostname': host,
                'ceph_version': '16.2',
                'services': [{'type': 'mgr'}, {'type': 'mon'}],
            }
            for host in ('node0', 'node1')
        ]

    def _check_mon_command(self, cmd_dict, inbuf=None):
        """Handle a fake mon command and return ``(retcode, out, err)``.

        ``'set-cmd'`` stores ``cmd_dict['value']`` (``None`` if absent) in
        ``config``; ``'get-cmd'`` returns the stored value; any other
        prefix yields ``(-1, '', 'error')``. ``inbuf`` is unused.
        """
        action = cmd_dict.get('prefix')
        if action == 'set-cmd':
            self.config = cmd_dict.get('value')
            return 0, 'value set', ''
        if action == 'get-cmd':
            return 0, self.config, ''
        return -1, '', 'error'

    def get_module_option_ex(self, module, option, default_value):
        """Return the string ``"9283"`` for every module option lookup."""
        return "9283"
93
94
class TestCephadmService:
    """Checks of ``Root.get_sd_config`` against the fake manager."""

    @staticmethod
    def _fetch_sd_config(service):
        """Query a fresh ``Root(FakeMgr())`` for the config of ``service``."""
        return Root(FakeMgr()).get_sd_config(service)

    @staticmethod
    def _assert_well_formed(cfg):
        """Assert ``cfg`` is non-empty and each entry has labels and targets."""
        assert cfg
        assert all('labels' in entry and 'targets' in entry for entry in cfg)

    def test_get_sd_config_prometheus(self):
        """mgr-prometheus lists both hosts on port 9283 in one entry."""
        cfg = self._fetch_sd_config('mgr-prometheus')
        self._assert_well_formed(cfg)

        first = cfg[0]
        assert first['targets'] == ['node0:9283', 'node1:9283']

    def test_get_sd_config_node_exporter(self):
        """node-exporter yields one entry per host, labelled by hostname."""
        cfg = self._fetch_sd_config('node-exporter')
        self._assert_well_formed(cfg)

        first, second = cfg[0], cfg[1]
        assert first['targets'] == ['1.2.3.4:9100']
        assert first['labels'] == {'instance': 'node0'}
        assert second['targets'] == ['1.2.3.5:9200']
        assert second['labels'] == {'instance': 'node1'}

    def test_get_sd_config_alertmgr(self):
        """alertmanager groups both daemon addresses into the first entry."""
        cfg = self._fetch_sd_config('alertmanager')
        self._assert_well_formed(cfg)

        first = cfg[0]
        assert first['targets'] == ['1.2.3.4:9100', '1.2.3.5:9200']

    def test_get_sd_config_haproxy(self):
        """haproxy targets use port 9049 and the ingress instance label."""
        cfg = self._fetch_sd_config('haproxy')
        self._assert_well_formed(cfg)

        first = cfg[0]
        assert first['targets'] == ['1.2.3.4:9049']
        assert first['labels'] == {'instance': 'ingress'}

    def test_get_sd_config_ceph_exporter(self):
        """ceph-exporter's first entry targets node0's address on port 9926."""
        cfg = self._fetch_sd_config('ceph-exporter')
        self._assert_well_formed(cfg)

        first = cfg[0]
        assert first['targets'] == ['1.2.3.4:9926']

    def test_get_sd_config_invalid_service(self):
        """An unrecognised service name produces an empty config."""
        cfg = self._fetch_sd_config('invalid-service')
        assert cfg == []