import copy
import datetime
import json
import os
import socket
import threading
from typing import Dict, List, Optional, Set
from unittest import mock

import pytest

from tests.fixtures import with_cephadm_ctx, cephadm_fs, import_cephadm
_cephadm = import_cephadm()


# Fixed identifiers shared by every test in this module.
FSID = "beefbeef-beef-beef-1234-beefbeefbeef"
# NOTE(review): the AGENT_ID definition was lost from the damaged source; the
# later assertions (e.g. `'agent.host1' in daemons`) require it to be 'host1'.
AGENT_ID = "host1"
# Directory the agent daemon files are deployed into for this FSID/agent.
AGENT_DIR = f'/var/lib/ceph/{FSID}/agent.{AGENT_ID}'
def test_agent_validate():
    """CephadmAgent.validate must reject a config missing any required file
    and accept a config containing all of them."""
    required_files = _cephadm.CephadmAgent.required_files
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        # Drop each required file in turn and expect a specific Error.
        for idx in range(len(required_files)):
            incomplete_files = {
                name: 'text'
                for j, name in enumerate(required_files)
                if j != idx
            }
            with pytest.raises(_cephadm.Error,
                               match=f'required file missing from config: {required_files[idx]}'):
                agent.validate(incomplete_files)
        # A config with every required file present validates cleanly.
        all_files = {name: 'text' for name in required_files}
        agent.validate(all_files)
def _check_file(path, content):
    """Assert that *path* exists and its full contents equal *content*."""
    assert os.path.exists(path)
    # NOTE(review): the file-read lines were lost from the damaged source;
    # restored here — `fcontent` was compared without ever being assigned.
    with open(path, 'r') as f:
        fcontent = f.read()
        assert fcontent == content
@mock.patch('cephadm.call_throws')
def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs):
    """deploy_daemon_unit must write exactly the required config files plus the
    unit.run/unit.meta/unit files, and enable the systemd unit."""
    _call_throws.return_value = ('', '', 0)
    # NOTE(review): this binding was lost from the damaged source; the visible
    # CephadmAgent(...) call below uses `agent_id`, so it must be defined here.
    agent_id = AGENT_ID

    with with_cephadm_ctx([]) as ctx:
        ctx.meta_json = json.dumps({'meta': 'data'})
        agent = _cephadm.CephadmAgent(ctx, FSID, agent_id)
        cephadm_fs.create_dir(AGENT_DIR)

        # deploying with no config at all is an error
        with pytest.raises(_cephadm.Error, match='Agent needs a config'):
            agent.deploy_daemon_unit()

        config = {s: f'text for {s}' for s in _cephadm.CephadmAgent.required_files}
        config['not-required-file.txt'] = 'don\'t write me'

        agent.deploy_daemon_unit(config)

        # check required config file were all created
        for fname in _cephadm.CephadmAgent.required_files:
            _check_file(f'{AGENT_DIR}/{fname}', f'text for {fname}')

        # assert non-required file was not written
        assert not os.path.exists(f'{AGENT_DIR}/not-required-file.txt')

        # check unit.run file was created correctly
        _check_file(f'{AGENT_DIR}/unit.run', agent.unit_run())

        # check unit.meta file created correctly
        _check_file(f'{AGENT_DIR}/unit.meta', json.dumps({'meta': 'data'}, indent=4) + '\n')

        # check unit file was created correctly
        _check_file(f'{ctx.unit_dir}/{agent.unit_name()}', agent.unit_file())

        expected_call_throws_calls = [
            mock.call(ctx, ['systemctl', 'daemon-reload']),
            mock.call(ctx, ['systemctl', 'enable', '--now', agent.unit_name()]),
        ]
        _call_throws.assert_has_calls(expected_call_throws_calls)

        expected_call_calls = [
            mock.call(ctx, ['systemctl', 'stop', agent.unit_name()],
                      verbosity=_cephadm.CallVerbosity.DEBUG),
            mock.call(ctx, ['systemctl', 'reset-failed', agent.unit_name()],
                      verbosity=_cephadm.CallVerbosity.DEBUG),
        ]
        _cephadm.call.assert_has_calls(expected_call_calls)
