]>
Commit | Line | Data |
---|---|---|
1 | import fnmatch | |
2 | import asyncio | |
3 | import sys | |
4 | from tempfile import NamedTemporaryFile | |
5 | from contextlib import contextmanager | |
6 | ||
7 | from ceph.deployment.service_spec import PlacementSpec, ServiceSpec | |
8 | from ceph.utils import datetime_to_str, datetime_now | |
9 | from cephadm.serve import CephadmServe, cephadmNoImage | |
10 | ||
11 | try: | |
12 | from typing import Any, Iterator, List, Callable, Dict | |
13 | except ImportError: | |
14 | pass | |
15 | ||
16 | from cephadm import CephadmOrchestrator | |
17 | from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus | |
18 | from tests import mock | |
19 | ||
20 | ||
def async_side_effect(result):
    """Build a coroutine function that always resolves to ``result``.

    The returned callable accepts any positional and keyword arguments and
    ignores them.
    """
    async def _return_result(*_args, **_kwargs):
        return result

    return _return_result
25 | ||
26 | ||
def get_ceph_option(_, key):
    """Stub option getter: return this module's ``__file__`` for every key.

    Both arguments are ignored.
    """
    # NOTE(review): presumably chosen so that options expected to hold a
    # path resolve to a file that exists -- confirm against callers.
    return __file__
29 | ||
30 | ||
def get_module_option_ex(_, module, key, default=None):
    """Stub module-option getter used in place of the real lookup.

    Returns 9283 for the ``prometheus`` module's ``server_port`` option and
    ``None`` for everything else. ``default`` is accepted but never used.
    """
    if module == 'prometheus' and key == 'server_port':
        return 9283
    return None
36 | ||
37 | ||
def _run_cephadm(ret):
    """Build an async fake that returns canned ``(out, err, code)`` tuples.

    For the ``gather-facts`` command the fake yields the string ``'{}'`` as
    output; for any other command it yields ``[ret]``. The error text is
    always ``''`` and the return code is always ``0``.
    """
    async def _fake_run(s, host, entity, cmd, e, **kwargs):
        out = '{}' if cmd == 'gather-facts' else [ret]
        return out, '', 0

    return _fake_run
44 | ||
45 | ||
def match_glob(val, pat):
    """Assert that ``val`` matches ``pat``.

    A case-sensitive glob match is tried first; if that fails, ``pat`` must
    at least occur as a substring of ``val``, otherwise ``AssertionError``
    is raised. Returns ``None`` on success.
    """
    if not fnmatch.fnmatchcase(val, pat):
        assert pat in val
50 | ||
51 | ||
class MockEventLoopThread:
    """Synchronous replacement for an event-loop thread in tests."""

    def get_result(self, coro):
        """Run ``coro`` to completion and return its result.

        On Python 3.7+ this delegates to ``asyncio.run``. On older
        interpreters a fresh event loop is created, used, then closed and
        unset again.
        """
        if sys.version_info < (3, 7):
            legacy_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(legacy_loop)
            try:
                return legacy_loop.run_until_complete(coro)
            finally:
                legacy_loop.close()
                asyncio.set_event_loop(None)

        return asyncio.run(coro)
64 | ||
65 | ||
def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None:
    """Feed metadata for ``host`` into the module cache and mark it fresh.

    :param m: orchestrator module whose cache is updated
    :param host: hostname the metadata belongs to
    :param ops: optional cephadm commands (``ls``, ``gather-facts``,
        ``list-networks``) to run; each result is passed to the matching
        update handler. An op outside that set raises ``KeyError`` after
        its command has run.
    """
    handlers: Dict[str, Callable[[str, Any], None]] = {
        'ls': m._process_ls_output,
        'gather-facts': m.cache.update_host_facts,
        'list-networks': m.cache.update_host_networks,
    }
    for op in ops or []:
        payload = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
        handlers[op](host, payload)

    # The timestamps and the up-to-date flag are refreshed even when no ops
    # were requested.
    cache = m.cache
    cache.last_daemon_update[host] = datetime_now()
    cache.last_facts_update[host] = datetime_now()
    cache.last_network_update[host] = datetime_now()
    cache.metadata_up_to_date[host] = True
80 | ||
81 | ||
def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
    """Call ``receive_agent_metadata`` (without ops) for every cached host."""
    for hostname in m.cache.get_hosts():
        receive_agent_metadata(m, hostname)
