]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
9 from cephadm
import CephadmOrchestrator
10 from orchestrator
import raise_if_exception
, Completion
11 from tests
import mock
15 def get_ceph_option(_
, key
):
19 def _run_cephadm(ret
):
20 def foo(*args
, **kwargs
):
25 def match_glob(val
, pat
):
26 ok
= fnmatch
.fnmatchcase(val
, pat
)
31 def mon_command(*args
, **kwargs
):
35 @pytest.yield_fixture()
37 with mock
.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option
),\
38 mock
.patch("cephadm.module.CephadmOrchestrator.remote"),\
39 mock
.patch("cephadm.module.CephadmOrchestrator.send_command"), \
40 mock
.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command
):
42 m
= CephadmOrchestrator
.__new
__ (CephadmOrchestrator
)
43 m
.__init
__('cephadm', 0, 0)
44 m
._cluster
_fsid
= "fsid"
49 # type: (CephadmOrchestrator, Completion) -> Any
53 import pydevd
# if in debugger
59 while True: # don't timeout
72 assert False, "timeout" + str(c
._state
)