]> git.proxmox.com Git - ceph.git/blame - ceph/src/cephadm/tests/test_agent.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / cephadm / tests / test_agent.py
CommitLineData
1e59de90
TL
import copy
import datetime
import json
import os
import re
import socket
import threading
from typing import Dict, List, Optional, Set
from unittest import mock

import pytest

from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm
9
# The cephadm module under test, loaded once for this whole test module.
_cephadm = import_cephadm()


# Cluster fsid and agent id shared by every test below.
FSID = "beefbeef-beef-beef-1234-beefbeefbeef"
AGENT_ID = 'host1'
# Data directory of the agent daemon, built from FSID and AGENT_ID.
AGENT_DIR = '/var/lib/ceph/{}/agent.{}'.format(FSID, AGENT_ID)
16
17
def test_agent_validate():
    """validate() rejects a config missing any one required file and accepts a complete one."""
    required_files = _cephadm.CephadmAgent.required_files
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        for i, missing in enumerate(required_files):
            # every required file except the i-th one
            incomplete_files = {f: 'text' for j, f in enumerate(required_files) if j != i}
            # `match` is a regex searched against the message; escape the file
            # name because file names may contain metacharacters such as '.'
            expected = re.escape(f'required file missing from config: {missing}')
            with pytest.raises(_cephadm.Error, match=expected):
                agent.validate(incomplete_files)
        all_files = {s: 'text' for s in required_files}
        agent.validate(all_files)
28
29
def _check_file(path, content):
    """Assert that ``path`` exists and that its entire text equals ``content``."""
    assert os.path.exists(path)
    with open(path) as handle:
        assert handle.read() == content
35
36
@mock.patch('cephadm.call_throws')
def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs):
    """deploy_daemon_unit() writes the required files, unit.run, unit.meta and the
    unit file, and issues the expected systemctl calls."""
    _call_throws.return_value = ('', '', 0)

    with with_cephadm_ctx([]) as ctx:
        ctx.meta_json = json.dumps({'meta': 'data'})
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        # deploying without any config is refused
        with pytest.raises(_cephadm.Error, match='Agent needs a config'):
            agent.deploy_daemon_unit()

        required = _cephadm.CephadmAgent.required_files
        config = {name: f'text for {name}' for name in required}
        config['not-required-file.txt'] = 'don\'t write me'

        agent.deploy_daemon_unit(config)

        # each required config file is written with its configured text ...
        for name in required:
            _check_file(f'{AGENT_DIR}/{name}', f'text for {name}')
        # ... while the extra, non-required entry is not written at all
        assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt')

        # unit.run, unit.meta and the unit file in ctx.unit_dir
        _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run())
        _check_file(f'{AGENT_DIR}/unit.meta', json.dumps({'meta': 'data'}, indent=4) + '\n')
        _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file())

        unit = agent.unit_name()
        _call_throws.assert_has_calls([
            mock.call(ctx, ['systemctl', 'daemon-reload']),
            mock.call(ctx, ['systemctl', 'enable', '--now', unit]),
        ])

        debug = _cephadm.CallVerbosity.DEBUG
        _cephadm.call.assert_has_calls([
            mock.call(ctx, ['systemctl', 'stop', unit], verbosity=debug),
            mock.call(ctx, ['systemctl', 'reset-failed', unit], verbosity=debug),
        ])
82
83
@mock.patch('threading.Thread.is_alive')
def test_agent_shutdown(_is_alive):
    """shutdown() sets the `stop` flag on the agent and on its listener and gatherers."""
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _is_alive.return_value = True
        stoppables = (agent, agent.mgr_listener, agent.ls_gatherer, agent.volume_gatherer)
        # compare by identity rather than `== False` / `== True` (flake8 E712)
        for obj in stoppables:
            assert obj.stop is False
        agent.shutdown()
        for obj in stoppables:
            assert obj.stop is True
