]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / fixtures.py
index 6a31395291fa12a036b68b760f9542f84d3e2487..33c6d19a4bfd54433fc8a78162e0ca32d853926e 100644 (file)
@@ -1,26 +1,35 @@
 import fnmatch
+import asyncio
+import sys
+from tempfile import NamedTemporaryFile
 from contextlib import contextmanager
 
 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
 from ceph.utils import datetime_to_str, datetime_now
-from cephadm.serve import CephadmServe
+from cephadm.serve import CephadmServe, cephadmNoImage
 
 try:
-    from typing import Any, Iterator, List
+    from typing import Any, Iterator, List, Callable, Dict
 except ImportError:
     pass
 
 from cephadm import CephadmOrchestrator
-from orchestrator import raise_if_exception, OrchResult, HostSpec
+from orchestrator import raise_if_exception, OrchResult, HostSpec, DaemonDescriptionStatus
 from tests import mock
 
 
+def async_side_effect(result):
+    async def side_effect(*args, **kwargs):
+        return result
+    return side_effect
+
+
 def get_ceph_option(_, key):
     return __file__
 
 
 def _run_cephadm(ret):
-    def foo(s, host, entity, cmd, e, **kwargs):
+    async def foo(s, host, entity, cmd, e, **kwargs):
         if cmd == 'gather-facts':
             return '{}', '', 0
         return [ret], '', 0
@@ -33,6 +42,41 @@ def match_glob(val, pat):
         assert pat in val
 
 
+class MockEventLoopThread:
+    def get_result(self, coro):
+        if sys.version_info >= (3, 7):
+            return asyncio.run(coro)
+
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            return loop.run_until_complete(coro)
+        finally:
+            loop.close()
+            asyncio.set_event_loop(None)
+
+
+def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None:
+    to_update: Dict[str, Callable[[str, Any], None]] = {
+        'ls': m._process_ls_output,
+        'gather-facts': m.cache.update_host_facts,
+        'list-networks': m.cache.update_host_networks,
+    }
+    if ops:
+        for op in ops:
+            out = m.wait_async(CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, []))
+            to_update[op](host, out)
+    m.cache.last_daemon_update[host] = datetime_now()
+    m.cache.last_facts_update[host] = datetime_now()
+    m.cache.last_network_update[host] = datetime_now()
+    m.cache.metadata_up_to_date[host] = True
+
+
+def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
+    for host in m.cache.get_hosts():
+        receive_agent_metadata(m, host)
+
+
 @contextmanager
 def with_cephadm_module(module_options=None, store=None):
     """
@@ -42,8 +86,11 @@ def with_cephadm_module(module_options=None, store=None):
     with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
             mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
             mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
-            mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity", return_value='test_spec'), \
-            mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+            mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
+            mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
+            mock.patch('cephadm.agent.CherryPyThread.run'):
 
         m = CephadmOrchestrator.__new__(CephadmOrchestrator)
         if module_options is not None:
@@ -69,6 +116,10 @@ def with_cephadm_module(module_options=None, store=None):
 
         m.__init__('cephadm', 0, 0)
         m._cluster_fsid = "fsid"
+
+        m.event_loop = MockEventLoopThread()
+        m.tkey = NamedTemporaryFile(prefix='test-cephadm-identity-')
+
         yield m
 
 
@@ -78,14 +129,14 @@ def wait(m, c):
 
 
 @contextmanager
-def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True):
-    # type: (CephadmOrchestrator, str) -> None
+def with_host(m: CephadmOrchestrator, name, addr='1::4', refresh_hosts=True, rm_with_force=True):
     with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
         wait(m, m.add_host(HostSpec(hostname=name)))
         if refresh_hosts:
             CephadmServe(m)._refresh_hosts_and_daemons()
+            receive_agent_metadata(m, name)
         yield
-        wait(m, m.remove_host(name))
+        wait(m, m.remove_host(name, force=rm_with_force))
 
 
 def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
@@ -105,7 +156,7 @@ def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
 
 
 @contextmanager
-def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '') -> Iterator[List[str]]:
+def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=None, host: str = '', status_running=False) -> Iterator[List[str]]:
     if spec.placement.is_empty() and host:
         spec.placement = PlacementSpec(hosts=[host], count=1)
     if meth is not None:
@@ -120,9 +171,12 @@ def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=No
 
     CephadmServe(cephadm_module)._apply_all_services()
 
+    if status_running:
+        make_daemons_running(cephadm_module, spec.service_name())
+
     dds = wait(cephadm_module, cephadm_module.list_daemons())
     own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]
-    if host:
+    if host and spec.service_type != 'osd':
         assert own_dds
 
     yield [dd.name() for dd in own_dds]
@@ -130,7 +184,7 @@ def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth=No
     assert_rm_service(cephadm_module, spec.service_name())
 
 
-def _deploy_cephadm_binary(host):
-    def foo(*args, **kwargs):
-        return True
-    return foo
+def make_daemons_running(cephadm_module, service_name):
+    own_dds = cephadm_module.cache.get_daemons_by_service(service_name)
+    for dd in own_dds:
+        dd.status = DaemonDescriptionStatus.running  # We're changing the reference