]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
import 15.2.5
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / test_cephadm.py
index f64d8a0ce904d9ae074aa722f2f321c16df4ec6b..d1072d4f250b58cadb19628d9efd281fc86278c4 100644 (file)
@@ -1,11 +1,12 @@
 import datetime
 import json
 from contextlib import contextmanager
+from unittest.mock import ANY
 
 import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.services.osd import OSDRemoval
+from cephadm.services.osd import OSD, OSDQueue
 
 try:
     from typing import Any, List
@@ -21,9 +22,9 @@ from ceph.deployment.inventory import Devices, Device
 from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
     HostSpec, OrchestratorError
 from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, mon_command, match_glob
-from cephadm.module import CephadmOrchestrator
-
+from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
+    with_cephadm_module, with_service, assert_rm_service
+from cephadm.module import CephadmOrchestrator, CEPH_DATEFMT
 
 """
 TODOs:
@@ -33,12 +34,6 @@ TODOs:
 """
 
 
-def assert_rm_service(cephadm, srv_name):
-    assert wait(cephadm, cephadm.remove_service(srv_name)) == [
-        f'Removed service {srv_name}']
-    cephadm._apply_all_services()
-
-
 def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
     dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
     d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
@@ -48,14 +43,25 @@ def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
     match_glob(out, f"Removed {d_names}* from host '{host}'")
 
 
-class TestCephadm(object):
+@contextmanager
+def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str):
+    spec.placement = PlacementSpec(hosts=[host], count=1)
+
+    c = meth(cephadm_module, spec)
+    [out] = wait(cephadm_module, c)
+    match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")
 
-    @contextmanager
-    def _with_host(self, m, name):
-        # type: (CephadmOrchestrator, str) -> None
-        wait(m, m.add_host(HostSpec(hostname=name)))
-        yield
-        wait(m, m.remove_host(name))
+    dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
+    for dd in dds:
+        if dd.hostname == host:
+            yield dd.daemon_id
+            assert_rm_daemon(cephadm_module, spec.service_name(), host)
+            return
+
+    assert False, 'Daemon not found'
+
+
+class TestCephadm(object):
 
     def test_get_unique_name(self, cephadm_module):
         # type: (CephadmOrchestrator) -> None
@@ -70,14 +76,14 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_host(self, cephadm_module):
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
 
             # Be careful with backward compatibility when changing things here:
