]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | from unittest import mock |
2 | import copy, datetime, json, os, socket, threading | |
3 | ||
4 | import pytest | |
5 | ||
6 | from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm | |
7 | ||
8 | from typing import Optional | |
9 | ||
# Handle on the cephadm code under test, as returned by the fixtures helper.
_cephadm = import_cephadm()


# Identifiers shared by the tests in this module.
FSID = "beefbeef-beef-beef-1234-beefbeefbeef"
AGENT_ID = 'host1'
# Agent daemon directory built from FSID and AGENT_ID; tests create it in the
# fake filesystem before exercising code that reads or writes agent files.
AGENT_DIR = f'/var/lib/ceph/{FSID}/agent.{AGENT_ID}'
16 | ||
17 | ||
def test_agent_validate():
    """Check that ``CephadmAgent.validate`` rejects configs missing a required file.

    Each required file is left out of the config in turn and ``validate`` must
    raise ``_cephadm.Error`` naming that file. A config that contains every
    required file must validate without raising.
    """
    required_files = _cephadm.CephadmAgent.required_files
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        for skip_idx, missing in enumerate(required_files):
            # Filter by position (not by value) so exactly one entry is dropped.
            incomplete_files = {
                fname: 'text'
                for idx, fname in enumerate(required_files)
                if idx != skip_idx
            }
            with pytest.raises(_cephadm.Error, match=f'required file missing from config: {missing}'):
                agent.validate(incomplete_files)
        all_files = {fname: 'text' for fname in required_files}
        agent.validate(all_files)
28 | ||
29 | ||
def _check_file(path, content):
    """Assert that ``path`` exists and that its text equals ``content``."""
    assert os.path.exists(path)
    with open(path) as handle:
        actual = handle.read()
    assert actual == content
35 | ||
36 | ||
@mock.patch('cephadm.call_throws')
def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs):
    """Exercise ``CephadmAgent.deploy_daemon_unit`` against the fake filesystem.

    Deploying without a config must raise; deploying with a config must write
    the required files, unit.run, unit.meta and the unit file, skip files that
    are not required, and issue the expected systemctl calls.
    """
    _call_throws.return_value = ('', '', 0)
    required = _cephadm.CephadmAgent.required_files
    meta = {'meta': 'data'}

    with with_cephadm_ctx([]) as ctx:
        ctx.meta_json = json.dumps(meta)
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        # no config supplied at all
        with pytest.raises(_cephadm.Error, match='Agent needs a config'):
            agent.deploy_daemon_unit()

        config = {fname: f'text for {fname}' for fname in required}
        config['not-required-file.txt'] = "don't write me"
        agent.deploy_daemon_unit(config)

        # every required config file was created with its content
        for fname in required:
            _check_file(f'{AGENT_DIR}/{fname}', f'text for {fname}')

        # the non-required file was not written
        assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt')

        # unit.run, unit.meta and the unit file were created correctly
        _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run())
        _check_file(f'{AGENT_DIR}/unit.meta', json.dumps(meta, indent=4) + '\n')
        _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file())

        _call_throws.assert_has_calls([
            mock.call(ctx, ['systemctl', 'daemon-reload']),
            mock.call(ctx, ['systemctl', 'enable', '--now', agent.unit_name()]),
        ])

        debug = _cephadm.CallVerbosity.DEBUG
        _cephadm.call.assert_has_calls([
            mock.call(ctx, ['systemctl', 'stop', agent.unit_name()], verbosity=debug),
            mock.call(ctx, ['systemctl', 'reset-failed', agent.unit_name()], verbosity=debug),
        ])
82 | ||
83 | ||
@mock.patch('threading.Thread.is_alive')
def test_agent_shutdown(_is_alive):
    """Check that ``agent.shutdown()`` flips the stop flag on the agent and its helpers.

    ``threading.Thread.is_alive`` is patched to report True (presumably so
    shutdown treats the listener/gatherer threads as running -- confirm
    against ``CephadmAgent.shutdown``).
    """
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _is_alive.return_value = True

        def _stop_flags():
            # Re-read the attributes on every call so the check always sees
            # the agent's current helper objects.
            return [
                agent.stop,
                agent.mgr_listener.stop,
                agent.ls_gatherer.stop,
                agent.volume_gatherer.stop,
            ]

        assert all(flag is False for flag in _stop_flags())
        agent.shutdown()
        assert all(flag is True for flag in _stop_flags())
