+import datetime
import time
import fnmatch
+from contextlib import contextmanager
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
+from cephadm.module import CEPH_DATEFMT
+
try:
from typing import Any
except ImportError:
import pytest
from cephadm import CephadmOrchestrator
-from orchestrator import raise_if_exception, Completion
+from cephadm.services.osd import RemoveUtil, OSD
+from orchestrator import raise_if_exception, Completion, HostSpec
from tests import mock
-
def get_ceph_option(_, key):
return __file__
assert pat in val
-def mon_command(*args, **kwargs):
- return 0, '', ''
-
-
-@pytest.yield_fixture()
-def cephadm_module():
+@contextmanager
+def with_cephadm_module(module_options=None, store=None):
+ """
+ :param module_options: Set opts as if they were set before module.__init__ is called
+ :param store: Set the store before module.__init__ is called
+ """
with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option),\
- mock.patch("cephadm.module.CephadmOrchestrator.remote"),\
- mock.patch("cephadm.module.CephadmOrchestrator.send_command"), \
- mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command):
+ mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
+ mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
+ mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity", return_value='test_spec'), \
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"):
m = CephadmOrchestrator.__new__ (CephadmOrchestrator)
+ if module_options is not None:
+ for k, v in module_options.items():
+ m._ceph_set_module_option('cephadm', k, v)
+ if store is None:
+ store = {}
+ if '_ceph_get/mon_map' not in store:
+ m.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ })
+ for k, v in store.items():
+ m._ceph_set_store(k, v)
+
m.__init__('cephadm', 0, 0)
m._cluster_fsid = "fsid"
yield m
+@pytest.yield_fixture()
+def cephadm_module():
+ with with_cephadm_module({}) as m:
+ yield m
+
+
+@pytest.yield_fixture()
+def rm_util():
+ with with_cephadm_module({}) as m:
+ r = RemoveUtil.__new__(RemoveUtil)
+ r.__init__(m)
+ yield r
+
+
+@pytest.yield_fixture()
+def osd_obj():
+ with mock.patch("cephadm.services.osd.RemoveUtil"):
+ o = OSD(0, mock.MagicMock())
+ yield o
+
+
def wait(m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])
return c.result
time.sleep(0.1)
assert False, "timeout" + str(c._state)
+
+
+@contextmanager
+def with_host(m:CephadmOrchestrator, name):
+ # type: (CephadmOrchestrator, str) -> None
+ wait(m, m.add_host(HostSpec(hostname=name)))
+ yield
+ wait(m, m.remove_host(name))
+
+
+def assert_rm_service(cephadm, srv_name):
+ assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
+ cephadm._apply_all_services()
+
+
+@contextmanager
+def with_service(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str):
+ if spec.placement.is_empty():
+ spec.placement = PlacementSpec(hosts=[host], count=1)
+ c = meth(cephadm_module, spec)
+ assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
+ specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
+ assert spec in specs
+
+ cephadm_module._apply_all_services()
+
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ names = {dd.service_name() for dd in dds}
+ assert spec.service_name() in names, dds
+
+ yield
+
+ assert_rm_service(cephadm_module, spec.service_name())
\ No newline at end of file