85 | ||
86 | ||
@contextmanager
def with_cephadm_module(module_options=None, store=None):
    """
    Yield a ``CephadmOrchestrator`` instance built under a stack of mock patches.

    The patches stay active for as long as the caller's ``with`` body runs,
    because the ``yield`` sits inside the patching ``with`` statement.

    :param module_options: Set opts as if they were set before module.__init__ is called
    :param store: Set the store before module.__init__ is called
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
            mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
            mock.patch('cephadm.module.CephadmOrchestrator.get_module_option_ex', get_module_option_ex),\
            mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
            mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
            mock.patch('cephadm.agent.CherryPyThread.run'), \
            mock.patch('cephadm.offline_watcher.OfflineHostWatcher.run'), \
            mock.patch('cephadm.tuned_profiles.TunedProfileUtils._remove_stray_tuned_profiles'):

        # Allocate the instance without running __init__ so that options and
        # store contents can be seeded before initialisation happens below.
        m = CephadmOrchestrator.__new__(CephadmOrchestrator)
        if module_options is not None:
            for k, v in module_options.items():
                m._ceph_set_module_option('cephadm', k, v)
        if store is None:
            store = {}
        # Provide default mon_map / mgr_map entries unless the caller's store
        # already carries the corresponding '_ceph_get/...' key.
        if '_ceph_get/mon_map' not in store:
            m.mock_store_set('_ceph_get', 'mon_map', {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            })
        if '_ceph_get/mgr_map' not in store:
            m.mock_store_set('_ceph_get', 'mgr_map', {
                'services': {
                    'dashboard': 'http://[::1]:8080',
                    'prometheus': 'http://[::1]:8081'
                },
                'modules': ['dashboard', 'prometheus'],
            })
        for k, v in store.items():
            m._ceph_set_store(k, v)

        # Only now run the real initialiser on the pre-seeded instance.
        m.__init__('cephadm', 0, 0)
        m._cluster_fsid = "fsid"

        # Run coroutines synchronously instead of on a background loop thread.
        m.event_loop = MockEventLoopThread()
        # Temp file kept on the module as ``tkey``; it is not explicitly
        # closed in this function.
        m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-')

        yield m
134 | ||
135 | ||
def wait(m, c):
    # type: (CephadmOrchestrator, OrchResult) -> Any
    """Pass ``c`` through ``raise_if_exception`` and return the outcome.

    ``m`` is accepted but not used.
    """
    outcome = raise_if_exception(c)
    return outcome
139 | ||
140 | ||
@contextmanager
def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
    """Add host ``name`` for the duration of the ``with`` body, then remove it.

    ``cephadm.utils.resolve_ip`` is patched to return ``addr`` for the whole
    lifetime of the context. When ``refresh_hosts`` is true, hosts and
    daemons are refreshed and agent metadata is recorded for the new host
    before yielding. The host is removed with ``force=rm_with_force`` once
    the body completes normally.
    """
    with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
        host_spec = HostSpec(hostname=name)
        wait(m, m.add_host(host_spec))

        if refresh_hosts:
            CephadmServe(m)._refresh_hosts_and_daemons()
            receive_agent_metadata(m, name)

        yield

        removal = m.remove_host(name, force=rm_with_force)
        wait(m, removal)
150 | ||
151 | ||
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
    """Remove service ``srv_name`` and assert the removal behaves as expected.

    For ``mon`` and ``mgr`` services the removal result must contain
    ``'Unable'`` and nothing further is checked. For any other service the
    removal must report success, the spec must be flagged as deleted, and,
    unless the spec is unmanaged, it must be gone from the spec store after
    the purge step.
    """
    service_type = cephadm.spec_store[srv_name].spec.service_type
    if service_type in ('mon', 'mgr'):
        refusal = wait(cephadm, cephadm.remove_service(srv_name))
        assert 'Unable' in refusal
        return

    removal_msg = wait(cephadm, cephadm.remove_service(srv_name))
    assert removal_msg == f'Removed service {srv_name}'
    assert cephadm.spec_store[srv_name].deleted is not None

    CephadmServe(cephadm)._check_daemons()
    CephadmServe(cephadm)._apply_all_services()
    assert cephadm.spec_store[srv_name].deleted

    # Read the flag before purging, since the spec may no longer be present
    # in the store afterwards.
    was_unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
    CephadmServe(cephadm)._purge_deleted_services()
    if was_unmanaged:  # cause then we're not deleting daemons
        return
    assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
166 | ||
167 | ||
@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
    """Apply ``spec`` for the duration of the ``with`` body, then remove it.

    :param cephadm_module: orchestrator module under test
    :param spec: service spec to apply; if its placement is empty and
        ``host`` is given, the placement is set to that single host
    :param meth: optional callable ``meth(cephadm_module, spec)`` used to
        apply the spec instead of ``cephadm_module.apply([spec])``
    :param host: host to place the service on when the spec has no placement
    :param status_running: when true, mark the service's daemons as running
    :return: yields the names of the daemons belonging to the service
    """
    if spec.placement.is_empty() and host:
        spec.placement = PlacementSpec(hosts=[host], count=1)

    # ``apply`` reports a list of messages, a custom ``meth`` a single one.
    if meth is None:
        completion = cephadm_module.apply([spec])
        assert wait(cephadm_module, completion) == [f'Scheduled {spec.service_name()} update...']
    else:
        completion = meth(cephadm_module, spec)
        assert wait(cephadm_module, completion) == f'Scheduled {spec.service_name()} update...'

    known_specs = [
        desc.spec
        for desc in wait(cephadm_module, cephadm_module.describe_service())
    ]
    assert spec in known_specs

    CephadmServe(cephadm_module)._apply_all_services()

    if status_running:
        make_daemons_running(cephadm_module, spec.service_name())

    all_daemons = wait(cephadm_module, cephadm_module.list_daemons())
    matching = [
        daemon for daemon in all_daemons
        if daemon.service_name() == spec.service_name()
    ]
    # With an explicit host, every service type except osd must have
    # produced at least one daemon.
    if host and spec.service_type != 'osd':
        assert matching

    daemon_names = [daemon.name() for daemon in matching]
    yield daemon_names

    assert_rm_service(cephadm_module, spec.service_name())
195 | ||
196 | ||
def make_daemons_running(cephadm_module, service_name):
    """Set the status of every cached daemon of ``service_name`` to running."""
    for daemon in cephadm_module.cache.get_daemons_by_service(service_name):
        # We're changing the reference
        # NOTE(review): presumably the objects returned by the cache are the
        # cached instances themselves, so this assignment sticks -- confirm.
        daemon.status = DaemonDescriptionStatus.running