]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
import 15.2.4
[ceph.git] / ceph / src / pybind / mgr / cephadm / tests / test_cephadm.py
index f13680cab26441c0da6b8ef1c8ffae586d12f530..f64d8a0ce904d9ae074aa722f2f321c16df4ec6b 100644 (file)
@@ -5,7 +5,7 @@ from contextlib import contextmanager
 import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.osd import OSDRemoval
+from cephadm.services.osd import OSDRemoval
 
 try:
     from typing import Any, List
@@ -15,7 +15,7 @@ except ImportError:
 from execnet.gateway_bootstrap import HostNotFound
 
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
-    NFSServiceSpec, IscsiServiceSpec
+    NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec
 from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.inventory import Devices, Device
 from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
@@ -242,25 +242,52 @@ class TestCephadm(object):
         }
         json_out = json.dumps(dict_out)
         _mon_cmd.return_value = (0, json_out, '')
-        out = cephadm_module.find_destroyed_osds()
+        out = cephadm_module.osd_service.find_destroyed_osds()
         assert out == {'host1': ['0']}
 
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
     def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
         _mon_cmd.return_value = (1, "", "fail_msg")
         with pytest.raises(OrchestratorError):
-            out = cephadm_module.find_destroyed_osds()
+            out = cephadm_module.osd_service.find_destroyed_osds()
 
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    @mock.patch("cephadm.module.SpecStore.save")
-    def test_apply_osd_save(self, _save_spec, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+    def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+        _run_cephadm.return_value = ('{}', '', 0)
         with self._with_host(cephadm_module, 'test'):
-            json_spec = {'service_type': 'osd', 'host_pattern': 'test', 'service_id': 'foo', 'data_devices': {'all': True}}
-            spec = ServiceSpec.from_json(json_spec)
-            assert isinstance(spec, DriveGroupSpec)
+
+            spec = DriveGroupSpec(
+                service_id='foo',
+                placement=PlacementSpec(
+                    host_pattern='*',
+                ),
+                data_devices=DeviceSelection(
+                    all=True
+                )
+            )
+
             c = cephadm_module.apply_drivegroups([spec])
             assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
-            _save_spec.assert_called_with(spec)
+
+            inventory = Devices([
+                Device(
+                    '/dev/sdb',
+                    available=True
+                ),
+            ])
+
+            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+            _run_cephadm.return_value = (['{}'], '', 0)
+
+            assert cephadm_module._apply_all_services() == False
+
+            _run_cephadm.assert_any_call(
+                'test', 'osd', 'ceph-volume',
+                ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
+                env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+            _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     @mock.patch("cephadm.module.SpecStore.save")
@@ -285,7 +312,7 @@ class TestCephadm(object):
     def test_prepare_drivegroup(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
-            out = cephadm_module.prepare_drivegroup(dg)
+            out = cephadm_module.osd_service.prepare_drivegroup(dg)
             assert len(out) == 1
             f1 = out[0]
             assert f1[0] == 'test'
@@ -297,38 +324,22 @@ class TestCephadm(object):
             # no preview and only one disk, prepare is used due the hack that is in place.
             (['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
             # no preview and multiple disks, uses batch
-            (['/dev/sda', '/dev/sdb'], False, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+            (['/dev/sda', '/dev/sdb'], False, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
             # preview and only one disk needs to use batch again to generate the preview
             (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
             # preview and multiple disks work the same
-            (['/dev/sda', '/dev/sdb'], True, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+            (['/dev/sda', '/dev/sdb'], True, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
     def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
         with self._with_host(cephadm_module, 'test'):
-            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+            dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
             ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
             preview = preview
-            out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
+            out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
             assert out in exp_command
 
-    @mock.patch("cephadm.module.SpecStore.find")
-    @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
-    @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
-    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
-        with self._with_host(cephadm_module, 'test'):
-            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
-            _find_store.return_value = [dg]
-            _prepare_dg.return_value = [('host1', 'ds_dummy')]
-            _run_c_v_command.return_value = ("{}", '', 0)
-            cephadm_module.preview_drivegroups(drive_group_name='foo')
-            _find_store.assert_called_once_with(service_name='foo')
-            _prepare_dg.assert_called_once_with(dg)
-            _run_c_v_command.assert_called_once()
-
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
         json.dumps([
             dict(
@@ -341,7 +352,7 @@ class TestCephadm(object):
             )
         ])
     ))
-    @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+    @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
     def test_remove_osds(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
@@ -425,7 +436,11 @@ class TestCephadm(object):
     def test_nfs(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            spec = NFSServiceSpec('name', pool='pool', namespace='namespace', placement=ps)
+            spec = NFSServiceSpec(
+                    service_id='name',
+                    pool='pool',
+                    namespace='namespace',
+                    placement=ps)
             c = cephadm_module.add_nfs(spec)
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed nfs.name.* on host 'test'")
@@ -438,10 +453,16 @@ class TestCephadm(object):
             assert_rm_service(cephadm_module, 'nfs.name')
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+    @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
     def test_iscsi(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            spec = IscsiServiceSpec('name', pool='pool', placement=ps)
+            spec = IscsiServiceSpec(
+                    service_id='name',
+                    pool='pool',
+                    api_user='user',
+                    api_password='password',
+                    placement=ps)
             c = cephadm_module.add_iscsi(spec)
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed iscsi.name.* on host 'test'")
@@ -470,19 +491,56 @@ class TestCephadm(object):
             (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
             (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
             (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
+            (ServiceSpec(
+                'mds', service_id='fsname',
+                placement=PlacementSpec(
+                    hosts=[HostPlacementSpec(
+                        hostname='test',
+                        name='fsname',
+                        network=''
+                    )]
+                )
+            ), CephadmOrchestrator.apply_mds),
             (RGWSpec(rgw_realm='realm', rgw_zone='zone'), CephadmOrchestrator.apply_rgw),
-            (NFSServiceSpec('name', pool='pool', namespace='namespace'), CephadmOrchestrator.apply_nfs),
-            (IscsiServiceSpec('name', pool='pool'), CephadmOrchestrator.apply_iscsi),
+            (RGWSpec(
+                rgw_realm='realm', rgw_zone='zone',
+                placement=PlacementSpec(
+                    hosts=[HostPlacementSpec(
+                        hostname='test',
+                        name='realm.zone.a',
+                        network=''
+                    )]
+                )
+            ), CephadmOrchestrator.apply_rgw),
+            (NFSServiceSpec(
+                service_id='name',
+                pool='pool',
+                namespace='namespace'
+            ), CephadmOrchestrator.apply_nfs),
+            (IscsiServiceSpec(
+                service_id='name',
+                pool='pool',
+                api_user='user',
+                api_password='password'
+            ), CephadmOrchestrator.apply_iscsi),
         ]
     )
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-    def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module):
+    def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
         with self._with_host(cephadm_module, 'test'):
-            spec.placement = PlacementSpec(hosts=['test'], count=1)
+            if not spec.placement:
+                spec.placement = PlacementSpec(hosts=['test'], count=1)
             c = meth(cephadm_module, spec)
             assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
             assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] == [spec]
 
+            cephadm_module._apply_all_services()
+
+            dds = wait(cephadm_module, cephadm_module.list_daemons())
+            for dd in dds:
+                assert dd.service_name() == spec.service_name()
+
+
             assert_rm_service(cephadm_module, spec.service_name())
 
 
@@ -504,3 +562,45 @@ class TestCephadm(object):
             assert cephadm_module._check_host('test') is None
             out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
             assert out == HostSpec('test', 'test').to_json()
+
+    def test_stale_connections(self, cephadm_module):
+        class Connection(object):
+            """
+            A mocked connection class that only allows the use of the connection
+            once. If you attempt to use it again via a _check, it'll explode (go
+            boom!).
+
+            The old code triggers the boom. The new code checks the has_connection
+            and will recreate the connection.
+            """
+            fuse = False
+
+            @staticmethod
+            def has_connection():
+                return False
+
+            def import_module(self, *args, **kargs):
+                return mock.Mock()
+
+            @staticmethod
+            def exit():
+                pass
+
+        def _check(conn, *args, **kargs):
+            if conn.fuse:
+                raise Exception("boom: connection is dead")
+            else:
+                conn.fuse = True
+            return '{}', None, 0
+        with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
+            with mock.patch("remoto.process.check", _check):
+                with self._with_host(cephadm_module, 'test'):
+                    code, out, err = cephadm_module.check_host('test')
+                    # First should succeed.
+                    assert err is None
+
+                    # On second it should attempt to reuse the connection, where the
+                    # connection is "down" so will recreate the connection. The old
+                    # code will blow up here triggering the BOOM!
+                    code, out, err = cephadm_module.check_host('test')
+                    assert err is None