98 | ||
99 | ||
def test_agent_wakeup():
    """Check that ``agent.wakeup()`` sets the agent's event, which starts unset."""
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        assert not agent.event.is_set()
        agent.wakeup()
        assert agent.event.is_set()
106 | ||
107 | ||
@mock.patch("cephadm.CephadmAgent.shutdown")
@mock.patch("cephadm.AgentGatherer.update_func")
def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs):
    """Exercise ``CephadmAgent.pull_conf_settings`` with a config file on the fake fs.

    Covers three cases: keyring file missing (error and shutdown called), a
    complete config plus keyring (settings copied onto the agent), and a
    config without ``target_ip`` (error).
    """
    full_config = {
        'target_ip': '192.168.0.0',
        'target_port': 9876,
        'refresh_period': 20,
        'listener_port': 5678,
        'host': AGENT_ID,
        'device_enhanced_scan': 'True'
    }
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        def _store_config():
            # (re)write the agent config file from the current full_config
            with open(agent.config_path, 'w') as cfg:
                cfg.write(json.dumps(full_config))

        cephadm_fs.create_dir(AGENT_DIR)
        _store_config()

        # keyring file does not exist yet
        with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"):
            agent.pull_conf_settings()
        _shutdown.assert_called()
        with open(agent.keyring_path, 'w') as keyring_file:
            keyring_file.write('keyring')

        # `==` (not `is`) on purpose: only equality with the bool is asserted
        assert agent.device_enhanced_scan == False
        agent.pull_conf_settings()
        assert agent.host == full_config['host']
        assert agent.target_ip == full_config['target_ip']
        assert agent.target_port == full_config['target_port']
        assert agent.loop_interval == full_config['refresh_period']
        assert agent.starting_port == full_config['listener_port']
        assert agent.device_enhanced_scan == True
        assert agent.keyring == 'keyring'
        _update_func.assert_called()

        # config lacking the target ip must be rejected
        full_config.pop('target_ip')
        _store_config()
        with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"):
            agent.pull_conf_settings()
153 | ||
154 | ||
@mock.patch("cephadm.command_ceph_volume")
def test_agent_ceph_volume(_ceph_volume):
    """Exercise ``CephadmAgent._ceph_volume`` with a patched ``command_ceph_volume``.

    The boolean argument must select the inventory command (with or without
    ``--with-lsm``), the printed text must come back as the output, and a
    command that prints nothing must raise.
    """

    def _print_inventory(_):
        print("ceph-volume output")

    def _print_nothing(_):
        pass

    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        _ceph_volume.side_effect = _print_inventory
        scenarios = (
            (False, ['inventory', '--format=json']),
            (True, ['inventory', '--format=json', '--with-lsm']),
        )
        for flag, expected_command in scenarios:
            out, _ = agent._ceph_volume(flag)
            assert ctx.command == expected_command
            assert out == "ceph-volume output\n"

        _ceph_volume.side_effect = _print_nothing
        with pytest.raises(Exception, match='ceph-volume returned empty value'):
            out, _ = agent._ceph_volume(False)