@mock.patch('threading.Thread.is_alive')
def test_agent_shutdown(_is_alive):
    """shutdown() must flip the stop flag on the agent and all its workers."""
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _is_alive.return_value = True
        assert agent.stop == False
        assert agent.mgr_listener.stop == False
        assert agent.ls_gatherer.stop == False
        assert agent.volume_gatherer.stop == False
        # NOTE(review): this call was lost from the damaged source; without it
        # the True-assertions below cannot hold.
        agent.shutdown()
        assert agent.stop == True
        assert agent.mgr_listener.stop == True
        assert agent.ls_gatherer.stop == True
        assert agent.volume_gatherer.stop == True
def test_agent_wakeup():
    """wakeup() must set the agent's loop event."""
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        assert agent.event.is_set() == False
        # NOTE(review): this call was lost from the damaged source; it is the
        # only way the event can become set between the two assertions.
        agent.wakeup()
        assert agent.event.is_set() == True
@mock.patch("cephadm.CephadmAgent.shutdown")
@mock.patch("cephadm.AgentGatherer.update_func")
def test_pull_conf_settings(_update_func, _shutdown, cephadm_fs):
    """pull_conf_settings must load the agent config file, require a keyring,
    and fail when target ip/port are absent from the config."""
    # NOTE(review): the exact literals for target_port/refresh_period/
    # listener_port/host were lost from the damaged source; any values work
    # since every assertion below compares against these same variables.
    target_ip = '192.168.0.0'
    target_port = 9876
    refresh_period = 20
    listener_port = 5678
    host = 'host1'
    device_enhanced_scan = 'True'
    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        full_config = {
            'target_ip': target_ip,
            'target_port': target_port,
            'refresh_period': refresh_period,
            'listener_port': listener_port,
            'host': host,
            'device_enhanced_scan': device_enhanced_scan
        }
        cephadm_fs.create_dir(AGENT_DIR)
        with open(agent.config_path, 'w') as f:
            f.write(json.dumps(full_config))

        # a config without a keyring file must abort and trigger shutdown
        with pytest.raises(_cephadm.Error, match="Failed to get agent keyring:"):
            agent.pull_conf_settings()
        _shutdown.assert_called()
        with open(agent.keyring_path, 'w') as f:
            f.write('keyring')

        assert agent.device_enhanced_scan == False
        agent.pull_conf_settings()
        assert agent.host == host
        assert agent.target_ip == target_ip
        assert agent.target_port == target_port
        assert agent.loop_interval == refresh_period
        assert agent.starting_port == listener_port
        assert agent.device_enhanced_scan == True
        assert agent.keyring == 'keyring'
        _update_func.assert_called()

        # removing the target ip makes the config invalid
        full_config.pop('target_ip')
        with open(agent.config_path, 'w') as f:
            f.write(json.dumps(full_config))
        with pytest.raises(_cephadm.Error, match="Failed to get agent target ip and port from config:"):
            agent.pull_conf_settings()
@mock.patch("cephadm.command_ceph_volume")
def test_agent_ceph_volume(_ceph_volume):
    """_ceph_volume must run the inventory command (optionally with --with-lsm),
    capture its stdout, and raise when ceph-volume produces no output."""

    def _ceph_volume_outputter(_):
        print("ceph-volume output")

    def _ceph_volume_empty(_):
        # deliberately prints nothing so the captured output is empty
        pass

    with with_cephadm_ctx([]) as ctx:
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        _ceph_volume.side_effect = _ceph_volume_outputter
        out, _ = agent._ceph_volume(False)
        assert ctx.command == ['inventory', '--format=json']
        assert out == "ceph-volume output\n"

        out, _ = agent._ceph_volume(True)
        assert ctx.command == ['inventory', '--format=json', '--with-lsm']
        assert out == "ceph-volume output\n"

        _ceph_volume.side_effect = _ceph_volume_empty
        with pytest.raises(Exception, match='ceph-volume returned empty value'):
            out, _ = agent._ceph_volume(False)
