]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
0567f7f7e68a02b5cc784dc2d12f1ac98cd96aae
4 from tempfile
import NamedTemporaryFile
5 from contextlib
import contextmanager
7 from ceph
.deployment
.service_spec
import PlacementSpec
, ServiceSpec
8 from ceph
.utils
import datetime_to_str
, datetime_now
9 from cephadm
.serve
import CephadmServe
, cephadmNoImage
12 from typing
import Any
, Iterator
, List
, Callable
, Dict
16 from cephadm
import CephadmOrchestrator
17 from orchestrator
import raise_if_exception
, OrchResult
, HostSpec
, DaemonDescriptionStatus
18 from tests
import mock
21 def async_side_effect(result
):
22 async def side_effect(*args
, **kwargs
):
27 def get_ceph_option(_
, key
):
31 def get_module_option_ex(_
, module
, key
, default
=None):
32 if module
== 'prometheus':
33 if key
== 'server_port':
38 def _run_cephadm(ret
):
39 async def foo(s
, host
, entity
, cmd
, e
, **kwargs
):
40 if cmd
== 'gather-facts':
46 def match_glob(val
, pat
):
47 ok
= fnmatch
.fnmatchcase(val
, pat
)
52 class MockEventLoopThread
:
53 def get_result(self
, coro
):
54 if sys
.version_info
>= (3, 7):
55 return asyncio
.run(coro
)
57 loop
= asyncio
.new_event_loop()
58 asyncio
.set_event_loop(loop
)
60 return loop
.run_until_complete(coro
)
63 asyncio
.set_event_loop(None)
66 def receive_agent_metadata(m
: CephadmOrchestrator
, host
: str, ops
: List
[str] = None) -> None:
67 to_update
: Dict
[str, Callable
[[str, Any
], None]] = {
68 'ls': m
._process
_ls
_output
,
69 'gather-facts': m
.cache
.update_host_facts
,
70 'list-networks': m
.cache
.update_host_networks
,
74 out
= m
.wait_async(CephadmServe(m
)._run
_cephadm
_json
(host
, cephadmNoImage
, op
, []))
75 to_update
[op
](host
, out
)
76 m
.cache
.last_daemon_update
[host
] = datetime_now()
77 m
.cache
.last_facts_update
[host
] = datetime_now()
78 m
.cache
.last_network_update
[host
] = datetime_now()
79 m
.cache
.metadata_up_to_date
[host
] = True
82 def receive_agent_metadata_all_hosts(m
: CephadmOrchestrator
) -> None:
83 for host
in m
.cache
.get_hosts():
84 receive_agent_metadata(m
, host
)
88 def with_cephadm_module(module_options
=None, store
=None):
90 :param module_options: Set opts as if they were set before module.__init__ is called
91 :param store: Set the store before module.__init__ is called
93 with mock
.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option
),\
94 mock
.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
95 mock
.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex
),\
96 mock
.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
97 mock
.patch("cephadm.module.CephadmOrchestrator.remote"), \
98 mock
.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
99 mock
.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value
=False), \
100 mock
.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value
=False), \
101 mock
.patch('cephadm.agent.CherryPyThread.run'), \
102 mock
.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
103 mock
.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):
105 m
= CephadmOrchestrator
.__new
__(CephadmOrchestrator
)
106 if module_options
is not None:
107 for k
, v
in module_options
.items():
108 m
._ceph
_set
_module
_option
('cephadm', k
, v
)
111 if '_ceph_get/mon_map' not in store
:
112 m
.mock_store_set('_ceph_get', 'mon_map', {
113 'modified': datetime_to_str(datetime_now()),
116 if '_ceph_get/mgr_map' not in store
:
117 m
.mock_store_set('_ceph_get', 'mgr_map', {
119 'dashboard': 'http://[::1]:8080',
120 'prometheus': 'http://[::1]:8081'
122 'modules': ['dashboard', 'prometheus'],
124 for k
, v
in store
.items():
125 m
._ceph
_set
_store
(k
, v
)
127 m
.__init
__('cephadm', 0, 0)
128 m
._cluster
_fsid
= "fsid"
130 m
.event_loop
= MockEventLoopThread()
131 m
.tkey
= NamedTemporaryFile(prefix
='test-cephadm-identity-')
137 # type: (CephadmOrchestrator, OrchResult) -> Any
138 return raise_if_exception(c
)
142 def with_host(m
: CephadmOrchestrator
, name
, addr
='1::4', refresh_hosts
=True, rm_with_force
=True):
143 with mock
.patch("cephadm.utils.resolve_ip", return_value
=addr
):
144 wait(m
, m
.add_host(HostSpec(hostname
=name
)))
146 CephadmServe(m
)._refresh
_hosts
_and
_daemons
()
147 receive_agent_metadata(m
, name
)
149 wait(m
, m
.remove_host(name
, force
=rm_with_force
))
152 def assert_rm_service(cephadm
: CephadmOrchestrator
, srv_name
):
153 mon_or_mgr
= cephadm
.spec_store
[srv_name
].spec
.service_type
in ('mon', 'mgr')
155 assert 'Unable' in wait(cephadm
, cephadm
.remove_service(srv_name
))
157 assert wait(cephadm
, cephadm
.remove_service(srv_name
)) == f
'Removed service {srv_name}'
158 assert cephadm
.spec_store
[srv_name
].deleted
is not None
159 CephadmServe(cephadm
)._check
_daemons
()
160 CephadmServe(cephadm
)._apply
_all
_services
()
161 assert cephadm
.spec_store
[srv_name
].deleted
162 unmanaged
= cephadm
.spec_store
[srv_name
].spec
.unmanaged
163 CephadmServe(cephadm
)._purge
_deleted
_services
()
164 if not unmanaged
: # cause then we're not deleting daemons
165 assert srv_name
not in cephadm
.spec_store
, f
'{cephadm.spec_store[srv_name]!r}'
169 def with_service(cephadm_module
: CephadmOrchestrator
, spec
: ServiceSpec
, meth
=None, host
: str = '', status_running
=False) -> Iterator
[List
[str]]:
170 if spec
.placement
.is_empty() and host
:
171 spec
.placement
= PlacementSpec(hosts
=[host
], count
=1)
173 c
= meth(cephadm_module
, spec
)
174 assert wait(cephadm_module
, c
) == f
'Scheduled {spec.service_name()} update...'
176 c
= cephadm_module
.apply([spec
])
177 assert wait(cephadm_module
, c
) == [f
'Scheduled {spec.service_name()} update...']
179 specs
= [d
.spec
for d
in wait(cephadm_module
, cephadm_module
.describe_service())]
182 CephadmServe(cephadm_module
)._apply
_all
_services
()
185 make_daemons_running(cephadm_module
, spec
.service_name())
187 dds
= wait(cephadm_module
, cephadm_module
.list_daemons())
188 own_dds
= [dd
for dd
in dds
if dd
.service_name() == spec
.service_name()]
189 if host
and spec
.service_type
!= 'osd':
192 yield [dd
.name() for dd
in own_dds
]
194 assert_rm_service(cephadm_module
, spec
.service_name())
197 def make_daemons_running(cephadm_module
, service_name
):
198 own_dds
= cephadm_module
.cache
.get_daemons_by_service(service_name
)
200 dd
.status
= DaemonDescriptionStatus
.running
# We're changing the reference