98
99
def test_agent_wakeup():
    """wakeup() sets the agent's threading event."""
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        # Event.is_set() returns a bool; test it directly instead of `== False/True`
        assert not agent.event.is_set()
        agent.wakeup()
        assert agent.event.is_set()
106
107
@mock.patch("cephadm.CephadmAgent.shutdown")
@mock.patch("cephadm.AgentGatherer.update_func")
def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs):
    """pull_conf_settings() loads the agent config and keyring from disk.

    The test checks three cases:
    * a missing keyring raises and triggers shutdown();
    * a complete config populates the agent's attributes;
    * a config missing 'target_ip' raises.
    """
    target_ip = '192.168.0.0'
    target_port = 9876
    refresh_period = 20
    listener_port = 5678
    host = AGENT_ID
    device_enhanced_scan = 'True'
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        full_config = {
            'target_ip': target_ip,
            'target_port': target_port,
            'refresh_period': refresh_period,
            'listener_port': listener_port,
            'host': host,
            'device_enhanced_scan': device_enhanced_scan
        }
        cephadm_fs.create_dir(AGENT_DIR)
        with open(agent.config_path, 'w') as f:
            f.write(json.dumps(full_config))

        # config present but keyring missing
        with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"):
            agent.pull_conf_settings()
        _shutdown.assert_called()
        with open(agent.keyring_path, 'w') as f:
            f.write('keyring')

        # identity comparison instead of `== False` / `== True` (flake8 E712)
        assert agent.device_enhanced_scan is False
        agent.pull_conf_settings()
        assert agent.host == host
        assert agent.target_ip == target_ip
        assert agent.target_port == target_port
        assert agent.loop_interval == refresh_period
        assert agent.starting_port == listener_port
        assert agent.device_enhanced_scan is True
        assert agent.keyring == 'keyring'
        _update_func.assert_called()

        # config missing the target ip
        full_config.pop('target_ip')
        with open(agent.config_path, 'w') as f:
            f.write(json.dumps(full_config))
        with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"):
            agent.pull_conf_settings()
153
154
@mock.patch("cephadm.command_ceph_volume")
def test_agent_ceph_volume(_ceph_volume):
    """_ceph_volume() sets up an inventory command, returns what the (mocked)
    command printed, and raises when nothing was printed."""

    def _prints_output(_):
        print("ceph-volume output")

    def _prints_nothing(_):
        pass

    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        _ceph_volume.side_effect = _prints_output
        cases = (
            (False, ['inventory', '--format=json']),
            (True, ['inventory', '--format=json', '--with-lsm']),
        )
        for with_lsm, expected_command in cases:
            captured, _ = agent._ceph_volume(with_lsm)
            assert ctx.command == expected_command
            assert captured == "ceph-volume output\n"

        _ceph_volume.side_effect = _prints_nothing
        with pytest.raises(Exception, match='ceph-volume returned empty value'):
            agent._ceph_volume(False)