def test_agent_daemon_ls_subset(cephadm_fs):
    # Basing part of this test on some actual sample output

    # Some sample "podman stats --format '{{.ID}},{{.MemUsage}}' --no-stream" output
    # 3f2b31d19ecd,456.4MB / 41.96GB
    # 5aca2499e0f8,7.082MB / 41.96GB
    # fe0cef07d5f7,35.91MB / 41.96GB

    # Sample "podman ps --format '{{.ID}},{{.Names}}' --no-trunc" output with the same containers
    # fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mon-host1
    # 3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-mgr-host1-pntmho
    # 5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a,ceph-4434fa7c-5602-11ed-b719-5254006ef86b-crash-host1

    # Some of the components from that output
    mgr_cid = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f'
    mon_cid = 'fe0cef07d5f71c5c604f7d1b4a4ac2e27873c96089d015014524e803361b4a30'
    crash_cid = '5aca2499e0f8fb903788ff90eb03fe6ed58c7ed177caf278fed199936aff7b4a'
    mgr_short_cid = mgr_cid[0:12]
    mon_short_cid = mon_cid[0:12]
    crash_short_cid = crash_cid[0:12]

    # Rebuilding the output but with our testing FSID and components (to allow alteration later for whatever reason)
    mem_out = f"""{mgr_short_cid},456.4MB / 41.96GB
{crash_short_cid},7.082MB / 41.96GB
{mon_short_cid},35.91MB / 41.96GB"""

    ps_out = f"""{mon_cid},ceph-{FSID}-mon-host1
{mgr_cid},ceph-{FSID}-mgr-host1-pntmho
{crash_cid},ceph-{FSID}-crash-host1"""

    def _fake_call(ctx, cmd, desc=None, verbosity=_cephadm.CallVerbosity.VERBOSE_ON_FAILURE,
                   timeout=_cephadm.DEFAULT_TIMEOUT, **kwargs):
        # NOTE(review): the dispatch conditions were lost from the damaged
        # source; reconstructed from the sample podman stats/ps output above.
        if 'stats' in cmd:
            return (mem_out, '', 0)
        if 'ps' in cmd:
            return (ps_out, '', 0)
        return ('out', 'err', 0)

    cephadm_fs.create_dir(AGENT_DIR)
    cephadm_fs.create_dir(f'/var/lib/ceph/mon/ceph-host1')  # legacy daemon
    cephadm_fs.create_dir(f'/var/lib/ceph/osd/nothing')  # improper directory, should be skipped
    cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/mgr.host1.pntmho')  # cephadm daemon
    cephadm_fs.create_dir(f'/var/lib/ceph/{FSID}/crash.host1')  # cephadm daemon

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line; the dirs above use FSID
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        _cephadm.call.side_effect = _fake_call
        daemons = agent._daemon_ls_subset()

        assert 'agent.host1' in daemons
        assert 'mgr.host1.pntmho' in daemons
        assert 'crash.host1' in daemons
        assert 'mon.host1' in daemons

        assert daemons['mon.host1']['style'] == 'legacy'
        assert daemons['mgr.host1.pntmho']['style'] == 'cephadm:v1'
        assert daemons['crash.host1']['style'] == 'cephadm:v1'
        assert daemons['agent.host1']['style'] == 'cephadm:v1'

        assert daemons['mgr.host1.pntmho']['systemd_unit'] == f'ceph-{FSID}@mgr.host1.pntmho'
        assert daemons['agent.host1']['systemd_unit'] == f'ceph-{FSID}@agent.host1'
        assert daemons['crash.host1']['systemd_unit'] == f'ceph-{FSID}@crash.host1'

        assert daemons['mgr.host1.pntmho']['container_id'] == mgr_cid
        assert daemons['crash.host1']['container_id'] == crash_cid

        assert daemons['mgr.host1.pntmho']['memory_usage'] == 478570086  # 456.4 MB
        assert daemons['crash.host1']['memory_usage'] == 7426015  # 7.082 MB
