]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
bump version to 15.2.4-pve1
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / fixtures.py
CommitLineData
9f95a23c
TL
1import time
2import fnmatch
3try:
4 from typing import Any
5except ImportError:
6 pass
7import pytest
8
9from cephadm import CephadmOrchestrator
10from orchestrator import raise_if_exception, Completion
11from tests import mock
12
13
9f95a23c
TL
14
def get_ceph_option(_, key):
    """Stand-in for ``CephadmOrchestrator.get_ceph_option`` in tests.

    Both the instance argument (``_``) and ``key`` are ignored: every option
    lookup returns this module's ``__file__``.

    NOTE(review): presumably a real path is returned so that options which
    are expected to name a file resolve to something that exists -- confirm
    against the options the module under test actually reads.
    """
    return __file__
17
18
19def _run_cephadm(ret):
20 def foo(*args, **kwargs):
21 return ret, '', 0
22 return foo
23
24
def match_glob(val, pat):
    """Assert that ``val`` matches the glob ``pat`` or contains it verbatim.

    The case-sensitive glob match is tried first; only when it fails does
    the helper fall back to a plain substring check.

    :param val: string under test.
    :param pat: ``fnmatch``-style pattern (also used as a literal substring).
    :raises AssertionError: if neither the glob nor the substring check passes.
    """
    if not fnmatch.fnmatchcase(val, pat):
        # Explicit message: this helper module is not necessarily subject to
        # pytest's assertion rewriting, so a bare assert would be opaque.
        assert pat in val, '{!r} does not match {!r}'.format(val, pat)
29
30
def mon_command(*args, **kwargs):
    """Stand-in for ``CephadmOrchestrator.mon_command`` in tests.

    Accepts and ignores any arguments and always returns ``(0, '', '')``.

    NOTE(review): the tuple presumably mirrors (return code, stdout, stderr)
    of a successful mon command -- confirm against the real method.
    """
    return 0, '', ''
33
34
# ``pytest.fixture`` supports ``yield`` directly; ``yield_fixture`` is a
# deprecated alias for it.
@pytest.fixture()
def cephadm_module():
    """Yield a ``CephadmOrchestrator`` instance with its I/O entry points patched.

    While the fixture is active, ``get_ceph_option`` and ``mon_command`` are
    replaced by the module-level stand-ins of the same name, and ``remote``
    and ``send_command`` are replaced by default mocks.  The patches are
    undone when the test that uses the fixture finishes.
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
            mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
            mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
            mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):

        # The instance is allocated and initialised in two explicit steps
        # rather than by calling the class.
        # NOTE(review): presumably needed because of how the mgr module base
        # class is constructed under test -- confirm before simplifying.
        m = CephadmOrchestrator.__new__(CephadmOrchestrator)
        m.__init__('cephadm', 0, 0)
        m._cluster_fsid = "fsid"
        yield m
46
47
def wait(m, c):
    # type: (CephadmOrchestrator, Completion) -> Any
    """Drive completion ``c`` on module ``m`` until it finishes; return its result.

    ``m.process([c])`` is called once up front.  The completion is then
    polled every 0.1 s:

    * if ``pydevd`` is importable (i.e. a debugger is attached), polling
      continues indefinitely so breakpoints do not trigger a timeout;
    * otherwise polling is limited to 30 iterations (about 3 s), with
      ``m.process([c])`` called again on every 10th iteration.

    Once ``c.is_finished`` is true, ``raise_if_exception(c)`` is called and
    ``c.result`` is returned.

    :raises AssertionError: if the completion has not finished when the
        non-debugger polling budget is exhausted.
    """
    m.process([c])

    try:
        import pydevd  # noqa: F401 -- imported only to detect a debugger
        in_debug = True
    except ImportError:
        in_debug = False

    if in_debug:
        while True:  # don't timeout
            if c.is_finished:
                raise_if_exception(c)
                return c.result
            time.sleep(0.1)
    else:
        for i in range(30):
            if i % 10 == 0:
                m.process([c])
            if c.is_finished:
                raise_if_exception(c)
                return c.result
            time.sleep(0.1)
    # Raised explicitly (not ``assert False``) so the timeout is still
    # reported when Python runs with assertions disabled (-O).
    raise AssertionError("timeout" + str(c._state))