179 | ||
180 | ||
def test_agent_daemon_ls_subset(cephadm_fs):
    """Exercise ``CephadmAgent._daemon_ls_subset`` with faked container output.

    Daemon directories are created on the fake filesystem and ``_cephadm.call``
    is faked to return sample "stats" and "ps" output. The result must list
    the agent, mgr, crash and legacy mon daemons with the expected style,
    systemd unit, container id and memory usage.
    """
    # Basing part of this test on some actual sample output

    # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output
    # 3f2b31d19ecd,456.4MB / 41.96GB
    # 5aca2499e0f8,7.082MB / 41.96GB
    # fe0cef07d5f7,35.91MB / 41.96GB

    # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers
    # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1
    # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho
    # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1

    # Some of the components from that output
    mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f'
    mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30'
    crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a'

    # Rebuilding the output but with our testing FSID and components
    # (to allow alteration later for whatever reason). The stats output
    # uses the 12-character short ids, the ps output the full ids.
    mem_out = '\n'.join([
        f'{mgr_cid[0:12]},456.4MB / 41.96GB',
        f'{crash_cid[0:12]},7.082MB / 41.96GB',
        f'{mon_cid[0:12]},35.91MB / 41.96GB',
    ])
    ps_out = '\n'.join([
        f'{mon_cid},ceph-{FSID}-mon-host1',
        f'{mgr_cid},ceph-{FSID}-mgr-host1-pntmho',
        f'{crash_cid},ceph-{FSID}-crash-host1',
    ])

    def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE, timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs):
        # answer "stats" and "ps" commands with the canned output above
        for keyword, stdout in (('stats', mem_out), ('ps', ps_out)):
            if keyword in cmd:
                return (stdout, '', 0)
        return ('out', 'err', 0)

    for directory in (
        AGENT_DIR,
        '/var/lib/ceph/mon/ceph-host1',  # legacy daemon
        '/var/lib/ceph/osd/nothing',  # improper directory, should be skipped
        f'/var/lib/ceph/{FSID}/mgr.host1.pntmho',  # cephadm daemon
        f'/var/lib/ceph/{FSID}/crash.host1',  # cephadm daemon
    ):
        cephadm_fs.create_dir(directory)

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _cephadm.call.side_effect = _fake_call
        daemons = agent._daemon_ls_subset()

        for name in ('agent.host1', 'mgr.host1.pntmho', 'crash.host1', 'mon.host1'):
            assert name in daemons

        expected_styles = {
            'mon.host1': 'legacy',
            'mgr.host1.pntmho': 'cephadm:v1',
            'crash.host1': 'cephadm:v1',
            'agent.host1': 'cephadm:v1',
        }
        for name, style in expected_styles.items():
            assert daemons[name]['style'] == style

        for name in ('mgr.host1.pntmho', 'agent.host1', 'crash.host1'):
            assert daemons[name]['systemd_unit'] == f'ceph-{FSID}@{name}'

        assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
        assert daemons['crash.host1']['container_id'] == crash_cid

        assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086  # 456.4 MB
        assert daemons['crash.host1']['memory_usage'] == 7426015  # 7.082 MB
