import datetime
import json
from contextlib import contextmanager
+from unittest.mock import ANY
import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.services.osd import OSDRemoval
+from cephadm.services.osd import OSD, OSDQueue
try:
from typing import Any, List
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, mon_command, match_glob
-from cephadm.module import CephadmOrchestrator
-
+from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
+ with_cephadm_module, with_service, assert_rm_service
+from cephadm.module import CephadmOrchestrator, CEPH_DATEFMT
"""
TODOs:
"""
-def assert_rm_service(cephadm, srv_name):
- assert wait(cephadm, cephadm.remove_service(srv_name)) == [
- f'Removed service {srv_name}']
- cephadm._apply_all_services()
-
-
def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
match_glob(out, f"Removed {d_names}* from host '{host}'")
-class TestCephadm(object):
+@contextmanager
+def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str):
+ spec.placement = PlacementSpec(hosts=[host], count=1)
+
+ c = meth(cephadm_module, spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")
- @contextmanager
- def _with_host(self, m, name):
- # type: (CephadmOrchestrator, str) -> None
- wait(m, m.add_host(HostSpec(hostname=name)))
- yield
- wait(m, m.remove_host(name))
+ dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
+ for dd in dds:
+ if dd.hostname == host:
+ yield dd.daemon_id
+ assert_rm_daemon(cephadm_module, spec.service_name(), host)
+ return
+
+ assert False, 'Daemon not found'
+
+
+class TestCephadm(object):
def test_get_unique_name(self, cephadm_module):
# type: (CephadmOrchestrator) -> None
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_host(self, cephadm_module):
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
# Be careful with backward compatibility when changing things here:
- assert json.loads(cephadm_module._store['inventory']) == \
+ assert json.loads(cephadm_module.get_store('inventory')) == \
{"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
- with self._with_host(cephadm_module, 'second'):
+ with with_host(cephadm_module, 'second'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [
HostSpec('test', 'test'),
HostSpec('second', 'second')
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
def test_service_ls(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
assert wait(cephadm_module, c) == []
- ps = PlacementSpec(hosts=['test'], count=1)
- c = cephadm_module.add_mds(ServiceSpec('mds', 'name', placement=ps))
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed mds.name.* on host 'test'")
-
- c = cephadm_module.list_daemons()
+ with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test'):
- def remove_id(dd):
- out = dd.to_json()
- del out['daemon_id']
- return out
-
- assert [remove_id(dd) for dd in wait(cephadm_module, c)] == [
- {
- 'daemon_type': 'mds',
- 'hostname': 'test',
- 'status': 1,
- 'status_desc': 'starting'}
- ]
+ c = cephadm_module.list_daemons()
- ps = PlacementSpec(hosts=['test'], count=1)
- spec = ServiceSpec('rgw', 'r.z', placement=ps)
- c = cephadm_module.apply_rgw(spec)
- assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+ def remove_id_events(dd):
+ out = dd.to_json()
+ del out['daemon_id']
+ del out['events']
+ return out
- c = cephadm_module.describe_service()
- out = [o.to_json() for o in wait(cephadm_module, c)]
- expected = [
- {
- 'placement': {'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]},
- 'service_id': 'name',
- 'service_name': 'mds.name',
- 'service_type': 'mds',
- 'status': {'running': 1, 'size': 0},
- 'unmanaged': True
- },
- {
- 'placement': {
- 'count': 1,
- 'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]
- },
- 'rgw_realm': 'r',
- 'rgw_zone': 'z',
- 'service_id': 'r.z',
- 'service_name': 'rgw.r.z',
- 'service_type': 'rgw',
- 'status': {'running': 0, 'size': 1}
- }
- ]
- assert out == expected
- assert [ServiceDescription.from_json(o).to_json() for o in expected] == expected
+ assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
+ {
+ 'daemon_type': 'mds',
+ 'hostname': 'test',
+ 'status': 1,
+ 'status_desc': 'starting',
+ 'is_active': False}
+ ]
- assert_rm_service(cephadm_module, 'rgw.r.z')
- assert_rm_daemon(cephadm_module, 'mds.name', 'test')
+ with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), CephadmOrchestrator.apply_rgw, 'test'):
+
+ c = cephadm_module.describe_service()
+ out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+ expected = [
+ {
+ 'placement': {'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]},
+ 'service_id': 'name',
+ 'service_name': 'mds.name',
+ 'service_type': 'mds',
+ 'status': {'running': 1, 'size': 0},
+ 'unmanaged': True
+ },
+ {
+ 'placement': {
+ 'count': 1,
+ 'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]
+ },
+ 'spec': {
+ 'rgw_realm': 'r',
+ 'rgw_zone': 'z',
+ },
+ 'service_id': 'r.z',
+ 'service_name': 'rgw.r.z',
+ 'service_type': 'rgw',
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 1},
+ }
+ ]
+ for o in out:
+ if 'events' in o:
+ del o['events'] # delete it, as it contains a timestamp
+ assert out == expected
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_device_ls(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.get_inventory()
assert wait(cephadm_module, c) == [InventoryHost('test')]
)
])
))
- def test_daemon_action(self, cephadm_module):
+ def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
- with self._with_host(cephadm_module, 'test'):
- c = cephadm_module.list_daemons(refresh=True)
- wait(cephadm_module, c)
- c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar')
- assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
+ with with_host(cephadm_module, 'test'):
+ cephadm_module._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
+ assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+
+ c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
+ assert wait(cephadm_module, c) == f"Deployed rgw.{daemon_id} on host 'test'"
+
+ for what in ('start', 'stop', 'restart'):
+ c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
+ assert wait(cephadm_module, c) == what + f" rgw.{daemon_id} from host 'test'"
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module._store['_ceph_get/mon_map'] = {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ }
+ cephadm_module.notify('mon_map', None)
+
+ cephadm_module._check_daemons()
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
+ cephadm_module.service_cache_timeout = 10
+ with with_host(cephadm_module, 'test'):
+ with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+ with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
+
+ _ceph_send_command.side_effect = Exception("myerror")
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+
+ cephadm_module._check_daemons()
- for what in ('start', 'stop', 'restart'):
- c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
- assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
+ evs = [e.message for e in cephadm_module.events.get_for_daemon(f'rgw.{daemon_id}')]
+
+ assert 'myerror' in ''.join(evs)
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', None)
+ cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+ 'modules': ['dashboard']
+ })
+
+ with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
+
+ cephadm_module._check_daemons()
+ _mon_cmd.assert_any_call({'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
- assert_rm_daemon(cephadm_module, 'rgw.myrgw.foobar', 'test')
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
assert r
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.return_value = ('{}', '', 0)
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
spec = DriveGroupSpec(
service_id='foo',
)
)
- c = cephadm_module.apply_drivegroups([spec])
+ c = cephadm_module.apply([spec])
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
inventory = Devices([
_run_cephadm.assert_any_call(
'test', 'osd', 'ceph-volume',
['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
- env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
_run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, 'service_id': 'foo', 'data_devices': {'all': True}}
spec = ServiceSpec.from_json(json_spec)
assert isinstance(spec, DriveGroupSpec)
- c = cephadm_module.apply_drivegroups([spec])
+ c = cephadm_module.apply([spec])
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
_save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_prepare_drivegroup(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
out = cephadm_module.osd_service.prepare_drivegroup(dg)
assert len(out) == 1
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
- out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
assert out in exp_command
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
)
])
))
+ @mock.patch("cephadm.services.osd.OSD.exists", True)
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- c = cephadm_module.list_daemons(refresh=True)
+ with with_host(cephadm_module, 'test'):
+ cephadm_module._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
wait(cephadm_module, c)
c = cephadm_module.remove_daemons(['osd.0'])
out = wait(cephadm_module, c)
assert out == ["Removed osd.0 from host 'test'"]
- osd_removal_op = OSDRemoval(0, False, False, 'test', 'osd.0', datetime.datetime.utcnow(), -1)
- cephadm_module.rm_util.queue_osds_for_removal({osd_removal_op})
- cephadm_module.rm_util._remove_osds_bg()
- assert cephadm_module.rm_util.to_remove_osds == set()
+ cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
+ replace=False,
+ force=False,
+ hostname='test',
+ fullname='osd.0',
+ process_started_at=datetime.datetime.utcnow(),
+ remove_util=cephadm_module.rm_util
+ ))
+ cephadm_module.rm_util.process_removal_queue()
+ assert cephadm_module.to_remove_osds == OSDQueue()
c = cephadm_module.remove_osds_status()
out = wait(cephadm_module, c)
- assert out == set()
+ assert out == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
def test_rgw_update(self, cephadm_module):
- with self._with_host(cephadm_module, 'host1'):
- with self._with_host(cephadm_module, 'host2'):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
c = cephadm_module.add_rgw(RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
[out] = wait(cephadm_module, c)
])
))
def test_remove_daemon(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- c = cephadm_module.list_daemons(refresh=True)
+ with with_host(cephadm_module, 'test'):
+ cephadm_module._refresh_host_daemons('test')
+ c = cephadm_module.list_daemons()
wait(cephadm_module, c)
c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
out = wait(cephadm_module, c)
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- spec.placement = PlacementSpec(hosts=['test'], count=1)
-
- c = meth(cephadm_module, spec)
- [out] = wait(cephadm_module, c)
- match_glob(out, f"Deployed {spec.service_name()}.* on host 'test'")
-
- assert_rm_daemon(cephadm_module, spec.service_name(), 'test')
+ with with_host(cephadm_module, 'test'):
+ with with_daemon(cephadm_module, spec, meth, 'test'):
+ pass
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_nfs(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = NFSServiceSpec(
service_id='name',
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_iscsi(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = IscsiServiceSpec(
service_id='name',
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_blink_device_light(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
assert wait(cephadm_module, c) == ['Set ident light for test: on']
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
- with self._with_host(cephadm_module, 'test'):
- if not spec.placement:
- spec.placement = PlacementSpec(hosts=['test'], count=1)
- c = meth(cephadm_module, spec)
- assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
- assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] == [spec]
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, meth, 'test'):
+ pass
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
+ def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
+ spec = ServiceSpec(
+ 'mds',
+ service_id='fsname',
+ placement=PlacementSpec(hosts=['host1', 'host2'])
+ )
+ with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
cephadm_module._apply_all_services()
- dds = wait(cephadm_module, cephadm_module.list_daemons())
- for dd in dds:
- assert dd.service_name() == spec.service_name()
+ [daemon] = cephadm_module.cache.daemons['host1'].keys()
+
+ spec.placement.set_hosts(['host2'])
+ ok_to_stop.side_effect = False
- assert_rm_service(cephadm_module, spec.service_name())
+ c = cephadm_module.apply_mds(spec)
+ out = wait(cephadm_module, c)
+ match_glob(out, "Scheduled mds.fsname update...")
+ cephadm_module._apply_all_services()
+
+ ok_to_stop.assert_called_with([daemon[4:]])
+
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop
+ assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
def test_offline(self, _check, _get_connection, cephadm_module):
_check.return_value = '{}', '', 0
_get_connection.return_value = mock.Mock(), mock.Mock()
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
_get_connection.side_effect = HostNotFound
code, out, err = cephadm_module.check_host('test')
assert out == ''
- assert 'Failed to connect to test (test)' in err
+ assert "Host 'test' not found" in err
out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
assert out == HostSpec('test', 'test', status='Offline').to_json()
return '{}', None, 0
with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
with mock.patch("remoto.process.check", _check):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
code, out, err = cephadm_module.check_host('test')
# First should succeed.
assert err is None
# code will blow up here triggering the BOOM!
code, out, err = cephadm_module.check_host('test')
assert err is None
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+ @mock.patch("remoto.process.check")
+ def test_etc_ceph(self, _check, _get_connection, cephadm_module):
+ _get_connection.return_value = mock.Mock(), mock.Mock()
+ _check.return_value = '{}', '', 0
+
+ assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
+ with with_host(cephadm_module, 'test'):
+ assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
+ cephadm_module.config_notify()
+ assert cephadm_module.manage_etc_ceph_ceph_conf == True
+
+ cephadm_module._refresh_hosts_and_daemons()
+ _check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')
+
+ assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+ cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+ cephadm_module.cache.load()
+
+ assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+ # Make sure, _check_daemons does a redeploy due to monmap change:
+ cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+ 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'fsid': 'foobar',
+ })
+ cephadm_module.notify('mon_map', mock.MagicMock())
+ assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+ cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+ cephadm_module.cache.load()
+ assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+
+ def test_etc_ceph_init(self):
+ with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
+ assert m.manage_etc_ceph_ceph_conf is True
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ def check_registry_credentials(url, username, password):
+ assert cephadm_module.get_module_option('registry_url') == url
+ assert cephadm_module.get_module_option('registry_username') == username
+ assert cephadm_module.get_module_option('registry_password') == password
+
+ _run_cephadm.return_value = '{}', '', 0
+ with with_host(cephadm_module, 'test'):
+ # test successful login with valid args
+ code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login attempt with invalid args
+ code, out, err = cephadm_module.registry_login('bad-args')
+ assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
+ "or -i <login credentials json file>")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test bad login using invalid json file
+ code, out, err = cephadm_module.registry_login(None, None, None, '{"bad-json": "bad-json"}')
+ assert err == ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+ check_registry_credentials('test-url', 'test-user', 'test-password')
+
+ # test good login using valid json file
+ good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
+ " \"password\": \"" + "json-pass" + "\"}")
+ code, out, err = cephadm_module.registry_login(None, None, None, good_json)
+ assert out == 'registry login scheduled'
+ assert err == ''
+ check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ # test bad login where args are valid but login command fails
+ _run_cephadm.return_value = '{}', 'error', 1
+ code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
+ assert err == 'Host test failed to login to fail-url as fail-user with given password'
+ check_registry_credentials('json-url', 'json-user', 'json-pass')