import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
+from cephadm.serve import CephadmServe
from cephadm.services.osd import OSD, OSDQueue
try:
from execnet.gateway_bootstrap import HostNotFound
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
- NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec
+ NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
# Be careful with backward compatibility when changing things here:
assert json.loads(cephadm_module.get_store('inventory')) == \
- {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
+ {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
with with_host(cephadm_module, 'second'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_service_ls(self, cephadm_module):
with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
expected = [
{
- 'placement': {'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]},
+ 'placement': {'hosts': ['test']},
'service_id': 'name',
'service_name': 'mds.name',
'service_type': 'mds',
{
'placement': {
'count': 1,
- 'hosts': [{'hostname': 'test', 'name': '', 'network': ''}]
+ 'hosts': ["test"]
},
'spec': {
'rgw_realm': 'r',
def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
- cephadm_module._refresh_host_daemons('test')
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
c = cephadm_module.list_daemons()
assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
- assert wait(cephadm_module, c) == f"Deployed rgw.{daemon_id} on host 'test'"
+ assert wait(cephadm_module,
+ c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'"
for what in ('start', 'stop', 'restart'):
c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
- assert wait(cephadm_module, c) == what + f" rgw.{daemon_id} from host 'test'"
+ assert wait(cephadm_module,
+ c) == F"Scheduled to {what} rgw.{daemon_id} on host 'test'"
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module._store['_ceph_get/mon_map'] = {
}
cephadm_module.notify('mon_map', None)
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
})
cephadm_module.notify('mon_map', None)
- cephadm_module._check_daemons()
+ CephadmServe(cephadm_module)._check_daemons()
- evs = [e.message for e in cephadm_module.events.get_for_daemon(f'rgw.{daemon_id}')]
+ evs = [e.message for e in cephadm_module.events.get_for_daemon(
+ f'rgw.{daemon_id}')]
assert 'myerror' in ''.join(evs)
    @pytest.mark.parametrize(
        "action",
        [
            'start',
            'stop',
            'restart',
            'reconfig',
            'redeploy'
        ]
    )
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
    def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
        """A scheduled daemon action is stored in the cache keyed by
        (host, daemon) and is consumed (cleared) by a _check_daemons pass."""
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._schedule_daemon_action(daemon_name, action)

                # The action must be retrievable from the cache until acted upon.
                assert cephadm_module.cache.get_scheduled_daemon_action(
                    'test', daemon_name) == action

                CephadmServe(cephadm_module)._check_daemons()

                # _check_daemons performed the action and removed the schedule entry.
                assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
+
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
    def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        """Extra ceph.conf text set via _set_extra_ceph_conf must appear in the
        config-json payload of the 'deploy --reconfig' call issued by the next
        _check_daemons pass."""
        _run_cephadm.return_value = ('{}', '', 0)

        with with_host(cephadm_module, 'test'):

            # Also testing deploying mons without explicit network placement
            cephadm_module.check_mon_command({
                'prefix': 'config set',
                'who': 'mon',
                'name': 'public_network',
                'value': '127.0.0.0/8'
            })

            # Give the host an address inside the mon public_network so a mon
            # can be placed on it without an explicit network in the spec.
            cephadm_module.cache.update_host_devices_networks(
                'test',
                [],
                {
                    "127.0.0.0/8": [
                        "127.0.0.1"
                    ],
                }
            )

            with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._set_extra_ceph_conf('[mon]\nk=v')

                CephadmServe(cephadm_module)._check_daemons()

                # The reconfig deploy must carry the extra [mon] section in the
                # JSON config sent on stdin.
                _run_cephadm.assert_called_with('test', 'mon.test', 'deploy', [
                    '--name', 'mon.test', '--reconfig', '--config-json', '-'],
                    stdin='{"config": "\\n\\n[mon]\\nk=v\\n", "keyring": ""}',
                    image='')
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
})
with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
-
- cephadm_module._check_daemons()
- _mon_cmd.assert_any_call({'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
-
+ CephadmServe(cephadm_module)._check_daemons()
+ _mon_cmd.assert_any_call(
+ {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'})
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
def test_mgr_update(self, cephadm_module):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
+ r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
assert r
assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
_run_cephadm.return_value = (['{}'], '', 0)
- assert cephadm_module._apply_all_services() == False
+ assert CephadmServe(cephadm_module)._apply_all_services() == False
_run_cephadm.assert_any_call(
'test', 'osd', 'ceph-volume',
- ['--config-json', '-', '--', 'lvm', 'prepare', '--bluestore', '--data', '/dev/sdb', '--no-systemd'],
+ ['--config-json', '-', '--', 'lvm', 'batch',
+ '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
- _run_cephadm.assert_called_with('test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
-
+ _run_cephadm.assert_called_with(
+ 'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'])
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
with with_host(cephadm_module, 'test'):
- json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, 'service_id': 'foo', 'data_devices': {'all': True}}
+ json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'},
+ 'service_id': 'foo', 'data_devices': {'all': True}}
spec = ServiceSpec.from_json(json_spec)
assert isinstance(spec, DriveGroupSpec)
c = cephadm_module.apply([spec])
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
with with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
assert out == "Created no osd(s) on host test; already created?"
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_prepare_drivegroup(self, cephadm_module):
with with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
+ dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
+ data_devices=DeviceSelection(paths=['']))
out = cephadm_module.osd_service.prepare_drivegroup(dg)
assert len(out) == 1
f1 = out[0]
"devices, preview, exp_command",
[
# no preview and only one disk, prepare is used due the hack that is in place.
- (['/dev/sda'], False, "lvm prepare --bluestore --data /dev/sda --no-systemd"),
+ (['/dev/sda'], False, "lvm batch --no-auto /dev/sda --yes --no-systemd"),
# no preview and multiple disks, uses batch
- (['/dev/sda', '/dev/sdb'], False, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
+ (['/dev/sda', '/dev/sdb'], False,
+ "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
# preview and only one disk needs to use batch again to generate the preview
- (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --report --format json"),
+ (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"),
# preview and multiple disks work the same
- (['/dev/sda', '/dev/sdb'], True, "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
+ (['/dev/sda', '/dev/sdb'], True,
+ "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
with with_host(cephadm_module, 'test'):
- dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
+ dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
+ host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
with with_host(cephadm_module, 'test'):
- cephadm_module._refresh_host_daemons('test')
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
c = cephadm_module.list_daemons()
wait(cephadm_module, c)
assert out == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_rgw_update(self, cephadm_module):
with with_host(cephadm_module, 'host1'):
with with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
- c = cephadm_module.add_rgw(RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
+ c = cephadm_module.add_rgw(
+ RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- r = cephadm_module._apply_service(RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
+ r = CephadmServe(cephadm_module)._apply_service(
+ RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
assert r
assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
))
def test_remove_daemon(self, cephadm_module):
with with_host(cephadm_module, 'test'):
- cephadm_module._refresh_host_daemons('test')
+ CephadmServe(cephadm_module)._refresh_host_daemons('test')
c = cephadm_module.list_daemons()
wait(cephadm_module, c)
c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, spec, meth, 'test'):
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = NFSServiceSpec(
- service_id='name',
- pool='pool',
- namespace='namespace',
- placement=ps)
+ service_id='name',
+ pool='pool',
+ namespace='namespace',
+ placement=ps)
c = cephadm_module.add_nfs(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed nfs.name.* on host 'test'")
with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = IscsiServiceSpec(
- service_id='name',
- pool='pool',
- api_user='user',
- api_password='password',
- placement=ps)
+ service_id='name',
+ pool='pool',
+ api_user='user',
+ api_password='password',
+ placement=ps)
c = cephadm_module.add_iscsi(spec)
[out] = wait(cephadm_module, c)
match_glob(out, "Deployed iscsi.name.* on host 'test'")
# automatically.
assert_rm_service(cephadm_module, 'iscsi.name')
- @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- def test_blink_device_light(self, cephadm_module):
    @pytest.mark.parametrize(
        "on_bool",
        [
            True,
            False
        ]
    )
    @pytest.mark.parametrize(
        "fault_ident",
        [
            'fault',
            'ident'
        ]
    )
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
    def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
        """Default blink command: 'lsmcli local-disk-<ident|fault>-led-<on|off>'
        is run through 'cephadm shell' for the given (host, path) tuple,
        for all four (light, state) combinations."""
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
            on_off = 'on' if on_bool else 'off'
            assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}']
            # The shell invocation embeds both the light kind and on/off state.
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
+
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
    def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
        """A cluster-wide 'blink_device_light_cmd' store key replaces the
        default lsmcli command with the configured one."""
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
            c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
            assert wait(cephadm_module, c) == ['Set ident light for test: on']
            # The custom command is run verbatim instead of lsmcli.
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'echo', 'hello'], error_ok=True)
+
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
    def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
        """A per-host '<host>/blink_device_light_cmd' store key takes effect and
        is rendered as a template with ident_fault / on / path-or-dev
        substitutions before being executed."""
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'mgr0'):
            cephadm_module.set_store('mgr0/blink_device_light_cmd',
                                     'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
            c = cephadm_module.blink_device_light(
                'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')])
            assert wait(cephadm_module, c) == [
                'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on']
            # The template expanded to the device name (path was empty, so
            # 'dev' was substituted) with --fault=on rendered in place.
            _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [
                '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784'
            ], error_ok=True)
@pytest.mark.parametrize(
"spec, meth",
api_user='user',
api_password='password'
), CephadmOrchestrator.apply_iscsi),
+ (CustomContainerSpec(
+ service_id='hello-world',
+ image='docker.io/library/hello-world:latest',
+ uid=65534,
+ gid=65534,
+ dirs=['foo/bar'],
+ files={
+ 'foo/bar/xyz.conf': 'aaa\nbbb'
+ },
+ bind_mounts=[[
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]],
+ volume_mounts={
+ 'foo/bar': '/foo/bar:Z'
+ },
+ args=['--no-healthcheck'],
+ envs=['SECRET=password'],
+ ports=[8080, 8443]
+ ), CephadmOrchestrator.apply_container),
]
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
- @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _,__,___: None)
+ @mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
with with_service(cephadm_module, spec, meth, 'test'):
c = cephadm_module.apply_mds(spec)
out = wait(cephadm_module, c)
match_glob(out, "Scheduled mds.fsname update...")
- cephadm_module._apply_all_services()
+ CephadmServe(cephadm_module)._apply_all_services()
[daemon] = cephadm_module.cache.daemons['host1'].keys()
c = cephadm_module.apply_mds(spec)
out = wait(cephadm_module, c)
match_glob(out, "Scheduled mds.fsname update...")
- cephadm_module._apply_all_services()
+ CephadmServe(cephadm_module)._apply_all_services()
ok_to_stop.assert_called_with([daemon[4:]])
assert_rm_daemon(cephadm_module, spec.service_name(), 'host1') # verifies ok-to-stop
assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
-
@mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
@mock.patch("remoto.process.check")
def test_offline(self, _check, _get_connection, cephadm_module):
assert out == HostSpec('test', 'test', status='Offline').to_json()
_get_connection.side_effect = None
- assert cephadm_module._check_host('test') is None
+ assert CephadmServe(cephadm_module)._check_host('test') is None
out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
assert out == HostSpec('test', 'test').to_json()
return '{}', None, 0
with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
with mock.patch("remoto.process.check", _check):
- with with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test', refresh_hosts=False):
code, out, err = cephadm_module.check_host('test')
# First should succeed.
assert err is None
cephadm_module.config_notify()
assert cephadm_module.manage_etc_ceph_ceph_conf == True
- cephadm_module._refresh_hosts_and_daemons()
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
_check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')
assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
+ # set extra config and expect that we deploy another ceph.conf
+ cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
+ CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ _check.assert_called_with(
+ ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'\n\n[mon]\nk=v\n')
+
+ # reload
cephadm_module.cache.last_etc_ceph_ceph_conf = {}
cephadm_module.cache.load()
cephadm_module.cache.load()
assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
-
def test_etc_ceph_init(self):
with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
assert m.manage_etc_ceph_ceph_conf is True
assert out == 'registry login scheduled'
assert err == ''
check_registry_credentials('test-url', 'test-user', 'test-password')
-
+
# test bad login attempt with invalid args
code, out, err = cephadm_module.registry_login('bad-args')
assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
- "or -i <login credentials json file>")
+ "or -i <login credentials json file>")
check_registry_credentials('test-url', 'test-user', 'test-password')
-
+
# test bad login using invalid json file
- code, out, err = cephadm_module.registry_login(None, None, None, '{"bad-json": "bad-json"}')
+ code, out, err = cephadm_module.registry_login(
+ None, None, None, '{"bad-json": "bad-json"}')
assert err == ("json provided for custom registry login did not include all necessary fields. "
- "Please setup json file as\n"
- "{\n"
- " \"url\": \"REGISTRY_URL\",\n"
- " \"username\": \"REGISTRY_USERNAME\",\n"
- " \"password\": \"REGISTRY_PASSWORD\"\n"
- "}\n")
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
check_registry_credentials('test-url', 'test-user', 'test-password')
-
+
# test good login using valid json file
good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
- " \"password\": \"" + "json-pass" + "\"}")
+ " \"password\": \"" + "json-pass" + "\"}")
code, out, err = cephadm_module.registry_login(None, None, None, good_json)
assert out == 'registry login scheduled'
assert err == ''
check_registry_credentials('json-url', 'json-user', 'json-pass')
-
+
# test bad login where args are valid but login command fails
_run_cephadm.return_value = '{}', 'error', 1
code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
assert err == 'Host test failed to login to fail-url as fail-user with given password'
check_registry_credentials('json-url', 'json-user', 'json-pass')
+
    @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
        'image_id': 'image_id',
        'repo_digest': 'image@repo_digest',
    })))
    @pytest.mark.parametrize("use_repo_digest",
                             [
                                 False,
                                 True
                             ])
    def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
        """convert_tags_to_repo_digest rewrites the global container_image
        config to the pulled repo digest only when use_repo_digest is set."""
        with with_host(cephadm_module, 'test', refresh_hosts=False):
            cephadm_module.set_container_image('global', 'image')
            if use_repo_digest:
                cephadm_module.use_repo_digest = True

            CephadmServe(cephadm_module).convert_tags_to_repo_digest()

            _, image, _ = cephadm_module.check_mon_command({
                'prefix': 'config get',
                'who': 'global',
                'key': 'container_image',
            })
            if use_repo_digest:
                # Tag was resolved to the immutable digest from the faked pull.
                assert image == 'image@repo_digest'
            else:
                # Feature disabled: the original tag is left untouched.
                assert image == 'image'