-import datetime
import json
from contextlib import contextmanager
from unittest.mock import ANY
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
-from cephadm.services.osd import OSD, OSDQueue
+from cephadm.services.osd import OSD, OSDRemovalQueue
+from cephadm.utils import CephadmNoImage
try:
from typing import Any, List
NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
+from ceph.utils import datetime_to_str, datetime_now
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module, with_service, assert_rm_service
-from cephadm.module import CephadmOrchestrator, CEPH_DATEFMT
+from cephadm.module import CephadmOrchestrator
"""
TODOs:
# Make sure _check_daemons does a redeploy due to monmap change:
cephadm_module._store['_ceph_get/mon_map'] = {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
}
cephadm_module.notify('mon_map', None)
# Make sure _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', None)
# Make sure _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', None)
with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
CephadmServe(cephadm_module)._check_daemons()
_mon_cmd.assert_any_call(
- {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
+ {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
+ None)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
out = cephadm_module.osd_service.find_destroyed_osds()
assert out == {'host1': ['0']}
+ @pytest.mark.parametrize(
+ "ceph_services, cephadm_daemons, strays_expected",
+ # [ ([(daemon_type, daemon_id), ... ], [...], [...]), ... ]
+ [
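+ # nothing is managed by cephadm yet -> every reported service is a stray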
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ ),
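+ # everything is managed by cephadm -> no strays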
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [],
+ ),
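+ # only mgr.x is unmanaged -> it is the single stray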
+ (
+ [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
+ [('mds', 'a'), ('osd', '0')],
+ [('mgr', 'x')],
+ ),
+ # https://tracker.ceph.com/issues/49573
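+ # rgw-nfs services are reported as cephadm 'nfs' daemons without the '-rgw' suffix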
+ (
+ [('rgw-nfs', 'nfs.foo.host1-rgw')],
+ [],
+ [('nfs', 'foo.host1')],
+ ),
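+ # the same rgw-nfs service matches the managed nfs.foo.host1 daemon -> no stray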
+ (
+ [('rgw-nfs', 'nfs.foo.host1-rgw')],
+ [('nfs', 'foo.host1')],
+ [],
+ ),
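+ # a managed daemon with no service-map entry is not a stray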
+ (
+ [],
+ [('nfs', 'foo.host1')],
+ [],
+ ),
+ ]
+ )
+ def test_check_for_stray_daemons(
+ self,
+ cephadm_module,
+ ceph_services,
+ cephadm_daemons,
+ strays_expected
+ ):
+ # mock ceph service-map
+ services = []
+ for service in ceph_services:
+ s = {'type': service[0], 'id': service[1]}
+ services.append(s)
+ ls = [{'hostname': 'host1', 'services': services}]
+
+ with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
+ list_servers.return_value = ls
+ list_servers.__iter__.side_effect = ls.__iter__
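+ # make the mock itself iterable, mirroring how list_servers is consumed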
+
+ # populate cephadm daemon cache
+ dm = {}
+ for daemon_type, daemon_id in cephadm_daemons:
+ dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
+ dm[dd.name()] = dd
+ cephadm_module.cache.update_host_daemons('host1', dm)
+
+ # test
+ CephadmServe(cephadm_module)._check_for_strays()
+
+ # verify
+ strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
+ if not strays:
+ assert len(strays_expected) == 0
+ else:
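+ # every expected stray must appear exactly once in the health check detail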
+ for dt, di in strays_expected:
+ name = '%s.%s' % (dt, di)
+ for detail in strays['detail']:
+ if name in detail:
+ strays['detail'].remove(detail)
+ break
+ assert name in detail
+ assert len(strays['detail']) == 0
+ assert strays['count'] == len(strays_expected)
+
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
_mon_cmd.return_value = (1, "", "fail_msg")
_run_cephadm.assert_called_with(
'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+ with with_host(cephadm_module, 'test'):
+
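+ # non-collocated layout: data, db and wal each on a separate device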
+ spec = DriveGroupSpec(
+ service_id='noncollocated',
+ placement=PlacementSpec(
+ hosts=['test']
+ ),
+ data_devices=DeviceSelection(paths=['/dev/sdb']),
+ db_devices=DeviceSelection(paths=['/dev/sdc']),
+ wal_devices=DeviceSelection(paths=['/dev/sdd'])
+ )
+
+ c = cephadm_module.apply([spec])
+ assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']
+
+ inventory = Devices([
+ Device('/dev/sdb', available=True),
+ Device('/dev/sdc', available=True),
+ Device('/dev/sdd', available=True)
+ ])
+
+ cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+
+ _run_cephadm.return_value = (['{}'], '', 0)
+
+ assert CephadmServe(cephadm_module)._apply_all_services() is False
+
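+ # ceph-volume lvm batch should be invoked with the separate db and wal devices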
+ _run_cephadm.assert_any_call(
+ 'test', 'osd', 'ceph-volume',
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
+ '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
+ env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
+ error_ok=True, stdin='{"config": "", "keyring": ""}')
+ _run_cephadm.assert_called_with(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
+ def test_create_noncollocated_osd(self, cephadm_module):
+ with with_host(cephadm_module, 'test'):
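+ # a blank device path should result in no OSDs being created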
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
+ c = cephadm_module.create_osds(dg)
+ out = wait(cephadm_module, c)
+ assert out == "Created no osd(s) on host test; already created?"
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_prepare_drivegroup(self, cephadm_module):
with with_host(cephadm_module, 'test'):
force=False,
hostname='test',
fullname='osd.0',
- process_started_at=datetime.datetime.utcnow(),
- remove_util=cephadm_module.rm_util
+ process_started_at=datetime_now(),
+ remove_util=cephadm_module.to_remove_osds.rm_util
))
- cephadm_module.rm_util.process_removal_queue()
- assert cephadm_module.to_remove_osds == OSDQueue()
+ cephadm_module.to_remove_osds.process_removal_queue()
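+ # after processing, the queue should compare equal to a freshly constructed (empty) one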
+ assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
c = cephadm_module.remove_osds_status()
out = wait(cephadm_module, c)
raise Exception("boom: connection is dead")
else:
conn.fuse = True
- return '{}', None, 0
+ return '{}', [], 0
with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
with mock.patch("remoto.process.check", _check):
with with_host(cephadm_module, 'test', refresh_hosts=False):
code, out, err = cephadm_module.check_host('test')
# First should succeed.
- assert err is None
+ assert err == ''
# On second it should attempt to reuse the connection, where the
# connection is "down" so will recreate the connection. The old
# code will blow up here triggering the BOOM!
code, out, err = cephadm_module.check_host('test')
- assert err is None
+ assert err == ''
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("remoto.process.check")
# Make sure _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', mock.MagicMock())
assert image == 'image@repo_digest'
else:
assert image == 'image'
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
+ def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
+ _run_cephadm.return_value = ('{}', '', 0)
+
+ error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
+Traceback (most recent call last):
+ File "<stdin>", line 6112, in <module>
+ File "<stdin>", line 1299, in _infer_fsid
+ File "<stdin>", line 1382, in _infer_image
+ File "<stdin>", line 3612, in command_ceph_volume
+ File "<stdin>", line 1061, in call_throws"""
+
+ with with_host(cephadm_module, 'test'):
+ _run_cephadm.reset_mock()
+ _run_cephadm.side_effect = OrchestratorError(error_message)
+
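+ # the ceph-volume failure should be surfaced as a readable error message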
+ s = CephadmServe(cephadm_module)._refresh_host_devices('test')
+ assert s == 'host test `cephadm ceph-volume` failed: ' + error_message
+
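+ # the first inventory call passes --filter-for-batch; the retry drops the flag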
+ assert _run_cephadm.mock_calls == [
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json', '--filter-for-batch'], image='',
+ no_fsid=False),
+ mock.call('test', 'osd', 'ceph-volume',
+ ['--', 'inventory', '--format=json'], image='',
+ no_fsid=False),
+ ]