179
180
def test_agent_daemon_ls_subset(cephadm_fs):
    """_daemon_ls_subset() finds legacy and cephadm daemons on disk and combines
    them with (faked) container ps/stats output."""
    # Basing part of this test on some actual sample output

    # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output
    # 3f2b31d19ecd,456.4MB / 41.96GB
    # 5aca2499e0f8,7.082MB / 41.96GB
    # fe0cef07d5f7,35.91MB / 41.96GB

    # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers
    # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1
    # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho
    # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1

    # Some of the components from that output
    mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f'
    mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30'
    crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a'
    mgr_short_cid = mgr_cid[0:12]
    mon_short_cid = mon_cid[0:12]
    crash_short_cid = crash_cid[0:12]

    # Rebuild the output with our testing FSID and components (so it can be
    # altered later if needed). Lines are joined explicitly, with no trailing
    # newline, to avoid column-0 continuation lines inside triple-quoted strings.
    mem_out = '\n'.join([
        f'{mgr_short_cid},456.4MB / 41.96GB',
        f'{crash_short_cid},7.082MB / 41.96GB',
        f'{mon_short_cid},35.91MB / 41.96GB',
    ])

    ps_out = '\n'.join([
        f'{mon_cid},ceph-{FSID}-mon-host1',
        f'{mgr_cid},ceph-{FSID}-mgr-host1-pntmho',
        f'{crash_cid},ceph-{FSID}-crash-host1',
    ])

    def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE, timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs):
        # route container "stats" / "ps" invocations to the canned output above
        if 'stats' in cmd:
            return (mem_out, '', 0)
        elif 'ps' in cmd:
            return (ps_out, '', 0)
        return ('out', 'err', 0)

    cephadm_fs.create_dir(AGENT_DIR)
    cephadm_fs.create_dir('/var/lib/ceph/mon/ceph-host1')  # legacy daemon
    cephadm_fs.create_dir('/var/lib/ceph/osd/nothing')  # improper directory, should be skipped
    cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/mgr.host1.pntmho')  # cephadm daemon
    cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/crash.host1')  # cephadm daemon

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _cephadm.call.side_effect = _fake_call
        daemons = agent._daemon_ls_subset()

        assert 'agent.host1' in daemons
        assert 'mgr.host1.pntmho' in daemons
        assert 'crash.host1' in daemons
        assert 'mon.host1' in daemons

        assert daemons['mon.host1']['style'] == 'legacy'
        assert daemons['mgr.host1.pntmho']['style'] == 'cephadm:v1'
        assert daemons['crash.host1']['style'] == 'cephadm:v1'
        assert daemons['agent.host1']['style'] == 'cephadm:v1'

        assert daemons['mgr.host1.pntmho']['systemd_unit'] == f'ceph-{FSID}@mgr.host1.pntmho'
        assert daemons['agent.host1']['systemd_unit'] == f'ceph-{FSID}@agent.host1'
        assert daemons['crash.host1']['systemd_unit'] == f'ceph-{FSID}@crash.host1'

        assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
        assert daemons['crash.host1']['container_id'] == crash_cid

        assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086  # 456.4 MB
        assert daemons['crash.host1']['memory_usage'] == 7426015  # 7.082 MB
