]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
import 15.2.4
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / fixtures.py
1 import time
2 import fnmatch
3 try:
4 from typing import Any
5 except ImportError:
6 pass
7 import pytest
8
9 from cephadm import CephadmOrchestrator
10 from orchestrator import raise_if_exception, Completion
11 from tests import mock
12
13
14
15 def get_ceph_option(_, key):
16 return __file__
17
18
19 def _run_cephadm(ret):
20 def foo(*args, **kwargs):
21 return ret, '', 0
22 return foo
23
24
25 def match_glob(val, pat):
26 ok = fnmatch.fnmatchcase(val, pat)
27 if not ok:
28 assert pat in val
29
30
31 def mon_command(*args, **kwargs):
32 return 0, '', ''
33
34
35 @pytest.yield_fixture()
36 def cephadm_module():
37 with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
38 mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
39 mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
40 mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
41
42 m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
43 m.__init__('cephadm', 0, 0)
44 m._cluster_fsid = "fsid"
45 yield m
46
47
48 def wait(m, c):
49 # type: (CephadmOrchestrator, Completion) -> Any
50 m.process([c])
51
52 try:
53 import pydevd # if in debugger
54 in_debug = True
55 except ImportError:
56 in_debug = False
57
58 if in_debug:
59 while True: # don't timeout
60 if c.is_finished:
61 raise_if_exception(c)
62 return c.result
63 time.sleep(0.1)
64 else:
65 for i in range(30):
66 if i % 10 == 0:
67 m.process([c])
68 if c.is_finished:
69 raise_if_exception(c)
70 return c.result
71 time.sleep(0.1)
72 assert False, "timeout" + str(c._state)