]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
Import ceph 15.2.8
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / fixtures.py
1 import datetime
2 import time
3 import fnmatch
4 from contextlib import contextmanager
5
6 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
7 from cephadm.module import CEPH_DATEFMT
8 from cephadm.serve import CephadmServe
9
10 try:
11 from typing import Any, Iterator, List
12 except ImportError:
13 pass
14 import pytest
15
16 from cephadm import CephadmOrchestrator
17 from cephadm.services.osd import RemoveUtil, OSD
18 from orchestrator import raise_if_exception, Completion, HostSpec
19 from tests import mock
20
21
def get_ceph_option(_, key):
    """Replacement for ``CephadmOrchestrator.get_ceph_option`` while patched.

    Whatever ``key`` is requested, the answer is the path of this
    fixtures module.
    """
    fixture_path = __file__
    return fixture_path
24
25
def _run_cephadm(ret):
    """Create a fake runner that always reports the canned value ``ret``.

    The returned callable accepts and ignores any arguments and returns
    the triple ``([ret], '', 0)``.
    """
    def foo(*_args, **_kwargs):
        # A fresh list is built per call so callers never share state.
        canned = ([ret], '', 0)
        return canned
    return foo
30
31
def match_glob(val, pat):
    """Assert that ``val`` matches ``pat``.

    A case-sensitive glob match is tried first. If it fails, ``pat`` must
    occur as a plain substring of ``val``; otherwise ``AssertionError`` is
    raised. Returns ``None`` on success.
    """
    if fnmatch.fnmatchcase(val, pat):
        return
    # Fall back to a substring check so literal patterns still pass.
    assert pat in val