249
250
@mock.patch("cephadm.list_daemons")
@mock.patch("cephadm.CephadmAgent._daemon_ls_subset")
def test_agent_get_ls(_ls_subset, _ls, cephadm_fs):
    """_get_ls() reuses cached daemon metadata until the ls subset changes.

    A change means a changed container id, a changed state, or a removed daemon.
    """
    mgr_cid = "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f"
    image_digest = "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"

    ls_out = [{
        "style": "cephadm:v1",
        "name": "mgr.host1.pntmho",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
        "enabled": True,
        "state": "running",
        "service_name": "mgr",
        "memory_request": None,
        "memory_limit": None,
        "ports": [
            9283,
            8765
        ],
        "container_id": mgr_cid,
        "container_image_name": "quay.io/ceph/ceph:testing",
        "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164",
        "container_image_digests": [
            image_digest
        ],
        "memory_usage": 507510784,
        "cpu_percentage": "5.95%",
        "version": "18.0.0-556-gb4d1a199",
        "started": "2022-10-27T14:19:36.086664Z",
        "created": "2022-10-27T14:19:36.282281Z",
        "deployed": "2022-10-27T14:19:35.377275Z",
        "configured": "2022-10-27T14:22:40.316912Z"
    }, {
        "style": "cephadm:v1",
        "name": "agent.host1",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@agent.host1",
        "enabled": True,
        "state": "running",
        "service_name": "agent",
        "ports": [],
        "ip": None,
        "deployed_by": [
            image_digest
        ],
        "rank": None,
        "rank_generation": None,
        "extra_container_args": None,
        "container_id": None,
        "container_image_name": None,
        "container_image_id": None,
        "container_image_digests": None,
        "version": None,
        "started": None,
        "created": "2022-10-27T19:46:49.751594Z",
        "deployed": None,
        "configured": "2022-10-27T19:46:49.751594Z"
    }, {
        "style": "legacy",
        "name": "mon.host1",
        "fsid": FSID,
        "systemd_unit": "ceph-mon@host1",
        "enabled": False,
        "state": "stopped",
        "host_version": None
    }]

    ls_subset_out = {
        'mgr.host1.pntmho': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
            "enabled": True,
            "state": "running",
            "container_id": mgr_cid,
            "memory_usage": 507510784,
        },
        'agent.host1': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@agent.host1",
            "enabled": True,
            "state": "running",
            "container_id": None
        },
        'mon.host1': {
            "style": "legacy",
            "name": "mon.host1",
            "fsid": FSID,
            "systemd_unit": "ceph-mon@host1",
            "enabled": False,
            "state": "stopped",
            "host_version": None
        }}

    _ls.return_value = ls_out
    _ls_subset.return_value = ls_subset_out

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        # first pass, no cached daemon metadata
        daemons, changed = agent._get_ls()
        assert daemons == ls_out
        assert changed

        # second pass, should recognize that daemons have not changed and just keep cached values
        daemons, changed = agent._get_ls()
        # compare against the expected data (comparing `daemons` to itself always passed)
        assert daemons == ls_out
        assert not changed

        # change a container id so it needs to get more info
        new_mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'
        ls_subset_out2 = copy.deepcopy(ls_subset_out)
        ls_out2 = copy.deepcopy(ls_out)
        ls_subset_out2['mgr.host1.pntmho']['container_id'] = new_mgr_cid
        ls_out2[0]['container_id'] = new_mgr_cid
        _ls.return_value = ls_out2
        _ls_subset.return_value = ls_subset_out2
        assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == mgr_cid
        daemons, changed = agent._get_ls()
        assert daemons == ls_out2
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out2
        assert not changed

        # change the state of a container so new daemon metadata is needed
        ls_subset_out3 = copy.deepcopy(ls_subset_out2)
        ls_out3 = copy.deepcopy(ls_out2)
        ls_subset_out3['mgr.host1.pntmho']['enabled'] = False
        ls_out3[0]['enabled'] = False
        _ls.return_value = ls_out3
        _ls_subset.return_value = ls_subset_out3
        assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] is True
        daemons, changed = agent._get_ls()
        assert daemons == ls_out3
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out3
        assert not changed

        # remove a daemon so new metadata is needed
        ls_subset_out4 = copy.deepcopy(ls_subset_out3)
        ls_out4 = copy.deepcopy(ls_out3)
        ls_subset_out4.pop('mon.host1')
        ls_out4.pop()
        _ls.return_value = ls_out4
        _ls_subset.return_value = ls_subset_out4
        assert 'mon.host1' in agent.cached_ls_values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out4
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out4
        assert not changed