@mock.patch("cephadm.list_daemons")
@mock.patch("cephadm.CephadmAgent._daemon_ls_subset")
def test_agent_get_ls(_ls_subset, _ls, cephadm_fs):
    """_get_ls must return cached daemon metadata when the subset listing is
    unchanged, and re-fetch full metadata when a container id changes, a
    daemon's state changes, or a daemon disappears."""
    # NOTE(review): large parts of the two fixtures below were lost from the
    # damaged source. They are reconstructed around the visible fragments and
    # the keys the assertions mutate ('container_id', 'enabled'); verify the
    # exact field set against upstream cephadm's list_daemons output.
    ls_out = [{
        "style": "cephadm:v1",
        "name": "mgr.host1.pntmho",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
        "enabled": True,
        "state": "running",
        "service_name": "mgr",
        "memory_request": None,
        "memory_limit": None,
        "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f",
        "container_image_name": "quay.io/ceph/ceph:testing",
        "container_image_id": "3300e39269f0c13ae45026cf233d8b3fff1303d52f2598a69c7fba0bb8405164",
        "container_image_digests": [
            "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
        ],
        "memory_usage": 507510784,
        "cpu_percentage": "5.95%",
        "version": "18.0.0-556-gb4d1a199",
        "started": "2022-10-27T14:19:36.086664Z",
        "created": "2022-10-27T14:19:36.282281Z",
        "deployed": "2022-10-27T14:19:35.377275Z",
        "configured": "2022-10-27T14:22:40.316912Z"
    }, {
        "style": "cephadm:v1",
        "name": "agent.host1",
        "fsid": FSID,
        "systemd_unit": f"ceph-{FSID}@agent.host1",
        "enabled": True,
        "state": "running",
        "service_name": "agent",
        "deployed_by": [
            "quay.io/ceph/ceph@sha256:d4f3522528ee79904f9e530bdce438acac30a039e9a0b3cf31d8b614f9f96a30"
        ],
        "rank": None,
        "rank_generation": None,
        "extra_container_args": None,
        "container_id": None,
        "container_image_name": None,
        "container_image_id": None,
        "container_image_digests": None,
        "version": None,
        "started": None,
        "created": "2022-10-27T19:46:49.751594Z",
        "deployed": None,
        "configured": "2022-10-27T19:46:49.751594Z"
    }, {
        "style": "legacy",
        "name": "mon.host1",
        "fsid": FSID,
        "systemd_unit": "ceph-mon@host1",
        "enabled": True,
        "state": "running"
    }]

    ls_subset_out = {
        'mgr.host1.pntmho': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@mgr.host1.pntmho",
            "enabled": True,
            "state": "running",
            "container_id": "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f",
            "memory_usage": 507510784
        },
        'agent.host1': {
            "style": "cephadm:v1",
            "fsid": FSID,
            "systemd_unit": f"ceph-{FSID}@agent.host1",
            "enabled": True,
            "state": "running",
            "container_id": None,
            "memory_usage": None
        },
        'mon.host1': {
            "style": "legacy",
            "fsid": FSID,
            "systemd_unit": "ceph-mon@host1",
            "enabled": True,
            "state": "running",
            "container_id": None,
            "memory_usage": None
        }
    }

    _ls.return_value = ls_out
    _ls_subset.return_value = ls_subset_out

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)

        # first pass, no cached daemon metadata
        daemons, changed = agent._get_ls()
        assert daemons == ls_out
        assert changed

        # second pass, should recognize that daemons have not changed and just keep cached values
        daemons, changed = agent._get_ls()
        # FIX(review): original asserted `daemons == daemons` (a tautology);
        # compare against the expected fixture instead.
        assert daemons == ls_out
        assert not changed

        # change a container id so it needs to get more info
        ls_subset_out2 = copy.deepcopy(ls_subset_out)
        ls_out2 = copy.deepcopy(ls_out)
        ls_subset_out2['mgr.host1.pntmho']['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'
        ls_out2[0]['container_id'] = '3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e7066034aaaaa'
        _ls.return_value = ls_out2
        _ls_subset.return_value = ls_subset_out2
        assert agent.cached_ls_values['mgr.host1.pntmho']['container_id'] == "3f2b31d19ecdd586640cc9c6ef7c0fe62157a3f7a71fcb60c91e70660340cd1f"
        daemons, changed = agent._get_ls()
        assert daemons == ls_out2
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out2
        assert not changed

        # change the state of a container so new daemon metadata is needed
        ls_subset_out3 = copy.deepcopy(ls_subset_out2)
        ls_out3 = copy.deepcopy(ls_out2)
        ls_subset_out3['mgr.host1.pntmho']['enabled'] = False
        ls_out3[0]['enabled'] = False
        _ls.return_value = ls_out3
        _ls_subset.return_value = ls_subset_out3
        assert agent.cached_ls_values['mgr.host1.pntmho']['enabled'] == True
        daemons, changed = agent._get_ls()
        assert daemons == ls_out3
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out3
        assert not changed

        # remove a daemon so new metadata is needed
        ls_subset_out4 = copy.deepcopy(ls_subset_out3)
        ls_out4 = copy.deepcopy(ls_out3)
        ls_subset_out4.pop('mon.host1')
        # NOTE(review): reconstructed — the matching removal from ls_out4 was lost
        ls_out4 = [d for d in ls_out4 if d.get('name') != 'mon.host1']
        _ls.return_value = ls_out4
        _ls_subset.return_value = ls_subset_out4
        assert 'mon.host1' in agent.cached_ls_values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out4
        assert changed

        # run again with the same data so it should use cached values
        daemons, changed = agent._get_ls()
        assert daemons == ls_out4
        assert not changed
@mock.patch("threading.Event.clear")
@mock.patch("threading.Event.wait")
@mock.patch("urllib.request.Request.__init__")
@mock.patch("cephadm.urlopen")
@mock.patch("cephadm.list_networks")
@mock.patch("cephadm.HostFacts.dump")
@mock.patch("cephadm.HostFacts.__init__", lambda _, __: None)
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("threading.Thread.is_alive")
@mock.patch("cephadm.MgrListener.start")
@mock.patch("cephadm.AgentGatherer.start")
@mock.patch("cephadm.port_in_use")
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
                   _listener_start, _is_alive, _load_verify_locations,
                   _HF_dump, _list_networks, _urlopen, _RQ_init, _wait, _clear):
    """One iteration of CephadmAgent.run must pick an open listener port,
    gather host data, and POST it to the mgr endpoint."""
    # NOTE(review): several literals below were lost from the damaged source
    # and have been reconstructed; listener_port=7770 is pinned by the final
    # 'All 1000 ports starting at 7770 taken.' assertion.
    target_ip = '192.168.0.0'
    target_port = '9999'
    refresh_period = 20
    listener_port = 7770
    open_listener_port = 7777
    host = 'host1'
    device_enhanced_scan = False

    def _fake_port_in_use(ctx, endpoint):
        # only open_listener_port is free; every other port reads as taken
        if endpoint.port == open_listener_port:
            return False
        return True

    network_data: Dict[str, Dict[str, Set[str]]] = {
        "10.2.1.0/24": {
            "eth1": set(["10.2.1.122"])
        },
        "192.168.122.0/24": {
            "eth0": set(["192.168.122.221"])
        },
        "fe80::/64": {
            "eth0": set(["fe80::5054:ff:fe3f:d94e"]),
            "eth1": set(["fe80::5054:ff:fe3f:aa4a"]),
        }
    }

    # the json serializable version of the networks data
    # we expect the agent to actually send
    network_data_no_sets: Dict[str, Dict[str, List[str]]] = {
        "10.2.1.0/24": {
            "eth1": ["10.2.1.122"]
        },
        "192.168.122.0/24": {
            "eth0": ["192.168.122.221"]
        },
        "fe80::/64": {
            "eth0": ["fe80::5054:ff:fe3f:d94e"],
            "eth1": ["fe80::5054:ff:fe3f:aa4a"],
        }
    }

    class FakeHTTPResponse():
        # minimal context-manager stand-in for the urlopen response
        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            pass

        def read(self):
            return json.dumps({'valid': 'output', 'result': '400'})

    _port_in_use.side_effect = _fake_port_in_use
    _is_alive.return_value = False
    _HF_dump.return_value = 'Host Facts'
    _list_networks.return_value = network_data
    _urlopen.side_effect = lambda *args, **kwargs: FakeHTTPResponse()
    _RQ_init.side_effect = lambda *args, **kwargs: None
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        agent.keyring = 'agent keyring'
        agent.ack = 7  # NOTE(review): reconstructed — 'ack': str(7) is asserted below
        agent.volume_gatherer.ack = 7
        agent.volume_gatherer.data = 'ceph-volume inventory data'
        agent.ls_gatherer.ack = 7
        agent.ls_gatherer.data = [{'valid_daemon': 'valid_metadata'}]

        def _set_conf():
            agent.target_ip = target_ip
            agent.target_port = target_port
            agent.loop_interval = refresh_period
            agent.starting_port = listener_port
            agent.host = host
            agent.device_enhanced_scan = device_enhanced_scan
        _pull_conf_settings.side_effect = _set_conf

        # technically the run function loops forever unless the agent
        # is told to stop. To get around that we're going to have the
        # event.wait() (which happens at the end of the loop) to throw
        # a special exception type. If we catch this exception we can
        # consider it as being a "success" run
        class EventCleared(Exception):
            pass

        _clear.side_effect = EventCleared('SUCCESS')
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        expected_data = {
            'host': host,
            'ls': [{'valid_daemon': 'valid_metadata'}],
            'networks': network_data_no_sets,
            'facts': 'Host Facts',
            'volume': 'ceph-volume inventory data',
            'ack': str(7),
            'keyring': 'agent keyring',
            'port': str(open_listener_port)
        }
        _RQ_init.assert_called_with(
            f'https://{target_ip}:{target_port}/data/',
            json.dumps(expected_data).encode('ascii'),
            {'Content-Type': 'application/json'}
        )
        _listener_start.assert_called()
        _gatherer_start.assert_called()
        _urlopen.assert_called()

        # agent should not go down if connections fail
        _urlopen.side_effect = Exception()
        with pytest.raises(EventCleared, match='SUCCESS'):
            agent.run()

        # should fail if no ports are open for listener
        _port_in_use.side_effect = lambda _, __: True
        agent.listener_port = None
        with pytest.raises(Exception, match='Failed to pick port for agent to listen on: All 1000 ports starting at 7770 taken.'):
            agent.run()
