# ceph/src/pybind/mgr/cephadm/tests/fixtures.py
import fnmatch
import asyncio
import sys
from tempfile import NamedTemporaryFile
from contextlib import contextmanager

from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_to_str, datetime_now
from cephadm.serve import CephadmServe, cephadmNoImage

try:
    from typing import Any, Iterator, List, Callable, Dict, Optional
except ImportError:
    pass

from cephadm import CephadmOrchestrator
from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
from tests import mock


def async_side_effect(result):
    async def side_effect(*args, **kwargs):
        return result
    return side_effect


def get_ceph_option(_, key):
    return __file__


def get_module_option_ex(_, module, key, default=None):
    if module == 'prometheus':
        if key == 'server_port':
            return 9283
    return None


def _run_cephadm(ret):
    async def foo(s, host, entity, cmd, e, **kwargs):
        if cmd == 'gather-facts':
            return '{}', '', 0
        return [ret], '', 0
    return foo

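# Illustrative only (not part of the original fixtures): tests typically install the
# factory above with mock.patch so no real cephadm binary or SSH connection is needed.
# The exact patch target is an assumption based on the CephadmServe import in this
# module; adjust it to whatever the test suite actually patches.
#
#   @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
#   def test_something(cephadm_module):
#       # every mocked call now returns (['{}'], '', 0), except 'gather-facts',
#       # which returns ('{}', '', 0)
#       ...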

def match_glob(val, pat):
    ok = fnmatch.fnmatchcase(val, pat)
    if not ok:
        assert pat in val


class MockEventLoopThread:
    def get_result(self, coro):
        if sys.version_info >= (3, 7):
            return asyncio.run(coro)

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(coro)
        finally:
            loop.close()
            asyncio.set_event_loop(None)

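# Usage sketch (hypothetical): with_cephadm_module() below assigns an instance of this
# class to m.event_loop, so m.wait_async(coro) runs coroutines synchronously inside the
# test process instead of on a real event-loop thread, e.g.:
#
#   out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, 'ls', []))
#
# On Python >= 3.7 this goes through asyncio.run(); older interpreters fall back to a
# manually created event loop that is closed again in the finally block.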

def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: Optional[List[str]] = None) -> None:
    to_update: Dict[str, Callable[[str, Any], None]] = {
        'ls': m._process_ls_output,
        'gather-facts': m.cache.update_host_facts,
        'list-networks': m.cache.update_host_networks,
    }
    if ops:
        for op in ops:
            out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
            to_update[op](host, out)
    m.cache.last_daemon_update[host] = datetime_now()
    m.cache.last_facts_update[host] = datetime_now()
    m.cache.last_network_update[host] = datetime_now()
    m.cache.metadata_up_to_date[host] = True


def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
    for host in m.cache.get_hosts():
        receive_agent_metadata(m, host)

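# Example call (illustrative; the host name is made up): populate the cache for one
# host and only the listed operations, mirroring what the agent would report:
#
#   receive_agent_metadata(cephadm_module, 'host1', ['ls', 'gather-facts', 'list-networks'])
#
# With ops=None only the timestamps and the metadata_up_to_date flag are refreshed.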

@contextmanager
def with_cephadm_module(module_options=None, store=None):
    """
    :param module_options: Set module options as if they had been set before module.__init__ is called
    :param store: Set the module's key/value store contents before module.__init__ is called
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option), \
            mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
            mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex), \
            mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
            mock.patch('cephadm.agent.CherryPyThread.run'), \
            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
            mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):

        m = CephadmOrchestrator.__new__(CephadmOrchestrator)
        if module_options is not None:
            for k, v in module_options.items():
                m._ceph_set_module_option('cephadm', k, v)
        if store is None:
            store = {}
        if '_ceph_get/mon_map' not in store:
            m.mock_store_set('_ceph_get', 'mon_map', {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            })
        if '_ceph_get/mgr_map' not in store:
            m.mock_store_set('_ceph_get', 'mgr_map', {
                'services': {
                    'dashboard': 'http://[::1]:8080',
                    'prometheus': 'http://[::1]:8081'
                },
                'modules': ['dashboard', 'prometheus'],
            })
        for k, v in store.items():
            m._ceph_set_store(k, v)

        m.__init__('cephadm', 0, 0)
        m._cluster_fsid = "fsid"

        m.event_loop = MockEventLoopThread()
        m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-')

        yield m

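# Minimal usage sketch (pytest-style; the option name is only an example of the
# module_options mapping, not something this fixture requires):
#
#   def test_module_starts():
#       with with_cephadm_module(module_options={'log_to_cluster': 'False'}) as m:
#           assert m._cluster_fsid == 'fsid'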

def wait(m, c):
    # type: (CephadmOrchestrator, OrchResult) -> Any
    return raise_if_exception(c)


@contextmanager
def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
    with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
        wait(m, m.add_host(HostSpec(hostname=name)))
        if refresh_hosts:
            CephadmServe(m)._refresh_hosts_and_daemons()
            receive_agent_metadata(m, name)
        yield
        wait(m, m.remove_host(name, force=rm_with_force))

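# Typical composition (illustrative): with_cephadm_module() provides the orchestrator
# and with_host() adds a host, refreshes its metadata, and removes it again on exit:
#
#   with with_cephadm_module({}) as m:
#       with with_host(m, 'test'):
#           assert wait(m, m.get_hosts())[0].hostname == 'test'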

def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
    mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
    if mon_or_mgr:
        assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
        return
    assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
    assert cephadm.spec_store[srv_name].deleted is not None
    CephadmServe(cephadm)._check_daemons()
    CephadmServe(cephadm)._apply_all_services()
    assert cephadm.spec_store[srv_name].deleted
    unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
    CephadmServe(cephadm)._purge_deleted_services()
    if not unmanaged:  # for unmanaged specs the daemons are not deleted, so the spec is not purged
        assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'


@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
    if spec.placement.is_empty() and host:
        spec.placement = PlacementSpec(hosts=[host], count=1)
    if meth is not None:
        c = meth(cephadm_module, spec)
        assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
    else:
        c = cephadm_module.apply([spec])
        assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...']

    specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
    assert spec in specs

    CephadmServe(cephadm_module)._apply_all_services()

    if status_running:
        make_daemons_running(cephadm_module, spec.service_name())

    dds = wait(cephadm_module, cephadm_module.list_daemons())
    own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
    if host and spec.service_type != 'osd':
        assert own_dds

    yield [dd.name() for dd in own_dds]

    assert_rm_service(cephadm_module, spec.service_name())

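# Illustrative usage (service type and host name are just examples): deploy a spec on
# one host, receive the resulting daemon names, and let the exit path remove the
# service again via assert_rm_service():
#
#   with with_host(cephadm_module, 'test'):
#       with with_service(cephadm_module, ServiceSpec('crash'), host='test') as dd_names:
#           assert dd_names == ['crash.test']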

def make_daemons_running(cephadm_module, service_name):
    own_dds = cephadm_module.cache.get_daemons_by_service(service_name)
    for dd in own_dds:
        dd.status = DaemonDescriptionStatus.running  # We're changing the reference