410
411
@mock.patch("threading.Event.clear")
@mock.patch("threading.Event.wait")
@mock.patch("urllib.request.Request.__init__")
@mock.patch("cephadm.urlopen")
@mock.patch("cephadm.list_networks")
@mock.patch("cephadm.HostFacts.dump")
@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None)
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("threading.Thread.is_alive")
@mock.patch("cephadm.MgrListener.start")
@mock.patch("cephadm.AgentGatherer.start")
@mock.patch("cephadm.port_in_use")
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
                   _listener_start, _is_alive, _load_verify_locations,
                   _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear):
    """Run one iteration of CephadmAgent.run() and check the request built for the mgr.

    The test also checks two failure cases:
    * a failing urlopen does not take the agent down;
    * having no free listener port raises.
    """
    target_ip = '192.168.0.0'
    target_port = '9999'
    refresh_period = 20
    listener_port = 7770
    open_listener_port = 7777
    host = AGENT_ID
    device_enhanced_scan = False

    def _fake_port_in_use(ctx, endpoint):
        # only the designated listener port is reported as free
        return endpoint.port != open_listener_port

    network_data: Dict[str, Dict[str, Set[str]]] = {
        "10.2.1.0/24": {
            "eth1": {"10.2.1.122"}
        },
        "192.168.122.0/24": {
            "eth0": {"192.168.122.221"}
        },
        "fe80::/64": {
            "eth0": {"fe80::5054:ff:fe3f:d94e"},
            "eth1": {"fe80::5054:ff:fe3f:aa4a"},
        }
    }

    # the json serializable version of the networks data
    # we expect the agent to actually send
    network_data_no_sets: Dict[str, Dict[str, List[str]]] = {
        "10.2.1.0/24": {
            "eth1": ["10.2.1.122"]
        },
        "192.168.122.0/24": {
            "eth0": ["192.168.122.221"]
        },
        "fe80::/64": {
            "eth0": ["fe80::5054:ff:fe3f:d94e"],
            "eth1": ["fe80::5054:ff:fe3f:aa4a"],
        }
    }

    class FakeHTTPResponse:
        """Minimal context-manager stand-in for the object urlopen() returns."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, tb):
            pass

        def read(self):
            return json.dumps({'valid': 'output', 'result': '400'})

    _port_in_use.side_effect = _fake_port_in_use
    _is_alive.return_value = False
    _HF_dump.return_value = 'Host Facts'
    _list_networks.return_value = network_data
    _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse()
    _RQ_init.side_effect = lambda *args, **kwargs: None
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        agent.keyring = 'agent keyring'
        agent.ack = 7
        agent.volume_gatherer.ack = 7
        agent.volume_gatherer.data = 'ceph-volume inventory data'
        agent.ls_gatherer.ack = 7
        agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}]

        def _set_conf():
            agent.target_ip = target_ip
            agent.target_port = target_port
            agent.loop_interval = refresh_period
            agent.starting_port = listener_port
            agent.host = host
            agent.device_enhanced_scan = device_enhanced_scan
        _pull_conf_settings.side_effect = _set_conf

        # run() loops forever unless the agent is told to stop. To get out
        # after one iteration, make event.clear() (patched above as _clear)
        # raise a dedicated exception type. Catching that exception means
        # the iteration got far enough to count as a "success" run.
        class EventCleared(Exception):
            pass

        _clear.side_effect = EventCleared('SUCCESS')
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        expected_data = {
            'host': host,
            'ls': [{'valid_daemon': 'valid_metadata'}],
            'networks': network_data_no_sets,
            'facts': 'Host Facts',
            'volume': 'ceph-volume inventory data',
            'ack': str(7),
            'keyring': 'agent keyring',
            'port': str(open_listener_port)
        }
        _RQ_init.assert_called_with(
            f'https://{target_ip}:{target_port}/data/',
            json.dumps(expected_data).encode('ascii'),
            {'Content-Type': 'application/json'}
        )
        _listener_start.assert_called()
        _gatherer_start.assert_called()
        _urlopen.assert_called()

        # agent should not go down if connections fail
        _urlopen.side_effect = Exception()
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        # should fail if no ports are open for listener
        _port_in_use.side_effect = lambda _, __: True
        agent.listener_port = None
        expected_error = 'Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'
        with pytest.raises(Exception, match=re.escape(expected_error)):
            agent.run()
548
549
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
@mock.patch("cephadm.CephadmAgent.wakeup")
def test_mgr_listener_handle_json_payload(_agent_wakeup, _pull_conf_settings, cephadm_fs):
    """handle_json_payload() acts only on payloads that carry a 'config' entry.

    For such payloads it writes only the required files present in that config.
    """
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        def _written(name):
            return os.path.exists(os.path.join(AGENT_DIR, name))

        # no 'config' key: nothing is written and the agent is not woken up
        agent.mgr_listener.handle_json_payload({'counter': 7})
        _agent_wakeup.assert_not_called()
        _pull_conf_settings.assert_not_called()
        assert not any(_written(name) for name in agent.required_files)

        # 'config' present: required files it carries are written; the
        # omitted required file and the unrequired extra are not
        skipped = agent.required_files[2]
        config = {'unrequired-file': 'unrequired-text'}
        config.update({name: f'{name} text' for name in agent.required_files if name != skipped})
        agent.mgr_listener.handle_json_payload({'counter': 7, 'config': config})
        _agent_wakeup.assert_called()
        _pull_conf_settings.assert_called()
        assert all(_written(name) for name in agent.required_files if name != skipped)
        assert not _written(skipped)
        assert not _written('unrequired-file')
579
580
@mock.patch("socket.socket")
@mock.patch("ssl.SSLContext.wrap_socket")
@mock.patch("cephadm.MgrListener.handle_json_payload")
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("ssl.SSLContext.load_cert_chain")
def test_mgr_listener_run(_load_cert_chain, _load_verify_locations, _handle_json_payload,
                          _wrap_context, _socket, cephadm_fs):
    """Exercise MgrListener.run() against fake TLS sockets and connections.

    The cases covered are a valid payload, invalid json, a missing length
    prefix, and exceptions raised from accept().
    """
    # socket.socket stays the MagicMock installed by the decorator above;
    # only wrap_socket is steered, via _wrap_context.side_effect.

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        payload = json.dumps({'counter': 3,
                              'config': {s: f'{s} text' for s in agent.required_files if s != agent.required_files[1]}})

        class FakeSecureSocket:

            def __init__(self, pload):
                self.payload = pload
                self._conn = FakeConn(self.payload)
                self.accepted = False

            def accept(self):
                # to make mgr listener run loop stop running,
                # set it to stop after accepting a "connection"
                # on our fake socket so only one iteration of the loop
                # actually happens
                agent.mgr_listener.stop = True
                self.accepted = True
                return self._conn, None

            def load_cert_chain(*args, **kwargs):
                return

            def load_verify_locations(*args, **kwargs):
                return

        class FakeConn:
            """Connection whose recv() serves a 10-digit zero-padded length prefix then the payload."""

            def __init__(self, payload: str = ''):
                payload_len_str = str(len(payload.encode('utf-8'))).zfill(10)
                self.payload = (payload_len_str + payload).encode('utf-8')
                self.buffer_len = len(self.payload)

            def recv(self, bufsize: Optional[int] = None):
                if not bufsize or bufsize >= self.buffer_len:
                    ret = self.payload
                    self.payload = b''
                    self.buffer_len = 0
                    return ret
                ret = self.payload[:bufsize]
                self.payload = self.payload[bufsize:]
                self.buffer_len = self.buffer_len - bufsize
                return ret

        FSS_good_data = FakeSecureSocket(payload)
        FSS_bad_json = FakeSecureSocket('bad json')
        agent.listener_port = 7777

        # first run, should successfully receive properly structured json payload
        _wrap_context.side_effect = [FSS_good_data]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()

        # verify payload was correctly extracted (a bare `called_with` on a
        # MagicMock is an auto-created attribute and is always truthy)
        _handle_json_payload.assert_called_with(json.loads(payload))
        FakeConn.send.assert_called_once_with(b'ACK')

        # second run, with bad json data received
        _wrap_context.side_effect = [FSS_bad_json]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract json payload from message: Expecting value: line 1 column 1 (char 0)')

        # third run, no proper length at beginning of payload
        FSS_no_length = FakeSecureSocket(payload)
        FSS_no_length.payload = FSS_no_length.payload[10:]
        FSS_no_length._conn.payload = FSS_no_length._conn.payload[10:]
        FSS_no_length._conn.buffer_len -= 10
        _wrap_context.side_effect = [FSS_no_length]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract length of payload from message: invalid literal for int() with base 10: \'{"counter"\'')

        # some exception handling for full coverage
        FSS_exc_testing = FakeSecureSocket(payload)
        FSS_exc_testing.accept = mock.MagicMock()

        def _accept(*args, **kwargs):
            # first call times out; second call stops the listener and fails
            if not FSS_exc_testing.accepted:
                FSS_exc_testing.accepted = True
                raise socket.timeout()
            else:
                agent.mgr_listener.stop = True
                raise Exception()

        FSS_exc_testing.accept.side_effect = _accept
        _wrap_context.side_effect = [FSS_exc_testing]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_not_called()
        # one timed-out accept plus the one that set stop; this was a bare
        # (never asserted) comparison before
        assert FSS_exc_testing.accept.call_count == 2
707
708
@mock.patch("cephadm.CephadmAgent._get_ls")
def test_gatherer_update_func(_get_ls, cephadm_fs):
    """update_func() replaces the gatherer's function, so _get_ls is no longer used."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        def _sample_func():
            return 7

        agent.ls_gatherer.func()
        _get_ls.assert_called()

        # reset the patched mock; rebinding the name to a fresh MagicMock
        # would make the assert_not_called() below pass vacuously
        _get_ls.reset_mock()
        agent.ls_gatherer.update_func(_sample_func)
        out = agent.ls_gatherer.func()
        assert out == 7
        _get_ls.assert_not_called()