@mock.patch("cephadm.CephadmAgent.pull_conf_settings")
@mock.patch("cephadm.CephadmAgent.wakeup")
def test_mgr_listener_handle_json_payload(_agent_wakeup, _pull_conf_settings, cephadm_fs):
    """handle_json_payload must ignore payloads without a config section and,
    for payloads with one, write only required files and wake the agent."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        # NOTE(review): the payload dict literals were lost from the damaged
        # source; the counter value is arbitrary, the structure is pinned by
        # the visible 'unrequired-file' entry and the update() call below.
        data_no_config = {
            'counter': 7
        }
        agent.mgr_listener.handle_json_payload(data_no_config)
        _agent_wakeup.assert_not_called()
        _pull_conf_settings.assert_not_called()
        assert not any(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files)

        data_with_config = {
            'counter': 7,
            'config': {
                'unrequired-file': 'unrequired-text'
            }
        }
        data_with_config['config'].update({s: f'{s} text' for s in agent.required_files if s != agent.required_files[2]})
        agent.mgr_listener.handle_json_payload(data_with_config)
        _agent_wakeup.assert_called()
        _pull_conf_settings.assert_called()
        assert all(os.path.exists(os.path.join(AGENT_DIR, s)) for s in agent.required_files if s != agent.required_files[2])
        assert not os.path.exists(os.path.join(AGENT_DIR, agent.required_files[2]))
        assert not os.path.exists(os.path.join(AGENT_DIR, 'unrequired-file'))
@mock.patch("socket.socket")
@mock.patch("ssl.SSLContext.wrap_socket")
@mock.patch("cephadm.MgrListener.handle_json_payload")
@mock.patch("ssl.SSLContext.load_verify_locations")
@mock.patch("ssl.SSLContext.load_cert_chain")
def test_mgr_listener_run(_load_cert_chain, _load_verify_locations, _handle_json_payload,
                          _wrap_context, _socket, cephadm_fs):
    """Drive MgrListener.run over fake sockets: a good payload gets ACKed, bad
    json and a missing length prefix get error replies, and socket timeouts
    are survived."""

    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        payload = json.dumps({'counter': 3,
                              'config': {s: f'{s} text' for s in agent.required_files if s != agent.required_files[1]}})

        # NOTE(review): the bodies of these fake-socket helpers were partially
        # lost from the damaged source and have been reconstructed; verify
        # against upstream cephadm test_agent.py.
        class FakeSocket:

            def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, fileno=None):
                self.family = family
                self.type = type

            def bind(*args, **kwargs):
                return

            def settimeout(*args, **kwargs):
                return

            def listen(*args, **kwargs):
                return

        class FakeSecureSocket:

            def __init__(self, pload):
                self.payload = pload
                self._conn = FakeConn(self.payload)
                self.accepted = False

            def accept(self):
                # to make mgr listener run loop stop running,
                # set it to stop after accepting a "connection"
                # on our fake socket so only one iteration of the loop
                # can happen
                agent.mgr_listener.stop = True
                self.accepted = True
                return self._conn, None

            def load_cert_chain(*args, **kwargs):
                return

            def load_verify_locations(*args, **kwargs):
                return

        class FakeConn:

            def __init__(self, payload: str = ''):
                # prefix the payload with its zero-padded 10-byte length,
                # mirroring the mgr listener wire format
                payload_len_str = str(len(payload.encode('utf-8')))
                while len(payload_len_str.encode('utf-8')) < 10:
                    payload_len_str = '0' + payload_len_str
                self.payload = (payload_len_str + payload).encode('utf-8')
                self.buffer_len = len(self.payload)

            def recv(self, len: Optional[int] = None):
                if not len or len >= self.buffer_len:
                    ret = self.payload
                    self.payload = b''
                    self.buffer_len = 0
                    return ret
                ret = self.payload[:len]
                self.payload = self.payload[len:]
                self.buffer_len = self.buffer_len - len
                return ret

        FSS_good_data = FakeSecureSocket(payload)
        FSS_bad_json = FakeSecureSocket('bad json')
        _socket.side_effect = FakeSocket  # NOTE(review): reconstructed — lost line
        agent.listener_port = 7777

        # first run, should successfully receive properly structured json payload
        _wrap_context.side_effect = [FSS_good_data]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()

        # verify payload was correctly extracted
        assert _handle_json_payload.called_with(json.loads(payload))
        FakeConn.send.assert_called_once_with(b'ACK')

        # second run, with bad json data received
        _wrap_context.side_effect = [FSS_bad_json]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract json payload from message: Expecting value: line 1 column 1 (char 0)')

        # third run, no proper length as beginning og payload
        FSS_no_length = FakeSecureSocket(payload)
        FSS_no_length.payload = FSS_no_length.payload[10:]
        FSS_no_length._conn.payload = FSS_no_length._conn.payload[10:]
        FSS_no_length._conn.buffer_len -= 10
        _wrap_context.side_effect = [FSS_no_length]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_called_once_with(b'Failed to extract length of payload from message: invalid literal for int() with base 10: \'{"counter"\'')

        # some exception handling for full coverage
        FSS_exc_testing = FakeSecureSocket(payload)
        FSS_exc_testing.accept = mock.MagicMock()

        def _accept(*args, **kwargs):
            # first call times out, second raises a generic exception and
            # stops the listener so the loop terminates
            if not FSS_exc_testing.accepted:
                FSS_exc_testing.accepted = True
                raise socket.timeout()
            else:
                agent.mgr_listener.stop = True
                raise Exception()

        FSS_exc_testing.accept.side_effect = _accept
        _wrap_context.side_effect = [FSS_exc_testing]
        agent.mgr_listener.stop = False
        FakeConn.send = mock.Mock(return_value=None)
        agent.mgr_listener.run()
        FakeConn.send.assert_not_called()
        FSS_exc_testing.accept.call_count == 3
@mock.patch("cephadm.CephadmAgent._get_ls")
def test_gatherer_update_func(_get_ls, cephadm_fs):
    """AgentGatherer.update_func must replace the gatherer's callable."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)

        # NOTE(review): the sample function and its expected return value were
        # lost from the damaged source; any distinguishable value works.
        def _sample_func():
            return 'sample out'

        # default func is the agent's _get_ls
        agent.ls_gatherer.func()
        _get_ls.assert_called()

        _get_ls = mock.MagicMock()
        agent.ls_gatherer.update_func(_sample_func)
        out = agent.ls_gatherer.func()
        assert out == 'sample out'
        _get_ls.assert_not_called()
