import fnmatch
+import asyncio
+import sys
+from tempfile import NamedTemporaryFile
from contextlib import contextmanager
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_to_str, datetime_now
-from cephadm.serve import CephadmServe
+from cephadm.serve import CephadmServe, cephadmNoImage
try:
- from typing import Any, Iterator, List
+ from typing import Any, Iterator, List, Callable, Dict
except ImportError:
pass
from cephadm import CephadmOrchestrator
-from orchestrator import raise_if_exception, OrchResult, HostSpec
+from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
from tests import mock
+def async_side_effect(result):
+ async def side_effect(*args, **kwargs):
+ return result
+ return side_effect
+
+
def get_ceph_option(_, key):
return __file__
def _run_cephadm(ret):
- def foo(s, host, entity, cmd, e, **kwargs):
+ async def foo(s, host, entity, cmd, e, **kwargs):
if cmd == 'gather-facts':
return '{}', '', 0
return [ret], '', 0
assert pat in val
+class MockEventLoopThread:
+ def get_result(self, coro):
+ if sys.version_info >= (3, 7):
+ return asyncio.run(coro)
+
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+ asyncio.set_event_loop(None)
+
+
+def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None:
+ to_update: Dict[str, Callable[[str, Any], None]] = {
+ 'ls': m._process_ls_output,
+ 'gather-facts': m.cache.update_host_facts,
+ 'list-networks': m.cache.update_host_networks,
+ }
+ if ops:
+ for op in ops:
+ out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
+ to_update[op](host, out)
+ m.cache.last_daemon_update[host] = datetime_now()
+ m.cache.last_facts_update[host] = datetime_now()
+ m.cache.last_network_update[host] = datetime_now()
+ m.cache.metadata_up_to_date[host] = True
+
+
+def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
+ for host in m.cache.get_hosts():
+ receive_agent_metadata(m, host)
+
+
@contextmanager
def with_cephadm_module(module_options=None, store=None):
"""
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
- mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity", return_value='test_spec'), \
- mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
+ mock.patch('cephadm.agent.CherryPyThread.run'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
m.__init__('cephadm', 0, 0)
m._cluster_fsid = "fsid"
+
+ m.event_loop = MockEventLoopThread()
+ m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-')
+
yield m
@contextmanager
-def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True):
- # type: (CephadmOrchestrator, str) -> None
+def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
wait(m, m.add_host(HostSpec(hostname=name)))
if refresh_hosts:
CephadmServe(m)._refresh_hosts_and_daemons()
+ receive_agent_metadata(m, name)
yield
- wait(m, m.remove_host(name))
+ wait(m, m.remove_host(name, force=rm_with_force))
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
@contextmanager
-def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '') -> Iterator[List[str]]:
+def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
if spec.placement.is_empty() and host:
spec.placement = PlacementSpec(hosts=[host], count=1)
if meth is not None:
CephadmServe(cephadm_module)._apply_all_services()
+ if status_running:
+ make_daemons_running(cephadm_module, spec.service_name())
+
dds = wait(cephadm_module, cephadm_module.list_daemons())
own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
- if host:
+ if host and spec.service_type != 'osd':
assert own_dds
yield [dd.name() for dd in own_dds]
assert_rm_service(cephadm_module, spec.service_name())
-def _deploy_cephadm_binary(host):
- def foo(*args, **kwargs):
- return True
- return foo
+def make_daemons_running(cephadm_module, service_name):
+ own_dds = cephadm_module.cache.get_daemons_by_service(service_name)
+ for dd in own_dds:
+ dd.status = DaemonDescriptionStatus.running # We're changing the reference