-            assert json.loads(cephadm_module._store['inventory']) == \
+            assert json.loads(cephadm_module.get_store('inventory')) == \
                    {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
 
-            with self._with_host(cephadm_module, 'second'):
+            with with_host(cephadm_module, 'second'):
                 assert wait(cephadm_module, cephadm_module.get_hosts()) == [
                     HostSpec('test', 'test'),
                     HostSpec('second', 'second')
@@ -87,69 +93,67 @@ class TestCephadm(object):
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
     def test_service_ls(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             assert wait(cephadm_module, c) == []
 
-            ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_mds(ServiceSpec('mds', 'name', placement=ps))
-            [out] = wait(cephadm_module, c)
-            match_glob(out, "Deployed mds.name.* on host 'test'")
-
-            c = cephadm_module.list_daemons()
+            with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test'):
 
-            def remove_id(dd):
-                out = dd.to_json()
-                del out['daemon_id']
-                return out
-
-            assert [remove_id(dd) for dd in wait(cephadm_module, c)] == [
-                {
-                    'daemon_type': 'mds',
-                    'hostname': 'test',
-                    'status': 1,
-                    'status_desc': 'starting'}
-            ]
+                c = cephadm_module.list_daemons()
 
-            ps = PlacementSpec(hosts=['test'], count=1)
-            spec = ServiceSpec('rgw', 'r.z', placement=ps)
-            c = cephadm_module.apply_rgw(spec)
-            assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+                def remove_id_events(dd):
+                    out = dd.to_json()
+                    del out['daemon_id']
+                    del out['events']
+                    return out
 
-            c = cephadm_module.describe_service()
-            out = [o.to_json() for o in wait(cephadm_module, c)]
-            expected = [
-                {
-                    'placement': {'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]},
-                    'service_id': 'name',
-                    'service_name': 'mds.name',
-                    'service_type': 'mds',
-                    'status': {'running': 1, 'size': 0},
-                    'unmanaged': True
-                },
-                {
-                    'placement': {
-                        'count': 1,
-                        'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]
-                    },
-                    'rgw_realm': 'r',
-                    'rgw_zone': 'z',
-                    'service_id': 'r.z',
-                    'service_name': 'rgw.r.z',
-                    'service_type': 'rgw',
-                    'status': {'running': 0, 'size': 1}
-                }
-            ]
-            assert out == expected
-            assert [ServiceDescription.from_json(o).to_json() for o in expected] == expected
+                assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
+                    {
+                        'daemon_type': 'mds',
+                        'hostname': 'test',
+                        'status': 1,
+                        'status_desc': 'starting',
+                        'is_active': False}
+                ]
 
-            assert_rm_service(cephadm_module, 'rgw.r.z')
-            assert_rm_daemon(cephadm_module, 'mds.name', 'test')
+                with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), CephadmOrchestrator.apply_rgw, 'test'):
+
+                    c = cephadm_module.describe_service()
+                    out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
+                    expected = [
+                        {
+                            'placement': {'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]},
+                            'service_id': 'name',
+                            'service_name': 'mds.name',
+                            'service_type': 'mds',
+                            'status': {'running': 1, 'size': 0},
+                            'unmanaged': True
+                        },
+                        {
+                            'placement': {
+                                'count': 1,
+                                'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]
+                            },
+                            'spec': {
+                                'rgw_realm': 'r',
+                                'rgw_zone': 'z',
+                            },
+                            'service_id': 'r.z',
+                            'service_name': 'rgw.r.z',
+                            'service_type': 'rgw',
+                            'status': {'created': mock.ANY, 'running': 1, 'size': 1},
+                        }
+                    ]
+                    for o in out:
+                        if 'events' in o:
+                            del o['events']  # delete it, as it contains a timestamp
+                    assert out == expected
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_device_ls(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
             assert wait(cephadm_module, c) == [InventoryHost('test')]
 
@@ -165,23 +169,83 @@ class TestCephadm(object):
             )
         ])
     ))
-    def test_daemon_action(self, cephadm_module):
+    def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
         cephadm_module.service_cache_timeout = 10
-        with self._with_host(cephadm_module, 'test'):
-            c = cephadm_module.list_daemons(refresh=True)
-            wait(cephadm_module, c)
-            c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar')
-            assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"]
+        with with_host(cephadm_module, 'test'):
+            cephadm_module._refresh_host_daemons('test')
+            c = cephadm_module.list_daemons()
+            assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+    def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
+        cephadm_module.service_cache_timeout = 10
+        with with_host(cephadm_module, 'test'):
+            with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+
+                c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
+                assert wait(cephadm_module, c) == f"Deployed rgw.{daemon_id} on host 'test'"
+
+                for what in ('start', 'stop', 'restart'):
+                    c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
+                    assert wait(cephadm_module, c) == what + f" rgw.{daemon_id} from host 'test'"
+
+                # Make sure, _check_daemons does a redeploy due to monmap change:
+                cephadm_module._store['_ceph_get/mon_map'] = {
+                    'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                    'fsid': 'foobar',
+                }
+                cephadm_module.notify('mon_map', None)
+
+                cephadm_module._check_daemons()
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+    def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
+        cephadm_module.service_cache_timeout = 10
+        with with_host(cephadm_module, 'test'):
+            with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+                with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
+
+                    _ceph_send_command.side_effect = Exception("myerror")
+
+                    # Make sure, _check_daemons does a redeploy due to monmap change:
+                    cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+                        'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                        'fsid': 'foobar',
+                    })
+                    cephadm_module.notify('mon_map', None)
+
+                    cephadm_module._check_daemons()
 
