]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
9 from cephadm
import CephadmOrchestrator
10 from orchestrator
import raise_if_exception
, Completion
11 from tests
import mock
14 def set_store(self
, k
, v
):
21 def get_store(self
, k
):
22 return self
._store
.get(k
, None)
25 def get_store_prefix(self
, prefix
):
27 k
: v
for k
, v
in self
._store
.items()
28 if k
.startswith(prefix
)
32 def get_ceph_option(_
, key
):
36 def _run_cephadm(ret
):
37 def foo(*args
, **kwargs
):
42 def match_glob(val
, pat
):
43 ok
= fnmatch
.fnmatchcase(val
, pat
)
48 def mon_command(*args
, **kwargs
):
52 @pytest.yield_fixture()
54 with mock
.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option
),\
55 mock
.patch("cephadm.module.CephadmOrchestrator._configure_logging", lambda *args
: None),\
56 mock
.patch("cephadm.module.CephadmOrchestrator.remote"),\
57 mock
.patch("cephadm.module.CephadmOrchestrator.set_store", set_store
), \
58 mock
.patch("cephadm.module.CephadmOrchestrator.get_store", get_store
),\
59 mock
.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')), \
60 mock
.patch("cephadm.module.HostCache.save_host"), \
61 mock
.patch("cephadm.module.HostCache.rm_host"), \
62 mock
.patch("cephadm.module.CephadmOrchestrator.send_command"), \
63 mock
.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command
), \
64 mock
.patch("cephadm.module.CephadmOrchestrator.get_store_prefix", get_store_prefix
):
66 CephadmOrchestrator
._register
_commands
('')
67 CephadmOrchestrator
._register
_options
('')
68 m
= CephadmOrchestrator
.__new
__ (CephadmOrchestrator
)
69 m
._root
_logger
= mock
.MagicMock()
72 'ssh_identity_key': '',
73 'ssh_identity_pub': '',
75 'upgrade_state': None,
77 m
.__init
__('cephadm', 0, 0)
78 m
._cluster
_fsid
= "fsid"
83 # type: (CephadmOrchestrator, Completion) -> Any
87 import pydevd
# if in debugger
93 while True: # don't timeout
103 raise_if_exception(c
)
106 assert False, "timeout" + str(c
._state
)