249 | ||
250 | ||
@mock.patch("cephadm.list_daemons")
@mock.patch("cephadm.CephadmAgent._daemon_ls_subset")
def test_agent_get_ls(_ls_subset, _ls, cephadm_fs):
    """Exercise ``CephadmAgent._get_ls`` and its reporting of daemon changes.

    ``list_daemons`` and ``_daemon_ls_subset`` are patched to return canned
    data. ``_get_ls`` must return the full daemon list and report a change
    on the first pass and after a container id change, an ``enabled`` change
    or a daemon removal, and report no change when called again with the
    same data.
    """
    mgr_cid = "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f"
    new_mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'

    ls_out = [{
        "style": "cephadm:v1",
        "name": "mgr.host1.pntmho",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
        "enabled": True,
        "state": "running",
        "service_name": "mgr",
        "memory_request": None,
        "memory_limit": None,
        "ports": [
            9283,
            8765
        ],
        "container_id": mgr_cid,
        "container_image_name": "quay.io/ceph/ceph:testing",
        "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164",
        "container_image_digests": [
            "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
        ],
        "memory_usage": 507510784,
        "cpu_percentage": "5.95%",
        "version": "18.0.0-556-gb4d1a199",
        "started": "2022-10-27T14:19:36.086664Z",
        "created": "2022-10-27T14:19:36.282281Z",
        "deployed": "2022-10-27T14:19:35.377275Z",
        "configured": "2022-10-27T14:22:40.316912Z"
    }, {
        "style": "cephadm:v1",
        "name": "agent.host1",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@agent.host1",
        "enabled": True,
        "state": "running",
        "service_name": "agent",
        "ports": [],
        "ip": None,
        "deployed_by": [
            "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
        ],
        "rank": None,
        "rank_generation": None,
        "extra_container_args": None,
        "container_id": None,
        "container_image_name": None,
        "container_image_id": None,
        "container_image_digests": None,
        "version": None,
        "started": None,
        "created": "2022-10-27T19:46:49.751594Z",
        "deployed": None,
        "configured": "2022-10-27T19:46:49.751594Z"
    }, {
        "style": "legacy",
        "name": "mon.host1",
        "fsid": FSID,
        "systemd_unit": "ceph-mon@host1",
        "enabled": False,
        "state": "stopped",
        "host_version": None
    }]

    ls_subset_out = {
        'mgr.host1.pntmho': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
            "enabled": True,
            "state": "running",
            "container_id": mgr_cid,
            "memory_usage": 507510784,
        },
        'agent.host1': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@agent.host1",
            "enabled": True,
            "state": "running",
            "container_id": None
        },
        'mon.host1': {
            "style": "legacy",
            "name": "mon.host1",
            "fsid": FSID,
            "systemd_unit": "ceph-mon@host1",
            "enabled": False,
            "state": "stopped",
            "host_version": None
        }}

    _ls.return_value = ls_out
    _ls_subset.return_value = ls_subset_out

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        def _expect_ls(expected, expect_changed):
            # Run one _get_ls pass and check both the daemon list and the flag.
            daemons, changed = agent._get_ls()
            assert daemons == expected
            assert bool(changed) is expect_changed

        # first pass, no cached daemon metadata
        _expect_ls(ls_out, True)

        # second pass, should recognize that daemons have not changed and just
        # keep cached values (the original compared `daemons` with itself here,
        # which could never fail)
        _expect_ls(ls_out, False)

        # change a container id so it needs to get more info
        ls_subset_out2 = copy.deepcopy(ls_subset_out)
        ls_out2 = copy.deepcopy(ls_out)
        ls_subset_out2['mgr.host1.pntmho']['container_id'] = new_mgr_cid
        ls_out2[0]['container_id'] = new_mgr_cid
        _ls.return_value = ls_out2
        _ls_subset.return_value = ls_subset_out2
        assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == mgr_cid
        _expect_ls(ls_out2, True)

        # run again with the same data so it should use cached values
        _expect_ls(ls_out2, False)

        # change the state of a container so new daemon metadata is needed
        ls_subset_out3 = copy.deepcopy(ls_subset_out2)
        ls_out3 = copy.deepcopy(ls_out2)
        ls_subset_out3['mgr.host1.pntmho']['enabled'] = False
        ls_out3[0]['enabled'] = False
        _ls.return_value = ls_out3
        _ls_subset.return_value = ls_subset_out3
        assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] is True
        _expect_ls(ls_out3, True)

        # run again with the same data so it should use cached values
        _expect_ls(ls_out3, False)

        # remove a daemon so new metadata is needed
        ls_subset_out4 = copy.deepcopy(ls_subset_out3)
        ls_out4 = copy.deepcopy(ls_out3)
        ls_subset_out4.pop('mon.host1')
        ls_out4.pop()
        _ls.return_value = ls_out4
        _ls_subset.return_value = ls_subset_out4
        assert 'mon.host1' in agent.cached_ls_values
        _expect_ls(ls_out4, True)

        # run again with the same data so it should use cached values
        _expect_ls(ls_out4, False)
