import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.osd import OSDRemoval
+from cephadm.services.osd import OSDRemoval
try:
from typing import Any, List
from execnet.gateway_bootstrap import HostNotFound
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
- NFSServiceSpec, IscsiServiceSpec
+ NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
}
json_out = json.dumps(dict_out)
_mon_cmd.return_value = (0, json_out, '')
- out = cephadm_module.find_destroyed_osds()
+ out = cephadm_module.osd_service.find_destroyed_osds()
assert out == {'host1': ['0']}
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
_mon_cmd.return_value = (1, "", "fail_msg")
with pytest.raises(OrchestratorError):
- out = cephadm_module.find_destroyed_osds()
+ out = cephadm_module.osd_service.find_destroyed_osds()
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.module.SpecStore.save")
- def test_apply_osd_save(self, _save_spec, cephadm_module):
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
with self._with_host(cephadm_module, 'test'):
- json_spec = {'service_type': 'osd', 'host_pattern': 'test', 'service_id': 'foo', 'data_devices': {'all': True}}
- spec = ServiceSpec.from_json(json_spec)
- assert isinstance(spec, DriveGroupSpec)
+
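+ # OSD spec under test: claim every available data device on any host matching '*'.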
+ spec = DriveGroupSpec(
+ service_id='foo',
+ placement=PlacementSpec(
+ host_pattern='*',
+ ),
+ data_devices=DeviceSelection(
+ all=True
+ )
+ )
+
c = cephadm_module.apply_drivegroups([spec])
assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
- _save_spec.assert_called_with(spec)
+
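+ # Seed the device cache with a single available disk so the drive
+ # group above has something to match on host 'test'.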
+ inventory = Devices([
+ Device(
+ '/dev/sdb',
+ available=True
+ ),
+ ])
+
+ cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
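+ # Later cephadm calls return their output as a list of lines.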
+ _run_cephadm.return_value = (['{}'], '', 0)
+
+ assert cephadm_module._apply_all_services() == False
+
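+ # Applying the spec should drive two cephadm calls: 'ceph-volume lvm
+ # prepare' for the matched disk, followed by a final 'ceph-volume lvm list'.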
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
+ env_vars=[], error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_prepare_drivegroup(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
- out = cephadm_module.prepare_drivegroup(dg)
+ out = cephadm_module.osd_service.prepare_drivegroup(dg)
assert len(out) == 1
f1 = out[0]
assert f1[0] == 'test'
# no preview and only one disk, prepare is used due to the hack that is in place.
(['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
# no preview and multiple disks, uses batch
- (['/dev/sda', '/dev/sdb'], False, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+ (['/dev/sda', '/dev/sdb'], False, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
# preview and only one disk needs to use batch again to generate the preview
(['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
# preview and multiple disks work the same
- (['/dev/sda', '/dev/sdb'], True, "lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+ (['/dev/sda', '/dev/sdb'], True, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
with self._with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
- out = cephadm_module.driveselection_to_ceph_volume(dg, ds, [], preview)
+ out = cephadm_module.osd_service.driveselection_to_ceph_volume(dg, ds, [], preview)
assert out in exp_command
- @mock.patch("cephadm.module.SpecStore.find")
- @mock.patch("cephadm.module.CephadmOrchestrator.prepare_drivegroup")
- @mock.patch("cephadm.module.CephadmOrchestrator.driveselection_to_ceph_volume")
- @mock.patch("cephadm.module.CephadmOrchestrator._run_ceph_volume_command")
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- def test_preview_drivegroups_str(self, _run_c_v_command, _ds_to_cv, _prepare_dg, _find_store, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
- _find_store.return_value = [dg]
- _prepare_dg.return_value = [('host1', 'ds_dummy')]
- _run_c_v_command.return_value = ("{}", '', 0)
- cephadm_module.preview_drivegroups(drive_group_name='foo')
- _find_store.assert_called_once_with(service_name='foo')
- _prepare_dg.assert_called_once_with(dg)
- _run_c_v_command.assert_called_once()
-
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
json.dumps([
dict(
)
])
))
- @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+ @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
def test_nfs(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- spec = NFSServiceSpec('name', pool='pool', namespace='namespace', placement=ps)
+ spec = NFSServiceSpec(
+ service_id='name',
+ pool='pool',
+ namespace='namespace',
+ placement=ps)
c = cephadm_module.add_nfs(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed nfs.name.* on host 'test'")
assert_rm_service(cephadm_module, 'nfs.name')
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_iscsi(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
- spec = IscsiServiceSpec('name', pool='pool', placement=ps)
+ spec = IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password',
+ placement=ps)
c = cephadm_module.add_iscsi(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed iscsi.name.* on host 'test'")
(ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
(ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
(ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
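+ # Same MDS service, but with an explicit HostPlacementSpec pinning the daemon name.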
+ (ServiceSpec(
+ 'mds', service_id='fsname',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='fsname',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_mds),
(RGWSpec(rgw_realm='realm', rgw_zone='zone'), CephadmOrchestrator.apply_rgw),
- (NFSServiceSpec('name', pool='pool', namespace='namespace'), CephadmOrchestrator.apply_nfs),
- (IscsiServiceSpec('name', pool='pool'), CephadmOrchestrator.apply_iscsi),
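+ # RGW spec with an explicit placement for daemon 'realm.zone.a' on host 'test'.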
+ (RGWSpec(
+ rgw_realm='realm', rgw_zone='zone',
+ placement=PlacementSpec(
+ hosts=[HostPlacementSpec(
+ hostname='test',
+ name='realm.zone.a',
+ network=''
+ )]
+ )
+ ), CephadmOrchestrator.apply_rgw),
+ (NFSServiceSpec(
+ service_id='name',
+ pool='pool',
+ namespace='namespace'
+ ), CephadmOrchestrator.apply_nfs),
+ (IscsiServiceSpec(
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password'
+ ), CephadmOrchestrator.apply_iscsi),
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module):
+ def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
with self._with_host(cephadm_module, 'test'):
- spec.placement = PlacementSpec(hosts=['test'], count=1)
+ if not spec.placement:
+ spec.placement = PlacementSpec(hosts=['test'], count=1)
c = meth(cephadm_module, spec)
assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
assert [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())] == [spec]
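+ # Apply all stored specs so the scheduled daemons are actually created.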
+ cephadm_module._apply_all_services()
+
+ dds = wait(cephadm_module, cephadm_module.list_daemons())
+ for dd in dds:
+ assert dd.service_name() == spec.service_name()
+
+
assert_rm_service(cephadm_module, spec.service_name())
assert cephadm_module._check_host('test') is None
out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
assert out == HostSpec('test', 'test').to_json()
+
+ def test_stale_connections(self, cephadm_module):
+ class Connection(object):
+ """
+ A mocked connection class that allows the connection to be used only
+ once. Any attempt to use it again via _check makes it explode (go
+ boom!).
+
+ The old code triggers the boom. The new code checks has_connection()
+ and recreates the connection when it has gone away.
+ """
+ fuse = False
+
+ @staticmethod
+ def has_connection():
+ return False
+
+ def import_module(self, *args, **kargs):
+ return mock.Mock()
+
+ @staticmethod
+ def exit():
+ pass
+
+ def _check(conn, *args, **kargs):
+ if conn.fuse:
+ raise Exception("boom: connection is dead")
+ else:
+ conn.fuse = True
+ return '{}', None, 0
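+ # Every call to remoto.Connection() yields a fresh mock connection, so a
+ # reconnect gets a working one instead of the "used up" first connection.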
+ with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
+ with mock.patch("remoto.process.check", _check):
+ with self._with_host(cephadm_module, 'test'):
+ code, out, err = cephadm_module.check_host('test')
+ # First should succeed.
+ assert err is None
+
+ # On the second call it attempts to reuse the connection; since the
+ # connection reports as "down", a new one is created. The old code
+ # would blow up here, triggering the BOOM!
+ code, out, err = cephadm_module.check_host('test')
+ assert err is None