update source to Ceph Pacific 16.2.2

diff --git a/ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py b/ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 82f5812ab1aee3f484e9ef7945355525aecbe53e..8c1949e74db69aa3688d8b2a26d9d8e108d87cf5 100644
--- a/ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -7,10 +7,9 @@ import pytest
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
 from cephadm.serve import CephadmServe
 from cephadm.services.osd import OSD, OSDRemovalQueue
-from cephadm.utils import CephadmNoImage
 
 try:
-    from typing import Any, List
+    from typing import List
 except ImportError:
     pass
 
@@ -21,11 +20,11 @@ from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
 from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.inventory import Devices, Device
 from ceph.utils import datetime_to_str, datetime_now
-from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
+from orchestrator import DaemonDescription, InventoryHost, \
     HostSpec, OrchestratorError
 from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
-    with_cephadm_module, with_service, assert_rm_service
+from .fixtures import wait, _run_cephadm, match_glob, with_host, \
+    with_cephadm_module, with_service, _deploy_cephadm_binary
 from cephadm.module import CephadmOrchestrator
 
 """
@@ -40,16 +39,23 @@ def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
     dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
     d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
     assert d_names
+    # there should only be one matching daemon (otherwise match_glob below would report a mismatch)
+    assert len(d_names) == 1
+
     c = cephadm.remove_daemons(d_names)
     [out] = wait(cephadm, c)
-    match_glob(out, f"Removed {d_names}* from host '{host}'")
+    # pass the first element rather than the whole list: interpolating the list adds
+    # brackets, which glob matching reads as a character class, so any '-' in the daemon
+    # name becomes a range. cephadm-exporter yields an invalid m-e range, while rbd-mirror
+    # (d-m) and node-exporter (e-e) happen to be valid ranges and pass. match_glob acts on strings anyway.
+    match_glob(out, f"Removed {d_names[0]}* from host '{host}'")
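
The glob-range caveat in assert_rm_daemon can be illustrated with the stdlib fnmatch module. A standalone sketch, not part of the patch; the daemon name here is made up:

    import fnmatch

    d_names = ['rbd-mirror.test']
    # Interpolating the list embeds its repr, brackets included, and glob matching
    # treats "[...]" as a character class in which '-' denotes a range:
    print(f"Removed {d_names}* from host 'test'")
    # -> Removed ['rbd-mirror.test']* from host 'test'
    assert fnmatch.fnmatch('f', '[d-m]')          # 'f' falls inside the d..m range
    assert not fnmatch.fnmatch('[d-m]', '[d-m]')  # the literal bracketed text does not match

    # Interpolating the single element keeps the pattern a plain glob:
    assert fnmatch.fnmatch("Removed rbd-mirror.test.a from host 'test'",
                           f"Removed {d_names[0]}* from host 'test'")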
 
 
 @contextmanager
-def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, meth, host: str):
+def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str):
     spec.placement = PlacementSpec(hosts=[host], count=1)
 
-    c = meth(cephadm_module, spec)
+    c = cephadm_module.add_daemon(spec)
     [out] = wait(cephadm_module, c)
     match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")
 
@@ -75,7 +81,7 @@ class TestCephadm(object):
         new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
         match_glob(new_mgr, 'myhost.*')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_host(self, cephadm_module):
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
         with with_host(cephadm_module, 'test'):
@@ -94,14 +100,13 @@ class TestCephadm(object):
             assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
         assert wait(cephadm_module, cephadm_module.get_hosts()) == []
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_service_ls(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             assert wait(cephadm_module, c) == []
-
-            with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test'):
+            with with_service(cephadm_module, ServiceSpec('mds', 'name', unmanaged=True)) as _, \
+                    with_daemon(cephadm_module, ServiceSpec('mds', 'name'), 'test') as _:
 
                 c = cephadm_module.list_daemons()
 
@@ -117,7 +122,9 @@ class TestCephadm(object):
                         'hostname': 'test',
                         'status': 1,
                         'status_desc': 'starting',
-                        'is_active': False}
+                        'is_active': False,
+                        'ports': [],
+                    }
                 ]
 
                 with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), CephadmOrchestrator.apply_rgw, 'test'):
@@ -126,11 +133,11 @@ class TestCephadm(object):
                     out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
                     expected = [
                         {
-                            'placement': {'hosts': ['test']},
+                            'placement': {'count': 2},
                             'service_id': 'name',
                             'service_name': 'mds.name',
                             'service_type': 'mds',
-                            'status': {'running': 1, 'size': 0},
+                            'status': {'created': mock.ANY, 'running': 1, 'size': 2},
                             'unmanaged': True
                         },
                         {
@@ -138,14 +145,11 @@ class TestCephadm(object):
                                 'count': 1,
                                 'hosts': ["test"]
                             },
-                            'spec': {
-                                'rgw_realm': 'r',
-                                'rgw_zone': 'z',
-                            },
                             'service_id': 'r.z',
                             'service_name': 'rgw.r.z',
                             'service_type': 'rgw',
-                            'status': {'created': mock.ANY, 'running': 1, 'size': 1},
+                            'status': {'created': mock.ANY, 'running': 1, 'size': 1,
+                                       'ports': [80]},
                         }
                     ]
                     for o in out:
@@ -153,13 +157,13 @@ class TestCephadm(object):
                             del o['events']  # delete it, as it contains a timestamp
                     assert out == expected
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_device_ls(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             c = cephadm_module.get_inventory()
             assert wait(cephadm_module, c) == [InventoryHost('test')]
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='rgw.myrgw.foobar',
@@ -178,12 +182,12 @@ class TestCephadm(object):
             c = cephadm_module.list_daemons()
             assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
         cephadm_module.service_cache_timeout = 10
         with with_host(cephadm_module, 'test'):
-            with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
 
                 c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
                 assert wait(cephadm_module,
@@ -203,12 +207,12 @@ class TestCephadm(object):
 
                 CephadmServe(cephadm_module)._check_daemons()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
         cephadm_module.service_cache_timeout = 10
         with with_host(cephadm_module, 'test'):
-            with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
                 with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
 
                     _ceph_send_command.side_effect = Exception("myerror")
@@ -237,7 +241,7 @@ class TestCephadm(object):
             'redeploy'
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
@@ -252,7 +256,7 @@ class TestCephadm(object):
 
                 assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
 
@@ -283,12 +287,18 @@ class TestCephadm(object):
 
                 CephadmServe(cephadm_module)._check_daemons()
 
-                _run_cephadm.assert_called_with('test', 'mon.test', 'deploy', [
-                                                '--name', 'mon.test', '--reconfig', '--config-json', '-'],
-                                                stdin='{"config": "\\n\\n[mon]\\nk=v\\n", "keyring": ""}',
-                                                image='')
+                _run_cephadm.assert_called_with(
+                    'test', 'mon.test', 'deploy', [
+                        '--name', 'mon.test',
+                        '--meta-json', '{"service_name": "mon", "ports": [], "ip": null, "deployed_by": []}',
+                        '--config-json', '-',
+                        '--reconfig',
+                    ],
+                    stdin='{"config": "\\n\\n[mon]\\nk=v\\n[mon.test]\\npublic network = 127.0.0.0/8\\n", '
+                    + '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}',
+                    image='')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):
@@ -309,19 +319,20 @@ class TestCephadm(object):
                         {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
                         None)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_mon_add(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
-            ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
-            assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+            with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
+                ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
+                c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+                assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
 
-            with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
-                ps = PlacementSpec(hosts=['test'], count=1)
-                c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
-                wait(cephadm_module, c)
+                with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
+                    ps = PlacementSpec(hosts=['test'], count=1)
+                    c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
+                    wait(cephadm_module, c)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
     def test_mgr_update(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
@@ -375,13 +386,105 @@ class TestCephadm(object):
         out = cephadm_module.osd_service.find_destroyed_osds()
         assert out == {'host1': ['0']}
 
+    @ pytest.mark.parametrize(
+        "ceph_services, cephadm_daemons, strays_expected, metadata",
+        # [ ([(daemon_type, daemon_id), ... ], [...], [...], {...}), ... ]
+        [
+            (
+                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+                [],
+                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+                {},
+            ),
+            (
+                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+                [],
+                {},
+            ),
+            (
+                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+                [('mds', 'a'), ('osd', '0')],
+                [('mgr', 'x')],
+                {},
+            ),
+            # https://tracker.ceph.com/issues/49573
+            (
+                [('rgw-nfs', '14649')],
+                [],
+                [('nfs', 'foo-rgw.host1')],
+                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}},
+            ),
+            (
+                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+                [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')],
+                [],
+                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+            ),
+            (
+                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
+                [('nfs', 'foo-rgw.host1')],
+                [('nfs', 'foo2.host2')],
+                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
+            ),
+        ]
+    )
+    def test_check_for_stray_daemons(
+            self,
+            cephadm_module,
+            ceph_services,
+            cephadm_daemons,
+            strays_expected,
+            metadata
+    ):
+        # mock ceph service-map
+        services = []
+        for service in ceph_services:
+            s = {'type': service[0], 'id': service[1]}
+            services.append(s)
+        ls = [{'hostname': 'host1', 'services': services}]
+
+        with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
+            list_servers.return_value = ls
+            list_servers.__iter__.side_effect = ls.__iter__
+
+            # populate cephadm daemon cache
+            dm = {}
+            for daemon_type, daemon_id in cephadm_daemons:
+                dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
+                dm[dd.name()] = dd
+            cephadm_module.cache.update_host_daemons('host1', dm)
+
+            def get_metadata_mock(svc_type, svc_id, default):
+                return metadata[svc_id]
+
+            with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock):
+
+                # test
+                CephadmServe(cephadm_module)._check_for_strays()
+
+                # verify
+                strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
+                if not strays:
+                    assert len(strays_expected) == 0
+                else:
+                    for dt, di in strays_expected:
+                        name = '%s.%s' % (dt, di)
+                        for detail in strays['detail']:
+                            if name in detail:
+                                strays['detail'].remove(detail)
+                                break
+                        assert name in detail
+                    assert len(strays['detail']) == 0
+                    assert strays['count'] == len(strays_expected)
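
The rgw-nfs cases above (tracker issue 49573) encode a mapping from the ceph service-map metadata 'id' back to the cephadm nfs daemon name. A standalone sketch of that mapping as the test data implies it, assuming the stray check simply drops the trailing '-rgw' marker; the real logic lives in CephadmServe._check_for_strays():

    def nfs_daemon_name_from_metadata_id(metadata_id: str) -> str:
        # e.g. 'nfs.foo-rgw.host1-rgw' -> 'nfs.foo-rgw.host1'
        return metadata_id[:-len('-rgw')] if metadata_id.endswith('-rgw') else metadata_id

    assert nfs_daemon_name_from_metadata_id('nfs.foo-rgw.host1-rgw') == 'nfs.foo-rgw.host1'
    assert nfs_daemon_name_from_metadata_id('nfs.foo2.host2-rgw') == 'nfs.foo2.host2'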
+
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
     def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
         _mon_cmd.return_value = (1, "", "fail_msg")
         with pytest.raises(OrchestratorError):
-            out = cephadm_module.osd_service.find_destroyed_osds()
+            cephadm_module.osd_service.find_destroyed_osds()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
         with with_host(cephadm_module, 'test'):
@@ -410,7 +513,7 @@ class TestCephadm(object):
 
             _run_cephadm.return_value = (['{}'], '', 0)
 
-            assert CephadmServe(cephadm_module)._apply_all_services() == False
+            assert CephadmServe(cephadm_module)._apply_all_services() is False
 
             _run_cephadm.assert_any_call(
                 'test', 'osd', 'ceph-volume',
@@ -418,9 +521,49 @@ class TestCephadm(object):
                     '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
                 env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
             _run_cephadm.assert_called_with(
-                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'test'):
+
+            spec = DriveGroupSpec(
+                service_id='noncollocated',
+                placement=PlacementSpec(
+                    hosts=['test']
+                ),
+                data_devices=DeviceSelection(paths=['/dev/sdb']),
+                db_devices=DeviceSelection(paths=['/dev/sdc']),
+                wal_devices=DeviceSelection(paths=['/dev/sdd'])
+            )
+
+            c = cephadm_module.apply([spec])
+            assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']
+
+            inventory = Devices([
+                Device('/dev/sdb', available=True),
+                Device('/dev/sdc', available=True),
+                Device('/dev/sdd', available=True)
+            ])
+
+            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+            _run_cephadm.return_value = (['{}'], '', 0)
+
+            assert CephadmServe(cephadm_module)._apply_all_services() is False
+
+            _run_cephadm.assert_any_call(
+                'test', 'osd', 'ceph-volume',
+                ['--config-json', '-', '--', 'lvm', 'batch',
+                    '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
+                    '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
+                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
+                error_ok=True, stdin='{"config": "", "keyring": ""}')
+            _run_cephadm.assert_called_with(
+                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
     def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -432,7 +575,7 @@ class TestCephadm(object):
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
             _save_spec.assert_called_with(spec)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_create_osds(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
@@ -441,7 +584,16 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == "Created no osd(s) on host test; already created?"
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_create_noncollocated_osd(self, cephadm_module):
+        with with_host(cephadm_module, 'test'):
+            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+                                data_devices=DeviceSelection(paths=['']))
+            c = cephadm_module.create_osds(dg)
+            out = wait(cephadm_module, c)
+            assert out == "Created no osd(s) on host test; already created?"
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_prepare_drivegroup(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
@@ -467,7 +619,7 @@ class TestCephadm(object):
              "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
         with with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
@@ -477,7 +629,7 @@ class TestCephadm(object):
             out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
             assert out in exp_command
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='osd.0',
@@ -505,7 +657,6 @@ class TestCephadm(object):
                                                       replace=False,
                                                       force=False,
                                                       hostname='test',
-                                                      fullname='osd.0',
                                                       process_started_at=datetime_now(),
                                                       remove_util=cephadm_module.to_remove_osds.rm_util
                                                       ))
@@ -516,26 +667,26 @@ class TestCephadm(object):
             out = wait(cephadm_module, c)
             assert out == []
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_rgw_update(self, cephadm_module):
         with with_host(cephadm_module, 'host1'):
             with with_host(cephadm_module, 'host2'):
-                ps = PlacementSpec(hosts=['host1'], count=1)
-                c = cephadm_module.add_rgw(
-                    RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
-                [out] = wait(cephadm_module, c)
-                match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
-
-                ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                r = CephadmServe(cephadm_module)._apply_service(
-                    RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
-                assert r
-
-                assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
-                assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
-
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+                with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)):
+                    ps = PlacementSpec(hosts=['host1'], count=1)
+                    c = cephadm_module.add_daemon(
+                        RGWSpec(service_id="foo", placement=ps))
+                    [out] = wait(cephadm_module, c)
+                    match_glob(out, "Deployed rgw.foo.* on host 'host1'")
+
+                    ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
+                    r = CephadmServe(cephadm_module)._apply_service(
+                        RGWSpec(service_id="foo", placement=ps))
+                    assert r
+
+                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1')
+                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2')
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
                 name='rgw.myrgw.myhost.myid',
@@ -557,26 +708,50 @@ class TestCephadm(object):
             assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
 
     @pytest.mark.parametrize(
-        "spec, meth",
+        "spec",
         [
-            (ServiceSpec('crash'), CephadmOrchestrator.add_crash),
-            (ServiceSpec('prometheus'), CephadmOrchestrator.add_prometheus),
-            (ServiceSpec('grafana'), CephadmOrchestrator.add_grafana),
-            (ServiceSpec('node-exporter'), CephadmOrchestrator.add_node_exporter),
-            (ServiceSpec('alertmanager'), CephadmOrchestrator.add_alertmanager),
-            (ServiceSpec('rbd-mirror'), CephadmOrchestrator.add_rbd_mirror),
-            (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.add_mds),
-            (RGWSpec(rgw_realm='realm', rgw_zone='zone'), CephadmOrchestrator.add_rgw),
+            ServiceSpec('crash'),
+            ServiceSpec('prometheus'),
+            ServiceSpec('grafana'),
+            ServiceSpec('node-exporter'),
+            ServiceSpec('alertmanager'),
+            ServiceSpec('rbd-mirror'),
+            ServiceSpec('cephfs-mirror'),
+            ServiceSpec('mds', service_id='fsname'),
+            RGWSpec(rgw_realm='realm', rgw_zone='zone'),
+            RGWSpec(service_id="foo"),
+            ServiceSpec('cephadm-exporter'),
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
-    def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
+    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
+        unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+        unmanaged_spec.unmanaged = True
         with with_host(cephadm_module, 'test'):
-            with with_daemon(cephadm_module, spec, meth, 'test'):
-                pass
+            with with_service(cephadm_module, unmanaged_spec):
+                with with_daemon(cephadm_module, spec, 'test'):
+                    pass
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_daemon_add_fail(self, _run_cephadm, cephadm_module):
+        _run_cephadm.return_value = '{}', '', 0
+        with with_host(cephadm_module, 'test'):
+            spec = ServiceSpec(
+                service_type='mgr',
+                placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+                unmanaged=True
+            )
+            with with_service(cephadm_module, spec):
+                _run_cephadm.side_effect = OrchestratorError('fail')
+                with pytest.raises(OrchestratorError):
+                    wait(cephadm_module, cephadm_module.add_daemon(spec))
+                cephadm_module.assert_issued_mon_command({
+                    'prefix': 'auth rm',
+                    'entity': 'mgr.x',
+                })
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_nfs(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -586,18 +761,16 @@ class TestCephadm(object):
                 pool='pool',
                 namespace='namespace',
                 placement=ps)
-            c = cephadm_module.add_nfs(spec)
-            [out] = wait(cephadm_module, c)
-            match_glob(out, "Deployed nfs.name.* on host 'test'")
-
-            assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
+            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+            unmanaged_spec.unmanaged = True
+            with with_service(cephadm_module, unmanaged_spec):
+                c = cephadm_module.add_daemon(spec)
+                [out] = wait(cephadm_module, c)
+                match_glob(out, "Deployed nfs.name.* on host 'test'")
 
-            # Hack. We never created the service, but we now need to remove it.
-            # this is in contrast to the other services, which don't create this service
-            # automatically.
-            assert_rm_service(cephadm_module, 'nfs.name')
+                assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_iscsi(self, cephadm_module):
         with with_host(cephadm_module, 'test'):
@@ -608,16 +781,15 @@ class TestCephadm(object):
                 api_user='user',
                 api_password='password',
                 placement=ps)
-            c = cephadm_module.add_iscsi(spec)
-            [out] = wait(cephadm_module, c)
-            match_glob(out, "Deployed iscsi.name.* on host 'test'")
+            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+            unmanaged_spec.unmanaged = True
+            with with_service(cephadm_module, unmanaged_spec):
 
-            assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
+                c = cephadm_module.add_daemon(spec)
+                [out] = wait(cephadm_module, c)
+                match_glob(out, "Deployed iscsi.name.* on host 'test'")
 
-            # Hack. We never created the service, but we now need to remove it.
-            # this is in contrast to the other services, which don't create this service
-            # automatically.
-            assert_rm_service(cephadm_module, 'iscsi.name')
+                assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
 
     @pytest.mark.parametrize(
         "on_bool",
@@ -633,7 +805,7 @@ class TestCephadm(object):
             'ident'
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'test'):
@@ -643,7 +815,7 @@ class TestCephadm(object):
             _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                                             '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'test'):
@@ -653,7 +825,7 @@ class TestCephadm(object):
             _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                                             '--', 'echo', 'hello'], error_ok=True)
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
         _run_cephadm.return_value = '{}', '', 0
         with with_host(cephadm_module, 'mgr0'):
@@ -677,6 +849,7 @@ class TestCephadm(object):
             (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter),
             (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
             (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
+            (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror),
             (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
             (ServiceSpec(
                 'mds', service_id='fsname',
@@ -688,13 +861,14 @@ class TestCephadm(object):
                     )]
                 )
             ), CephadmOrchestrator.apply_mds),
-            (RGWSpec(rgw_realm='realm', rgw_zone='zone'), CephadmOrchestrator.apply_rgw),
+            (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw),
             (RGWSpec(
+                service_id='bar',
                 rgw_realm='realm', rgw_zone='zone',
                 placement=PlacementSpec(
                     hosts=[HostPlacementSpec(
                         hostname='test',
-                        name='realm.zone.a',
+                        name='bar',
                         network=''
                     )]
                 )
@@ -732,16 +906,36 @@ class TestCephadm(object):
                 envs=['SECRET=password'],
                 ports=[8080, 8443]
             ), CephadmOrchestrator.apply_container),
+            (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter),
         ]
     )
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
+    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
         with with_host(cephadm_module, 'test'):
             with with_service(cephadm_module, spec, meth, 'test'):
                 pass
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
+        spec = ServiceSpec('mds', service_id='fsname')
+        with with_host(cephadm_module, 'test'):
+            with with_service(cephadm_module, spec, host='test'):
+                ret, out, err = cephadm_module.check_mon_command({
+                    'prefix': 'config get',
+                    'who': spec.service_name(),
+                    'key': 'mds_join_fs',
+                })
+                assert out == 'fsname'
+            ret, out, err = cephadm_module.check_mon_command({
+                'prefix': 'config get',
+                'who': spec.service_name(),
+                'key': 'mds_join_fs',
+            })
+            assert not out
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
     def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
         spec = ServiceSpec(
@@ -766,7 +960,7 @@ class TestCephadm(object):
             match_glob(out, "Scheduled mds.fsname update...")
             CephadmServe(cephadm_module)._apply_all_services()
 
-            ok_to_stop.assert_called_with([daemon[4:]])
+            ok_to_stop.assert_called_with([daemon[4:]], force=True)
 
             assert_rm_daemon(cephadm_module, spec.service_name(), 'host1')  # verifies ok-to-stop
             assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
@@ -802,14 +996,14 @@ class TestCephadm(object):
             """
             fuse = False
 
-            @staticmethod
+            @ staticmethod
             def has_connection():
                 return False
 
             def import_module(self, *args, **kargs):
                 return mock.Mock()
 
-            @staticmethod
+            @ staticmethod
             def exit():
                 pass
 
@@ -824,13 +1018,13 @@ class TestCephadm(object):
                 with with_host(cephadm_module, 'test', refresh_hosts=False):
                     code, out, err = cephadm_module.check_host('test')
                     # First should succeed.
-                    assert err is ''
+                    assert err == ''
 
                     # On second it should attempt to reuse the connection, where the
                     # connection is "down" so will recreate the connection. The old
                     # code will blow up here triggering the BOOM!
                     code, out, err = cephadm_module.check_host('test')
-                    assert err is ''
+                    assert err == ''
 
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("remoto.process.check")
@@ -846,7 +1040,7 @@ class TestCephadm(object):
         with with_host(cephadm_module, 'test'):
             cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
             cephadm_module.config_notify()
-            assert cephadm_module.manage_etc_ceph_ceph_conf == True
+            assert cephadm_module.manage_etc_ceph_ceph_conf is True
 
             CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
             _check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')
@@ -880,7 +1074,7 @@ class TestCephadm(object):
         with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
             assert m.manage_etc_ceph_ceph_conf is True
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         def check_registry_credentials(url, username, password):
             assert cephadm_module.get_module_option('registry_url') == url
@@ -927,9 +1121,9 @@ class TestCephadm(object):
             assert err == 'Host test failed to login to fail-url as fail-user with given password'
             check_registry_credentials('json-url', 'json-user', 'json-pass')
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
         'image_id': 'image_id',
-                    'repo_digest': 'image@repo_digest',
+                    'repo_digests': ['image@repo_digest'],
     })))
     @pytest.mark.parametrize("use_repo_digest",
                              [
@@ -937,10 +1131,12 @@ class TestCephadm(object):
                                  True
                              ])
     def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
+        cephadm_module.use_repo_digest = use_repo_digest
+
         with with_host(cephadm_module, 'test', refresh_hosts=False):
             cephadm_module.set_container_image('global', 'image')
+
             if use_repo_digest:
-                cephadm_module.use_repo_digest = True
 
                 CephadmServe(cephadm_module).convert_tags_to_repo_digest()
 
@@ -954,7 +1150,7 @@ class TestCephadm(object):
             else:
                 assert image == 'image'
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
     def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
         _run_cephadm.return_value = ('{}', '', 0)
 
@@ -981,3 +1177,29 @@ Traceback (most recent call last):
                           ['--', 'inventory', '--format=json'], image='',
                           no_fsid=False),
             ]
+
+    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
+    def test_osd_activate(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
+        with with_host(cephadm_module, 'test', refresh_hosts=False):
+            cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
+                'osds': [
+                    {
+                        'osd': 1,
+                        'up_from': 0,
+                        'uuid': 'uuid'
+                    }
+                ]
+            })
+
+            ceph_volume_lvm_list = {
+                '1': [{
+                    'tags': {
+                        'ceph.cluster_fsid': cephadm_module._cluster_fsid,
+                        'ceph.osd_fsid': 'uuid'
+                    }
+                }]
+            }
+            _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
+            assert cephadm_module._osd_activate(
+                ['test']).stdout == "Created osd(s) 1 on host 'test'"
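
The fixture data above suggests how OSD activation decides that osd.1 already belongs to this host: an LVM volume counts when its ceph.cluster_fsid tag matches the cluster and its ceph.osd_fsid tag matches the uuid recorded for that id in the osd map. A standalone sketch under that assumption, not the real _osd_activate implementation:

    def match_lvm_osds(osd_map, lvm_list, cluster_fsid):
        # map osd uuid -> osd id from the osd map
        uuid_to_id = {o['uuid']: o['osd'] for o in osd_map['osds']}
        matched = []
        for osd_id, lvs in lvm_list.items():
            for lv in lvs:
                tags = lv.get('tags', {})
                if (tags.get('ceph.cluster_fsid') == cluster_fsid
                        and uuid_to_id.get(tags.get('ceph.osd_fsid')) == int(osd_id)):
                    matched.append(int(osd_id))
                    break
        return matched

    osd_map = {'osds': [{'osd': 1, 'up_from': 0, 'uuid': 'uuid'}]}
    lvm_list = {'1': [{'tags': {'ceph.cluster_fsid': 'fsid', 'ceph.osd_fsid': 'uuid'}}]}
    assert match_lvm_osds(osd_map, lvm_list, 'fsid') == [1]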