727
728
@mock.patch("cephadm.CephadmAgent.wakeup")
@mock.patch("time.monotonic")
@mock.patch("threading.Event.wait")
def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs):
    """AgentGatherer.run() stores gathered data, syncs its ack and wakes the agent.

    It also survives exceptions raised by the gather function and by the loop body.
    """
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)
        agent.loop_interval = 30
        agent.ack = 23

        def _sample_func(*args, **kwargs):
            return ('sample out', True)

        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        def _fake_clear(*args, **kwargs):
            agent.ls_gatherer.stop = True

        _time.side_effect = [0, 20, 0, 20, 0, 20]  # start at time 0, complete at time 20
        _wait.return_value = None

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()

        _wait.assert_called_with(10)  # agent loop_interval - run time
        assert agent.ls_gatherer.data == 'sample out'
        assert agent.ls_gatherer.ack == 23
        _agent_wakeup.assert_called_once()
        _clear.assert_called_once()

        def _exc_func(*args, **kwargs):
            # actually raise; returning an Exception instance only failed by
            # accident when the gatherer tried to unpack the result
            raise Exception('gatherer function failure')

        agent.ls_gatherer.update_func(_exc_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data is None
            assert agent.ls_gatherer.ack == agent.ack
            # should have run full loop despite exception
            _clear.assert_called_once()

        # test general exception for full coverage
        _agent_wakeup.side_effect = [Exception()]
        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.stop = False
        # just to force only one iteration
        _time.side_effect = _fake_clear
        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = Exception()
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data == 'sample out'
            assert agent.ls_gatherer.ack == agent.ack
            # should not have gotten to end of loop
            _clear.assert_not_called()
787
788
@mock.patch("cephadm.CephadmAgent.run")
def test_command_agent(_agent_run, cephadm_fs):
    """command_agent() refuses to start without the agent directory and runs the agent once it exists."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID
        ctx.daemon_id = AGENT_ID

        # the message contains regex metacharacters ('.', '?'), so escape it
        # for pytest.raises(match=...), which applies re.search
        expected_error = f"Agent daemon directory {AGENT_DIR} does not exist. Perhaps agent was never deployed?"
        with pytest.raises(Exception, match=re.escape(expected_error)):
            _cephadm.command_agent(ctx)

        cephadm_fs.create_dir(AGENT_DIR)
        _cephadm.command_agent(ctx)
        _agent_run.assert_called()