@mock.patch("cephadm.CephadmAgent.wakeup")
@mock.patch("time.monotonic")
@mock.patch("threading.Event.wait")
def test_gatherer_run(_wait, _time, _agent_wakeup, cephadm_fs):
    """A gatherer loop iteration must store the func's data, adopt the agent's
    ack, wake the agent, and sleep for loop_interval minus the run time; an
    exception in the func clears the data but still completes the loop."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line
        agent = _cephadm.CephadmAgent(ctx, FSID, AGENT_ID)
        cephadm_fs.create_dir(AGENT_DIR)
        agent.loop_interval = 30
        # NOTE(review): reconstructed — the `ls_gatherer.ack == 23` assertion
        # below requires the agent's ack to be 23.
        agent.ack = 23

        _sample_func = lambda *args, **kwargs: ('sample out', True)
        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        def _fake_clear(*args, **kwargs):
            agent.ls_gatherer.stop = True

        _time.side_effect = [0, 20, 0, 20, 0, 20]  # start at time 0, complete at time 20
        _wait.return_value = None

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()

            _wait.assert_called_with(10)  # agent loop_interval - run time
            assert agent.ls_gatherer.data == 'sample out'
            assert agent.ls_gatherer.ack == 23
            _agent_wakeup.assert_called_once()
            _clear.assert_called_once()

        _exc_func = lambda *args, **kwargs: Exception()
        agent.ls_gatherer.update_func(_exc_func)
        agent.ls_gatherer.ack = 20
        agent.ls_gatherer.stop = False

        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = _fake_clear
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data is None
            assert agent.ls_gatherer.ack == agent.ack
            # should have run full loop despite exception
            _clear.assert_called_once()

        # test general exception for full coverage
        _agent_wakeup.side_effect = [Exception()]
        agent.ls_gatherer.update_func(_sample_func)
        agent.ls_gatherer.stop = False
        # just to force only one iteration
        _time.side_effect = _fake_clear
        with mock.patch("threading.Event.clear") as _clear:
            _clear.side_effect = Exception()
            agent.ls_gatherer.run()
            assert agent.ls_gatherer.data == 'sample out'
            assert agent.ls_gatherer.ack == agent.ack
            # should not have gotten to end of loop
            _clear.assert_not_called()
@mock.patch("cephadm.CephadmAgent.run")
def test_command_agent(_agent_run, cephadm_fs):
    """command_agent must refuse to start when the agent dir is missing and
    delegate to CephadmAgent.run once it exists."""
    with with_cephadm_ctx([]) as ctx:
        ctx.fsid = FSID  # NOTE(review): reconstructed — lost line; AGENT_DIR embeds FSID
        ctx.daemon_id = AGENT_ID

        with pytest.raises(Exception, match=f"Agent daemon directory {AGENT_DIR} does not exist. Perhaps agent was never deployed?"):
            _cephadm.command_agent(ctx)

        cephadm_fs.create_dir(AGENT_DIR)
        _cephadm.command_agent(ctx)
        _agent_run.assert_called()