410 | ||
411 | ||
@mock.patch("threading.Event.clear")
@mock.patch("threading.Event.wait")
@mock.patch("urllib.request.Request.__init__")
@mock.patch("cephadm.urlopen")
@mock.patch("cephadm.list_networks")
@mock.patch("cephadm.HostFacts.dump")
@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None)
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("threading.Thread.is_alive")
@mock.patch("cephadm.MgrListener.start")
@mock.patch("cephadm.AgentGatherer.start")
@mock.patch("cephadm.port_in_use")
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
                   _listener_start, _is_alive, _load_verify_locations,
                   _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear):
    """Exercise one iteration of ``CephadmAgent.run`` with networking and threads patched.

    Checks the JSON payload handed to ``urllib.request.Request``, that the
    listener and gatherers are started, that a failing ``urlopen`` does not
    stop the agent, and that ``run`` raises when every listener port is in use.
    """
    # Imported locally: the file-level typing import only provides Optional,
    # so Dict/List/Set in the annotations below were undefined names.
    from typing import Dict, List, Set

    target_ip = '192.168.0.0'
    target_port = '9999'
    refresh_period = 20
    listener_port = 7770
    open_listener_port = 7777
    host = AGENT_ID
    device_enhanced_scan = False

    def _fake_port_in_use(ctx, endpoint):
        # only open_listener_port is reported as free
        return endpoint.port != open_listener_port

    network_data: Dict[str, Dict[str, Set[str]]] = {
        "10.2.1.0/24": {
            "eth1": {"10.2.1.122"}
        },
        "192.168.122.0/24": {
            "eth0": {"192.168.122.221"}
        },
        "fe80::/64": {
            "eth0": {"fe80::5054:ff:fe3f:d94e"},
            "eth1": {"fe80::5054:ff:fe3f:aa4a"},
        }
    }

    # the json serializable version of the networks data
    # we expect the agent to actually send
    network_data_no_sets: Dict[str, Dict[str, List[str]]] = {
        "10.2.1.0/24": {
            "eth1": ["10.2.1.122"]
        },
        "192.168.122.0/24": {
            "eth0": ["192.168.122.221"]
        },
        "fe80::/64": {
            "eth0": ["fe80::5054:ff:fe3f:d94e"],
            "eth1": ["fe80::5054:ff:fe3f:aa4a"],
        }
    }

    class FakeHTTPResponse():
        """Minimal context-manager stand-in for the object ``urlopen`` returns."""

        def __init__(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, value, tb):
            pass

        def read(self):
            return json.dumps({'valid': 'output', 'result': '400'})

    _port_in_use.side_effect = _fake_port_in_use
    _is_alive.return_value = False
    _HF_dump.return_value = 'Host Facts'
    _list_networks.return_value = network_data
    _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse()
    _RQ_init.side_effect = lambda *args, **kwargs: None
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        agent.keyring = 'agent keyring'
        agent.ack = 7
        agent.volume_gatherer.ack = 7
        agent.volume_gatherer.data = 'ceph-volume inventory data'
        agent.ls_gatherer.ack = 7
        agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}]

        def _set_conf():
            # stand-in for pull_conf_settings: set the values directly
            agent.target_ip = target_ip
            agent.target_port = target_port
            agent.loop_interval = refresh_period
            agent.starting_port = listener_port
            agent.host = host
            agent.device_enhanced_scan = device_enhanced_scan
        _pull_conf_settings.side_effect = _set_conf

        # technically the run function loops forever unless the agent
        # is told to stop. To get around that we make the patched
        # threading.Event.clear throw a special exception type. If we
        # catch this exception we can consider it as being a "success" run
        class EventCleared(Exception):
            pass

        _clear.side_effect = EventCleared('SUCCESS')
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        # key order matters: the payload is compared as serialized JSON
        expected_data = {
            'host': host,
            'ls': [{'valid_daemon': 'valid_metadata'}],
            'networks': network_data_no_sets,
            'facts': 'Host Facts',
            'volume': 'ceph-volume inventory data',
            'ack': str(7),
            'keyring': 'agent keyring',
            'port': str(open_listener_port)
        }
        _RQ_init.assert_called_with(
            f'https://{target_ip}:{target_port}/data/',
            json.dumps(expected_data).encode('ascii'),
            {'Content-Type': 'application/json'}
        )
        _listener_start.assert_called()
        _gatherer_start.assert_called()
        _urlopen.assert_called()

        # agent should not go down if connections fail
        _urlopen.side_effect = Exception()
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        # should fail if no ports are open for listener
        _port_in_use.side_effect = lambda _, __: True
        agent.listener_port = None
        with pytest.raises(Exception, match='Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'):
            agent.run()
548 | ||
549 | ||
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
@mock.patch("cephadm.CephadmAgent.wakeup")
def test_mgr_listener_handle_json_payload(_agent_wakeup, _pull_conf_settings, cephadm_fs):
    """Exercise ``MgrListener.handle_json_payload`` with and without a config section.

    Without ``config`` nothing is written and neither ``wakeup`` nor
    ``pull_conf_settings`` is called. With ``config`` only the supplied
    required files are written, and both are called.
    """
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        def _on_disk(fname):
            return os.path.exists(os.path.join(AGENT_DIR, fname))

        # payload without a config section
        agent.mgr_listener.handle_json_payload({'counter': 7})
        _agent_wakeup.assert_not_called()
        _pull_conf_settings.assert_not_called()
        assert not any(_on_disk(fname) for fname in agent.required_files)

        # payload with a config that omits one required file and adds an
        # unrequired one
        omitted = agent.required_files[2]
        supplied = [fname for fname in agent.required_files if fname != omitted]
        config = {'unrequired-file': 'unrequired-text'}
        config.update({fname: f'{fname} text' for fname in supplied})
        agent.mgr_listener.handle_json_payload({'counter': 7, 'config': config})
        _agent_wakeup.assert_called()
        _pull_conf_settings.assert_called()
        assert all(_on_disk(fname) for fname in supplied)
        assert not _on_disk(omitted)
        assert not _on_disk('unrequired-file')
