]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
bump version to 16.2.6-pve2
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / fixtures.py
1 import fnmatch
2 from contextlib import contextmanager
3
4 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
5 from ceph.utils import datetime_to_str, datetime_now
6 from cephadm.serve import CephadmServe
7
8 try:
9 from typing import Any, Iterator, List
10 except ImportError:
11 pass
12
13 from cephadm import CephadmOrchestrator
14 from orchestrator import raise_if_exception, OrchResult, HostSpec
15 from tests import mock
16
17
def get_ceph_option(_, key):
    """
    Stand-in for ``CephadmOrchestrator.get_ceph_option``.

    The first argument and ``key`` are ignored; every lookup yields the
    path of this module.
    """
    this_file = __file__
    return this_file
20
21
22 def _run_cephadm(ret):
23 def foo(s, host, entity, cmd, e, **kwargs):
24 if cmd == 'gather-facts':
25 return '{}', '', 0
26 return [ret], '', 0
27 return foo
28
29
def match_glob(val, pat):
    """
    Assert that ``val`` matches ``pat``.

    A case-sensitive glob match is tried first; if that fails, ``pat``
    must at least occur as a substring of ``val``, otherwise an
    ``AssertionError`` is raised.
    """
    if fnmatch.fnmatchcase(val, pat):
        return
    assert pat in val
34
35
@contextmanager
def with_cephadm_module(module_options=None, store=None):
    """
    Yield a ``CephadmOrchestrator`` with several of its collaborators mocked.

    The instance is allocated with ``__new__`` so that module options and
    the store can be seeded before ``__init__`` runs. A ``mon_map`` entry
    is injected unless ``store`` already has a ``'_ceph_get/mon_map'`` key.

    :param module_options: Set opts as if they were set before module.__init__ is called
    :param store: Set the store before module.__init__ is called
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option), \
            mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
            mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
            mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity",
                       return_value='test_spec'), \
            mock.patch("cephadm.module.CephadmOrchestrator.remote"):

        orch = CephadmOrchestrator.__new__(CephadmOrchestrator)

        if module_options is not None:
            for opt_name, opt_value in module_options.items():
                orch._ceph_set_module_option('cephadm', opt_name, opt_value)

        initial_store = {} if store is None else store
        if '_ceph_get/mon_map' not in initial_store:
            mon_map = {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            }
            orch.mock_store_set('_ceph_get', 'mon_map', mon_map)
        for store_key, store_value in initial_store.items():
            orch._ceph_set_store(store_key, store_value)

        # Only now run the real constructor, on top of the seeded state.
        orch.__init__('cephadm', 0, 0)
        orch._cluster_fsid = "fsid"
        yield orch
65
66
def wait(m, c):
    # type: (CephadmOrchestrator, OrchResult) -> Any
    """
    Resolve the orchestrator result ``c`` via ``raise_if_exception``.

    ``m`` is accepted for call-site symmetry but is not used here.
    """
    outcome = raise_if_exception(c)
    return outcome
70
71
@contextmanager
def with_host(m: CephadmOrchestrator, name: str, addr: str = '1.2.3.4',
              refresh_hosts: bool = True) -> Iterator[None]:
    """
    Add host ``name`` to ``m`` for the duration of the ``with`` body.

    ``cephadm.utils.resolve_ip`` is patched to return ``addr`` while the
    context is active. The host is removed again when the body finishes
    normally.

    :param m: orchestrator module to add the host to
    :param name: hostname passed to ``HostSpec``
    :param addr: value returned by the patched ``resolve_ip``
    :param refresh_hosts: when true, run
        ``CephadmServe(m)._refresh_hosts_and_daemons()`` after adding the host
    """
    with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
        wait(m, m.add_host(HostSpec(hostname=name)))
        if refresh_hosts:
            CephadmServe(m)._refresh_hosts_and_daemons()
        yield
        wait(m, m.remove_host(name))
81
82
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
    """
    Remove service ``srv_name`` and assert each step of the removal.

    For ``mon`` and ``mgr`` services the removal result is only expected
    to contain ``'Unable'``. For any other service the spec must be marked
    deleted, and after daemons are checked, services applied and deleted
    services purged, the spec must be gone from the spec store unless it
    was unmanaged.
    """
    service_type = cephadm.spec_store[srv_name].spec.service_type
    if service_type in ('mon', 'mgr'):
        refusal = wait(cephadm, cephadm.remove_service(srv_name))
        assert 'Unable' in refusal
        return

    removal = wait(cephadm, cephadm.remove_service(srv_name))
    assert removal == f'Removed service {srv_name}'
    assert cephadm.spec_store[srv_name].deleted is not None

    CephadmServe(cephadm)._check_daemons()
    CephadmServe(cephadm)._apply_all_services()
    assert cephadm.spec_store[srv_name].deleted

    # Read the flag before purging; the entry may be gone afterwards.
    was_unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
    CephadmServe(cephadm)._purge_deleted_services()
    if was_unmanaged:
        # NOTE(review): presumably daemons of unmanaged services are not
        # deleted, so the spec is not expected to disappear - confirm.
        return
    assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
97
98
@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '') -> Iterator[List[str]]:
    """
    Apply ``spec`` for the duration of the ``with`` body, then remove it.

    If ``spec`` has an empty placement and ``host`` is given, the placement
    is replaced in place by a single-host, count-1 placement. The spec is
    scheduled through ``meth(cephadm_module, spec)`` when ``meth`` is
    given, otherwise through ``cephadm_module.apply([spec])``.

    Yields the names of the daemons whose service name matches the spec.
    When ``host`` is given, at least one such daemon must exist. After the
    body finishes normally the service is removed via
    ``assert_rm_service``.
    """
    if spec.placement.is_empty() and host:
        spec.placement = PlacementSpec(hosts=[host], count=1)

    if meth is None:
        scheduled = wait(cephadm_module, cephadm_module.apply([spec]))
        assert scheduled == [f'Scheduled {spec.service_name()} update...']
    else:
        scheduled = wait(cephadm_module, meth(cephadm_module, spec))
        assert scheduled == f'Scheduled {spec.service_name()} update...'

    described = wait(cephadm_module, cephadm_module.describe_service())
    known_specs = [service.spec for service in described]
    assert spec in known_specs

    CephadmServe(cephadm_module)._apply_all_services()

    all_daemons = wait(cephadm_module, cephadm_module.list_daemons())
    service_daemons = [
        daemon for daemon in all_daemons
        if daemon.service_name() == spec.service_name()
    ]
    if host:
        assert service_daemons

    yield [daemon.name() for daemon in service_daemons]

    assert_rm_service(cephadm_module, spec.service_name())
123
124
125 def _deploy_cephadm_binary(host):
126 def foo(*args, **kwargs):
127 return True
128 return foo