36
37
@contextmanager
def with_cephadm_module(module_options=None, store=None):
    """
    Yield a ``CephadmOrchestrator`` instance built with several of its
    collaborators patched out; the patches stay active while the caller's
    ``with`` body runs.

    :param module_options: Set opts as if they were set before module.__init__ is called
    :param store: Set the store before module.__init__ is called
    """
    # Patchers are created up front and entered together below.
    patch_option = mock.patch(
        "cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option)
    patch_mon_cmd = mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd")
    patch_osdmap = mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap")
    patch_affinity = mock.patch(
        "cephadm.services.osd.OSDService.get_osdspec_affinity",
        return_value='test_spec')
    patch_remote = mock.patch("cephadm.module.CephadmOrchestrator.remote")

    with patch_option, patch_mon_cmd, patch_osdmap, patch_affinity, patch_remote:
        # Allocate without running __init__ so options and store can be
        # seeded before the module initialises itself.
        module = CephadmOrchestrator.__new__(CephadmOrchestrator)

        if module_options is not None:
            for option, value in module_options.items():
                module._ceph_set_module_option('cephadm', option, value)

        initial_store = {} if store is None else store
        if '_ceph_get/mon_map' not in initial_store:
            # Provide a default mon_map when the caller did not supply one.
            mon_map = {
                'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
                'fsid': 'foobar',
            }
            module.mock_store_set('_ceph_get', 'mon_map', mon_map)
        for key, value in initial_store.items():
            module._ceph_set_store(key, value)

        module.__init__('cephadm', 0, 0)
        module._cluster_fsid = "fsid"
        yield module
67
68
@pytest.fixture()
def cephadm_module():
    """Pytest fixture yielding the module produced by ``with_cephadm_module``.

    The module is created with an empty option dict. The context manager
    is left once the test using the fixture has finished.
    """
    # ``pytest.fixture`` supports ``yield`` directly; the separate
    # ``yield_fixture`` decorator is deprecated.
    with with_cephadm_module({}) as m:
        yield m
73
74
@pytest.fixture()
def rm_util():
    """Pytest fixture yielding a ``RemoveUtil`` bound to a patched module.

    The module comes from ``with_cephadm_module`` and stays alive for the
    duration of the test using the fixture.
    """
    # ``pytest.fixture`` supports ``yield`` directly; the separate
    # ``yield_fixture`` decorator is deprecated.
    with with_cephadm_module({}) as m:
        r = RemoveUtil.__new__(RemoveUtil)
        r.__init__(m)
        yield r
81
82
@pytest.fixture()
def osd_obj():
    """Pytest fixture yielding an ``OSD`` with id 0 and a mocked second argument.

    ``cephadm.services.osd.RemoveUtil`` is patched for the lifetime of the
    fixture.
    """
    # ``pytest.fixture`` supports ``yield`` directly; the separate
    # ``yield_fixture`` decorator is deprecated.
    with mock.patch("cephadm.services.osd.RemoveUtil"):
        o = OSD(0, mock.MagicMock())
        yield o
88
89
def wait(m, c):
    # type: (CephadmOrchestrator, Completion) -> Any
    """Drive completion ``c`` on module ``m`` until it finishes.

    Returns ``c.result`` once ``c.is_finished`` is true, after
    ``raise_if_exception(c)`` has had the chance to raise.

    When ``pydevd`` is importable, polling never times out. Otherwise the
    completion is polled 30 times at 0.1 s intervals, with ``m.process``
    re-invoked every tenth iteration.

    :raises AssertionError: if the completion has not finished when the
        polling budget is exhausted.
    """
    m.process([c])

    try:
        import pydevd  # noqa: F401  # if in debugger
        in_debug = True
    except ImportError:
        in_debug = False

    if in_debug:
        while True:  # don't timeout
            if c.is_finished:
                raise_if_exception(c)
                return c.result
            time.sleep(0.1)

    for i in range(30):
        if i % 10 == 0:
            m.process([c])
        if c.is_finished:
            raise_if_exception(c)
            return c.result
        time.sleep(0.1)

    # Raise explicitly: ``assert False`` is stripped under ``python -O``,
    # which would turn a timeout into a silent ``None`` return.
    raise AssertionError("timeout" + str(c._state))
115
116
@contextmanager
def with_host(m: CephadmOrchestrator, name: str, refresh_hosts: bool = True) -> Iterator[None]:
    """Add host ``name`` to ``m`` for the duration of a ``with`` block.

    :param m: orchestrator module the host is added to
    :param name: hostname passed to ``HostSpec``
    :param refresh_hosts: when true, run
        ``CephadmServe(m)._refresh_hosts_and_daemons()`` after adding the host

    The host is removed again when the block exits normally. There is no
    ``try``/``finally``, so an exception raised inside the block skips the
    removal.
    """
    wait(m, m.add_host(HostSpec(hostname=name)))
    if refresh_hosts:
        CephadmServe(m)._refresh_hosts_and_daemons()
    yield
    wait(m, m.remove_host(name))
125
126
def assert_rm_service(cephadm, srv_name):
    """Remove service ``srv_name`` and assert the expected confirmation.

    After the removal message has been checked, all services are
    re-applied through ``CephadmServe``.
    """
    removal_msg = wait(cephadm, cephadm.remove_service(srv_name))
    assert removal_msg == f'Removed service {srv_name}'
    CephadmServe(cephadm)._apply_all_services()
130
131
@contextmanager
def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str) -> Iterator[List[str]]:
    """Apply ``spec`` via ``meth`` and yield the names of its daemons.

    If ``spec`` has an empty placement, it is replaced in place with a
    single-daemon placement on ``host``. The service is removed with
    ``assert_rm_service`` when the ``with`` block exits normally.
    """
    if spec.placement.is_empty():
        spec.placement = PlacementSpec(hosts=[host], count=1)

    completion = meth(cephadm_module, spec)
    scheduled_msg = wait(cephadm_module, completion)
    assert scheduled_msg == f'Scheduled {spec.service_name()} update...'

    # The spec must now appear in the service descriptions.
    described = wait(cephadm_module, cephadm_module.describe_service())
    known_specs = [desc.spec for desc in described]
    assert spec in known_specs

    CephadmServe(cephadm_module)._apply_all_services()

    # Keep only the daemons that belong to this service.
    all_daemons = wait(cephadm_module, cephadm_module.list_daemons())
    own_daemons = [
        daemon for daemon in all_daemons
        if daemon.service_name() == spec.service_name()
    ]
    assert own_daemons

    yield [daemon.name() for daemon in own_daemons]

    assert_rm_service(cephadm_module, spec.service_name())