579 | ||
580 | ||
@mock.patch("socket.socket")
@mock.patch("ssl.SSLContext.wrap_socket")
@mock.patch("cephadm.MgrListener.handle_json_payload")
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("ssl.SSLContext.load_cert_chain")
def test_mgr_listener_run(_load_cert_chain, _load_verify_locations, _handle_json_payload,
                          _wrap_context, _socket, cephadm_fs):
    """Exercise ``MgrListener.run`` against fake sockets.

    Four runs are made: a well-formed length-prefixed JSON payload (ACK), a
    payload that is not JSON, a payload without the length prefix, and a
    socket whose ``accept`` first times out and then raises.
    """

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        payload = json.dumps({'counter': 3,
                              'config': {s: f'{s} text' for s in agent.required_files if s != agent.required_files[1]}})

        class FakeSocket:

            def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
                self.family = family
                self.type = type

            def bind(*args, **kwargs):
                return

            def settimeout(*args, **kwargs):
                return

            def listen(*args, **kwargs):
                return

        class FakeSecureSocket:

            def __init__(self, pload):
                self.payload = pload
                self._conn = FakeConn(self.payload)
                self.accepted = False

            def accept(self):
                # to make mgr listener run loop stop running,
                # set it to stop after accepting a "connection"
                # on our fake socket so only one iteration of the loop
                # actually happens
                agent.mgr_listener.stop = True
                # record on the instance (the original assigned a dead local)
                self.accepted = True
                return self._conn, None

            def load_cert_chain(*args, **kwargs):
                return

            def load_verify_locations(*args, **kwargs):
                return

        class FakeConn:

            def __init__(self, payload: str = ''):
                # message = 10-character zero-padded byte length + payload
                payload_len_str = str(len(payload.encode('utf-8'))).zfill(10)
                self.payload = (payload_len_str + payload).encode('utf-8')
                self.buffer_len = len(self.payload)

            def recv(self, size: Optional[int] = None):
                # hand out at most `size` bytes; everything left when size is
                # falsy or covers the remaining buffer
                if not size or size >= self.buffer_len:
                    chunk, self.payload = self.payload, b''
                    self.buffer_len = 0
                    return chunk
                chunk = self.payload[:size]
                self.payload = self.payload[size:]
                self.buffer_len -= size
                return chunk

        FSS_good_data = FakeSecureSocket(payload)
        FSS_bad_json = FakeSecureSocket('bad json')
        # NOTE(review): this only rebinds the local name; the patched
        # socket.socket stays a MagicMock and FakeSocket is never used.
        # Wiring it in (e.g. via _socket.side_effect) needs FakeSocket to
        # cover every method MgrListener.run calls -- confirm before changing.
        _socket = FakeSocket
        agent.listener_port = 7777

        # first run, should successfully receive properly structured json payload
        _wrap_context.side_effect = [FSS_good_data]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()

        # verify payload was correctly extracted
        _handle_json_payload.assert_called_with(json.loads(payload))
        FakeConn.send.assert_called_once_with(b'ACK')

        # second run, with bad json data received
        _wrap_context.side_effect = [FSS_bad_json]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract json payload from message: Expecting value: line 1 column 1 (char 0)')

        # third run, no proper length at beginning of payload
        FSS_no_length = FakeSecureSocket(payload)
        FSS_no_length.payload = FSS_no_length.payload[10:]
        FSS_no_length._conn.payload = FSS_no_length._conn.payload[10:]
        FSS_no_length._conn.buffer_len -= 10
        _wrap_context.side_effect = [FSS_no_length]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract length of payload from message: invalid literal for int() with base 10: \'{"counter"\'')

        # some exception handling for full coverage
        FSS_exc_testing = FakeSecureSocket(payload)
        FSS_exc_testing.accept = mock.MagicMock()

        def _accept(*args, **kwargs):
            # first call: time out; next call: stop the listener and fail
            if not FSS_exc_testing.accepted:
                FSS_exc_testing.accepted = True
                raise socket.timeout()
            else:
                agent.mgr_listener.stop = True
                raise Exception()

        FSS_exc_testing.accept.side_effect = _accept
        _wrap_context.side_effect = [FSS_exc_testing]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_not_called()
        # NOTE(review): the original line was the bare expression
        # `accept.call_count == 3`, which asserted nothing. The _accept stub
        # above suggests two calls; confirm against MgrListener.run before
        # tightening this to an exact count.
        assert FSS_exc_testing.accept.called