-            for what in ('start', 'stop', 'restart'):
-                c = cephadm_module.daemon_action(what, 'rgw', 'myrgw.foobar')
-                assert wait(cephadm_module, c) == [what + " rgw.myrgw.foobar from host 'test'"]
+                    evs = [e.message for e in cephadm_module.events.get_for_daemon(f'rgw.{daemon_id}')]
+
+                    assert 'myerror' in ''.join(evs)
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
+        with with_host(cephadm_module, 'test'):
+            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
+
+                # Make sure, _check_daemons does a redeploy due to monmap change:
+                cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+                    'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                    'fsid': 'foobar',
+                })
+                cephadm_module.notify('mon_map', None)
+                cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
+                    'modules': ['dashboard']
+                })
+
+                with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
+
+                    cephadm_module._check_daemons()
+                    _mon_cmd.assert_any_call({'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
 
-            assert_rm_daemon(cephadm_module, 'rgw.myrgw.foobar', 'test')
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_mon_add(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
             c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
             assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
@@ -193,7 +257,7 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
     def test_mgr_update(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
             r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
             assert r
@@ -254,7 +318,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
     def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
 
             spec = DriveGroupSpec(
                 service_id='foo',
@@ -266,7 +330,7 @@ class TestCephadm(object):
                 )
             )
 
-            c = cephadm_module.apply_drivegroups([spec])
+            c = cephadm_module.apply([spec])
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
 
             inventory = Devices([
@@ -285,24 +349,24 @@ class TestCephadm(object):
             _run_cephadm.assert_any_call(
                 'test', 'osd', 'ceph-volume',
                 ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
-                env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
             _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
 
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
     def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, 'service_id': 'foo', 'data_devices': {'all': True}}
             spec = ServiceSpec.from_json(json_spec)
             assert isinstance(spec, DriveGroupSpec)
-            c = cephadm_module.apply_drivegroups([spec])
+            c = cephadm_module.apply([spec])
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
             _save_spec.assert_called_with(spec)
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_create_osds(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
             c = cephadm_module.create_osds(dg)
             out = wait(cephadm_module, c)
@@ -310,7 +374,7 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_prepare_drivegroup(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
             out = cephadm_module.osd_service.prepare_drivegroup(dg)
             assert len(out) == 1
@@ -333,11 +397,11 @@ class TestCephadm(object):
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
             ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
             preview = preview
-            out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
+            out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
             assert out in exp_command
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
@@ -352,29 +416,38 @@ class TestCephadm(object):
             )
         ])
     ))
+    @mock.patch("cephadm.services.osd.OSD.exists", True)
     @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
     def test_remove_osds(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            c = cephadm_module.list_daemons(refresh=True)
+        with with_host(cephadm_module, 'test'):
+            cephadm_module._refresh_host_daemons('test')
+            c = cephadm_module.list_daemons()
             wait(cephadm_module, c)
 
             c = cephadm_module.remove_daemons(['osd.0'])
             out = wait(cephadm_module, c)
             assert out == ["Removed osd.0 from host 'test'"]
 
-            osd_removal_op = OSDRemoval(0, False, False, 'test', 'osd.0', datetime.datetime.utcnow(), -1)
-            cephadm_module.rm_util.queue_osds_for_removal({osd_removal_op})
-            cephadm_module.rm_util._remove_osds_bg()
-            assert cephadm_module.rm_util.to_remove_osds == set()
+            cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
+                                                      replace=False,
+                                                      force=False,
+                                                      hostname='test',
+                                                      fullname='osd.0',
+                                                      process_started_at=datetime.datetime.utcnow(),
+                                                      remove_util=cephadm_module.rm_util
+                                                      ))
+            cephadm_module.rm_util.process_removal_queue()
+            assert cephadm_module.to_remove_osds == OSDQueue()
 
             c = cephadm_module.remove_osds_status()
             out = wait(cephadm_module, c)
-            assert out == set()
+            assert out == []
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
     def test_rgw_update(self, cephadm_module):
-        with self._with_host(cephadm_module, 'host1'):
-            with self._with_host(cephadm_module, 'host2'):
+        with with_host(cephadm_module, 'host1'):
+            with with_host(cephadm_module, 'host2'):
                 ps = PlacementSpec(hosts=['host1'], count=1)
                 c = cephadm_module.add_rgw(RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
                 [out] = wait(cephadm_module, c)
@@ -400,8 +473,9 @@ class TestCephadm(object):
         ])
     ))
     def test_remove_daemon(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            c = cephadm_module.list_daemons(refresh=True)
+        with with_host(cephadm_module, 'test'):
+            cephadm_module._refresh_host_daemons('test')
+            c = cephadm_module.list_daemons()
             wait(cephadm_module, c)
             c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
             out = wait(cephadm_module, c)
@@ -421,20 +495,16 @@ class TestCephadm(object):
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
     def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            spec.placement = PlacementSpec(hosts=['test'], count=1)
-
-            c = meth(cephadm_module, spec)
-            [out] = wait(cephadm_module, c)
-            match_glob(out, f"Deployed {spec.service_name()}.* on host 'test'")
-
-            assert_rm_daemon(cephadm_module, spec.service_name(), 'test')
+        with with_host(cephadm_module, 'test'):
+            with with_daemon(cephadm_module, spec, meth, 'test'):
+                pass
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_nfs(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
             spec = NFSServiceSpec(
                     service_id='name',
@@ -455,7 +525,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_iscsi(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
             spec = IscsiServiceSpec(
                     service_id='name',
@@ -476,7 +546,7 @@ class TestCephadm(object):
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_blink_device_light(self, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
             assert wait(cephadm_module, c) == ['Set ident light for test: on']
 
@@ -526,22 +596,41 @@ class TestCephadm(object):
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
     def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
-        with self._with_host(cephadm_module, 'test'):
-            if not spec.placement:
-                spec.placement = PlacementSpec(hosts=['test'], count=1)
-            c = meth(cephadm_module, spec)
-            assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
-            assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] == [spec]
+        with with_host(cephadm_module, 'test'):
+            with with_service(cephadm_module, spec, meth, 'test'):
+                pass
 
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
+    def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
+        spec = ServiceSpec(
+            'mds',
+            service_id='fsname',
+            placement=PlacementSpec(hosts=['host1', 'host2'])
+        )
+        with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
+            c = cephadm_module.apply_mds(spec)
+            out = wait(cephadm_module, c)
+            match_glob(out, "Scheduled mds.fsname update...")
             cephadm_module._apply_all_services()
 
-            dds = wait(cephadm_module, cephadm_module.list_daemons())
-            for dd in dds:
-                assert dd.service_name() == spec.service_name()
+            [daemon] = cephadm_module.cache.daemons['host1'].keys()
+
+            spec.placement.set_hosts(['host2'])
 
+            ok_to_stop.side_effect = False
 
-            assert_rm_service(cephadm_module, spec.service_name())
+            c = cephadm_module.apply_mds(spec)
+            out = wait(cephadm_module, c)
+            match_glob(out, "Scheduled mds.fsname update...")
+            cephadm_module._apply_all_services()
+
+            ok_to_stop.assert_called_with([daemon[4:]])
+
+            assert_rm_daemon(cephadm_module, spec.service_name(), 'host1')  # verifies ok-to-stop
+            assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
 
 
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@@ -549,11 +638,11 @@ class TestCephadm(object):
     def test_offline(self, _check, _get_connection, cephadm_module):
         _check.return_value = '{}', '', 0
         _get_connection.return_value = mock.Mock(), mock.Mock()
-        with self._with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test'):
             _get_connection.side_effect = HostNotFound
             code, out, err = cephadm_module.check_host('test')
             assert out == ''
-            assert 'Failed to connect to test (test)' in err
+            assert "Host 'test' not found" in err
 
             out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
             assert out == HostSpec('test', 'test', status='Offline').to_json()
@@ -594,7 +683,7 @@ class TestCephadm(object):
             return '{}', None, 0
         with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
             with mock.patch("remoto.process.check", _check):
-                with self._with_host(cephadm_module, 'test'):
+                with with_host(cephadm_module, 'test'):
                     code, out, err = cephadm_module.check_host('test')
                     # First should succeed.
                     assert err is None
@@ -604,3 +693,91 @@ class TestCephadm(object):
                     # code will blow up here triggering the BOOM!
                     code, out, err = cephadm_module.check_host('test')
                     assert err is None
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
+    @mock.patch("remoto.process.check")
+    def test_etc_ceph(self, _check, _get_connection, cephadm_module):
+        _get_connection.return_value = mock.Mock(), mock.Mock()
+        _check.return_value = '{}', '', 0
+
+        assert cephadm_module.manage_etc_ceph_ceph_conf is False
+
+        with with_host(cephadm_module, 'test'):
+            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+        with with_host(cephadm_module, 'test'):
+            cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
+            cephadm_module.config_notify()
+            assert cephadm_module.manage_etc_ceph_ceph_conf == True
+
+            cephadm_module._refresh_hosts_and_daemons()
+            _check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')
+
+            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+            cephadm_module.cache.load()
+
+            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+            # Make sure, _check_daemons does a redeploy due to monmap change:
+            cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
+                'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+                'fsid': 'foobar',
+            })
+            cephadm_module.notify('mon_map', mock.MagicMock())
+            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
+            cephadm_module.cache.load()
+            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+
+
+    def test_etc_ceph_init(self):
+        with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
+            assert m.manage_etc_ceph_ceph_conf is True
+
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        def check_registry_credentials(url, username, password):
+            assert cephadm_module.get_module_option('registry_url') == url
+            assert cephadm_module.get_module_option('registry_username') == username
+            assert cephadm_module.get_module_option('registry_password') == password
+
+        _run_cephadm.return_value = '{}', '', 0
+        with with_host(cephadm_module, 'test'):
+            # test successful login with valid args
+            code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
+            assert out == 'registry login scheduled'
+            assert err == ''
+            check_registry_credentials('test-url', 'test-user', 'test-password')
+            
+            # test bad login attempt with invalid args
+            code, out, err = cephadm_module.registry_login('bad-args')
+            assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
+                            "or -i <login credentials json file>")
+            check_registry_credentials('test-url', 'test-user', 'test-password')
+            
+            # test bad login using invalid json file
+            code, out, err = cephadm_module.registry_login(None, None, None, '{"bad-json": "bad-json"}')
+            assert err == ("json provided for custom registry login did not include all necessary fields. "
+                            "Please setup json file as\n"
+                            "{\n"
+                              " \"url\": \"REGISTRY_URL\",\n"
+                              " \"username\": \"REGISTRY_USERNAME\",\n"
+                              " \"password\": \"REGISTRY_PASSWORD\"\n"
+                            "}\n")
+            check_registry_credentials('test-url', 'test-user', 'test-password')
+            
+            # test  good login using valid json file
+            good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
+                        " \"password\": \"" + "json-pass" + "\"}")
+            code, out, err = cephadm_module.registry_login(None, None, None, good_json)
+            assert out == 'registry login scheduled'
+            assert err == ''
+            check_registry_credentials('json-url', 'json-user', 'json-pass')
+            
+            # test bad login where args are valid but login command fails
+            _run_cephadm.return_value = '{}', 'error', 1
+            code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
+            assert err == 'Host test failed to login to fail-url as fail-user with given password'
+            check_registry_credentials('json-url', 'json-user', 'json-pass')