707 | ||
708 | ||
@mock.patch("cephadm.CephadmAgent._get_ls")
def test_gatherer_update_func(_get_ls, cephadm_fs):
    """Check that ``AgentGatherer.update_func`` replaces the gatherer's function.

    The ls gatherer's function initially reaches the patched ``_get_ls``.
    After ``update_func`` it must call the replacement and leave ``_get_ls``
    untouched.
    """
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        def _sample_func():
            return 7

        agent.ls_gatherer.func()
        _get_ls.assert_called()

        # Reset the patched mock rather than rebinding the local name to a
        # fresh MagicMock, which would make assert_not_called() vacuous.
        _get_ls.reset_mock()
        agent.ls_gatherer.update_func(_sample_func)
        out = agent.ls_gatherer.func()
        assert out == 7
        _get_ls.assert_not_called()
727 | ||
728 | ||
@mock.patch("cephadm.CephadmAgent.wakeup")
@mock.patch("time.monotonic")
@mock.patch("threading.Event.wait")
def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs):
    """Exercise ``AgentGatherer.run`` for one loop iteration in three scenarios.

    1. The gather function succeeds: data and ack are updated, the agent is
       woken, and the wait time is loop_interval minus run time.
    2. The gather function raises: data becomes None and the loop still
       reaches ``Event.clear``.
    3. ``agent.wakeup`` raises: the loop does not reach ``Event.clear``.
    """
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)
        agent.loop_interval = 30
        agent.ack = 23

        def _sample_func(*args, **kwargs):
            return ('sample out', True)

        def _exc_func(*args, **kwargs):
            # Raise (the original returned an Exception instance, which only
            # hit the error path through a failed tuple unpacking).
            raise Exception()

        def _fake_clear(*args, **kwargs):
            # stop the gatherer so run() performs a single iteration
            agent.ls_gatherer.stop = True

        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        _time.side_effect = [0, 20, 0, 20, 0, 20]  # start at time 0, complete at time 20
        _wait.return_value = None

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()

            _wait.assert_called_with(10)  # agent loop_interval - run time
            assert agent.ls_gatherer.data == 'sample out'
            assert agent.ls_gatherer.ack == 23
            _agent_wakeup.assert_called_once()
            _clear.assert_called_once()

        agent.ls_gatherer.update_func(_exc_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data is None
            assert agent.ls_gatherer.ack == agent.ack
            # should have run full loop despite exception
            _clear.assert_called_once()

        # test general exception for full coverage
        _agent_wakeup.side_effect = [Exception()]
        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.stop = False
        # just to force only one iteration
        _time.side_effect = _fake_clear
        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = Exception()
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data == 'sample out'
            assert agent.ls_gatherer.ack == agent.ack
            # should not have gotten to end of loop
            _clear.assert_not_called()
787 | ||
788 | ||
@mock.patch("cephadm.CephadmAgent.run")
def test_command_agent(_agent_run, cephadm_fs):
    """Check ``command_agent``: it raises when the agent directory is absent and
    calls the (patched) ``CephadmAgent.run`` once the directory exists.
    """
    missing_dir_msg = f"Agent daemon directory {AGENT_DIR} does not exist. Perhaps agent was never deployed?"
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        ctx.daemon_id = AGENT_ID

        # agent directory has not been created yet
        with pytest.raises(Exception, match=missing_dir_msg):
            _cephadm.command_agent(ctx)

        # with the directory in place the agent must be run
        cephadm_fs.create_dir(AGENT_DIR)
        _cephadm.command_agent(ctx)
        _agent_run.assert_called()