import json

from contextlib import contextmanager
from unittest.mock import ANY

import pytest

from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
from cephadm.services.osd import OSD, OSDRemovalQueue

from typing import List

from execnet.gateway_bootstrap import HostNotFound

from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
    NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from ceph.utils import datetime_to_str, datetime_now
from orchestrator import DaemonDescription, InventoryHost, \
    HostSpec, OrchestratorError
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
    with_cephadm_module, with_service, _deploy_cephadm_binary
from cephadm.module import CephadmOrchestrator
"""
There is really room for improvement here. I just quickly assembled these tests.
In general, everything should be tested in Teuthology as well. The reason for
also testing here is the shorter development round-trip time.
"""
def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
    dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
    d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]

    # There should only be one daemon (if not, match_glob will throw a mismatch).
    assert len(d_names) == 1

    c = cephadm.remove_daemons(d_names)
    [out] = wait(cephadm, c)
    # Picking the 1st element is needed, rather than passing the list, when the daemon
    # name contains a '-' char. If not, the '-' is treated as a range, i.e. cephadm-exporter
    # is treated like an m-e range, which is invalid. rbd-mirror (d-m) and node-exporter (e-e)
    # are valid, so they pass without incident! Also, match_glob acts on strings anyway!
    match_glob(out, f"Removed {d_names[0]}* from host '{host}'")
@contextmanager
def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str):
    spec.placement = PlacementSpec(hosts=[host], count=1)

    c = cephadm_module.add_daemon(spec)
    [out] = wait(cephadm_module, c)
    match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")

    dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
    for dd in dds:
        if dd.hostname == host:
            yield dd.daemon_id
            assert_rm_daemon(cephadm_module, spec.service_name(), host)
            return

    assert False, 'Daemon not found'
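# Minimal usage sketch for the helper above, mirroring how the tests below call it:
# the daemon is deployed on entry, its daemon_id is yielded, and it is removed on exit.
#
#     with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), 'test') as daemon_id:
#         ...  # 'mds.name.<daemon_id>' is running on host 'test' here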
class TestCephadm(object):

    def test_get_unique_name(self, cephadm_module):
        # type: (CephadmOrchestrator) -> None
        existing = [
            DaemonDescription(daemon_type='mon', daemon_id='a')
        ]
        new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing)
        match_glob(new_mon, 'myhost')
        new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
        match_glob(new_mgr, 'myhost.*')
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_host(self, cephadm_module):
        assert wait(cephadm_module, cephadm_module.get_hosts()) == []
        with with_host(cephadm_module, 'test'):
            assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]

            # Be careful with backward compatibility when changing things here:
            assert json.loads(cephadm_module.get_store('inventory')) == \
                {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}

            with with_host(cephadm_module, 'second'):
                assert wait(cephadm_module, cephadm_module.get_hosts()) == [
                    HostSpec('test', 'test'),
                    HostSpec('second', 'second')
                ]

            assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
        assert wait(cephadm_module, cephadm_module.get_hosts()) == []
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_service_ls(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.list_daemons(refresh=True)
            assert wait(cephadm_module, c) == []
            with with_service(cephadm_module, ServiceSpec('mds', 'name', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, ServiceSpec('mds', 'name'), 'test') as _:

                c = cephadm_module.list_daemons()

                def remove_id_events(dd):
                    out = dd.to_json()
                    del out['daemon_id']
                    del out['events']
                    return out

                assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
                    {
                        'daemon_type': 'mds',
                        'hostname': 'test',
                        'status': 1,
                        'status_desc': 'starting',
                        'is_active': False,
                        'ports': [],
                    }
                ]

                with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'),
                                  CephadmOrchestrator.apply_rgw, 'test'):

                    c = cephadm_module.describe_service()
                    out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
                    expected = [
                        {
                            'placement': {'count': 2},
                            'service_id': 'name',
                            'service_name': 'mds.name',
                            'service_type': 'mds',
                            'status': {'created': mock.ANY, 'running': 1, 'size': 2},
                            'unmanaged': True
                        },
                        {
                            'placement': {
                                'count': 1,
                                'hosts': ['test']
                            },
                            'service_id': 'r.z',
                            'service_name': 'rgw.r.z',
                            'service_type': 'rgw',
                            'status': {'created': mock.ANY, 'running': 1, 'size': 1,
                                       'ports': [80]},
                        }
                    ]
                    for o in out:
                        if 'events' in o:
                            del o['events']  # delete it, as it contains a timestamp
                    assert out == expected
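    # Note the two shapes exercised above: list_daemons() yields one entry per
    # daemon, while describe_service() yields one entry per service, with aggregate
    # 'running'/'size' counts in its status field.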
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_device_ls(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.get_inventory()
            assert wait(cephadm_module, c) == [InventoryHost('test')]
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='rgw.myrgw.foobar',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:

                c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
                assert wait(cephadm_module,
                            c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'"

                for what in ('start', 'stop', 'restart'):
                    c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
                    assert wait(cephadm_module,
                                c) == f"Scheduled to {what} rgw.{daemon_id} on host 'test'"

                # Make sure _check_daemons does a redeploy due to the monmap change:
                cephadm_module._store['_ceph_get/mon_map'] = {
                    'modified': datetime_to_str(datetime_now()),
                    'fsid': 'foobar',
                }
                cephadm_module.notify('mon_map', None)

                CephadmServe(cephadm_module)._check_daemons()
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
                with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:

                    _ceph_send_command.side_effect = Exception("myerror")

                    # Make sure _check_daemons does a redeploy due to the monmap change:
                    cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                        'modified': datetime_to_str(datetime_now()),
                        'fsid': 'foobar',
                    })
                    cephadm_module.notify('mon_map', None)

                    CephadmServe(cephadm_module)._check_daemons()

                    evs = [e.message for e in cephadm_module.events.get_for_daemon(
                        f'rgw.{daemon_id}')]

                    assert 'myerror' in ''.join(evs)
    @pytest.mark.parametrize(
        "action",
        [
            'start',
            'stop',
            'restart',
            'reconfig',
            'redeploy'
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._schedule_daemon_action(daemon_name, action)

                assert cephadm_module.cache.get_scheduled_daemon_action(
                    'test', daemon_name) == action

                CephadmServe(cephadm_module)._check_daemons()

                assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)

        with with_host(cephadm_module, 'test'):

            # Also testing deploying mons without explicit network placement
            cephadm_module.check_mon_command({
                'prefix': 'config set',
                'who': 'mon',
                'name': 'public_network',
                'value': '127.0.0.0/8'
            })

            cephadm_module.cache.update_host_devices_networks(
                'test',
                [],
                {
                    "127.0.0.0/8": [
                        "127.0.0.1"
                    ],
                }
            )

            with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._set_extra_ceph_conf('[mon]\nk=v')

                CephadmServe(cephadm_module)._check_daemons()

                _run_cephadm.assert_called_with(
                    'test', 'mon.test', 'deploy', [
                        '--name', 'mon.test',
                        '--meta-json', '{"service_name": "mon", "ports": [], "ip": null, "deployed_by": []}',
                        '--config-json', '-',
                        '--reconfig',
                    ],
                    stdin='{"config": "\\n\\n[mon]\\nk=v\\n[mon.test]\\npublic network = 127.0.0.0/8\\n", '
                    + '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}',
                    image='')
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):

                # Make sure _check_daemons does a redeploy due to the monmap change:
                cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                    'modified': datetime_to_str(datetime_now()),
                    'fsid': 'foobar',
                })
                cephadm_module.notify('mon_map', None)
                cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
                    'modules': ['dashboard']
                })

                with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
                    CephadmServe(cephadm_module)._check_daemons()
                    _mon_cmd.assert_any_call(
                        {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
                        None)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_mon_add(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
                ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
                c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
                assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]

                with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
                    ps = PlacementSpec(hosts=['test'], count=1)
                    c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
                    wait(cephadm_module, c)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_mgr_update(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
            assert r

            assert_rm_daemon(cephadm_module, 'mgr.a', 'test')
    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
    def test_find_destroyed_osds(self, _mon_cmd, cephadm_module):
        dict_out = {
            "nodes": [
                {
                    "id": -1,
                    "name": "default",
                    "type": "root",
                    "type_id": 11,
                    "children": [-3]
                },
                {
                    "id": -3,
                    "name": "host1",
                    "type": "host",
                    "type_id": 1,
                    "pool_weights": {},
                    "children": [0]
                },
                {
                    "id": 0,
                    "device_class": "hdd",
                    "name": "osd.0",
                    "type": "osd",
                    "type_id": 0,
                    "crush_weight": 0.0243988037109375,
                    "depth": 2,
                    "pool_weights": {},
                    "exists": 1,
                    "status": "destroyed",
                    "reweight": 1,
                    "primary_affinity": 1
                }
            ],
            "stray": []
        }
        json_out = json.dumps(dict_out)
        _mon_cmd.return_value = (0, json_out, '')
        out = cephadm_module.osd_service.find_destroyed_osds()
        assert out == {'host1': ['0']}
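    # As asserted above, find_destroyed_osds() walks the 'osd tree' nodes and maps
    # hostname -> list of OSD ids whose status is 'destroyed', here {'host1': ['0']}.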
    @pytest.mark.parametrize(
        "ceph_services, cephadm_daemons, strays_expected, metadata",
        # [ ([(daemon_type, daemon_id), ... ], [...], [...]), ... ]
        [
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [],
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                {},
            ),
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [],
                {},
            ),
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [('mds', 'a'), ('osd', '0')],
                [('mgr', 'x')],
                {},
            ),
            # https://tracker.ceph.com/issues/49573
            (
                [('rgw-nfs', '14649')],
                [],
                [('nfs', 'foo-rgw.host1')],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}},
            ),
            (
                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
                [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')],
                [],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
            ),
            (
                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
                [('nfs', 'foo-rgw.host1')],
                [('nfs', 'foo2.host2')],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
            ),
        ]
    )
    def test_check_for_stray_daemons(
            self,
            cephadm_module,
            ceph_services,
            cephadm_daemons,
            strays_expected,
            metadata
    ):
        # mock ceph service-map
        services = []
        for service in ceph_services:
            s = {'type': service[0], 'id': service[1]}
            services.append(s)
        ls = [{'hostname': 'host1', 'services': services}]

        with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
            list_servers.return_value = ls
            list_servers.__iter__.side_effect = ls.__iter__

            # populate cephadm daemon cache
            dm = {}
            for daemon_type, daemon_id in cephadm_daemons:
                dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
                dm[dd.name()] = dd
            cephadm_module.cache.update_host_daemons('host1', dm)

            def get_metadata_mock(svc_type, svc_id, default):
                return metadata[svc_id]

            with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock):

                # test
                CephadmServe(cephadm_module)._check_for_strays()

                # verify
                strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
                if not strays:
                    assert len(strays_expected) == 0
                else:
                    for dt, di in strays_expected:
                        name = '%s.%s' % (dt, di)
                        for detail in strays['detail']:
                            if name in detail:
                                strays['detail'].remove(detail)
                                break
                        assert name in detail
                    assert len(strays['detail']) == 0
                    assert strays['count'] == len(strays_expected)
    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
    def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
        _mon_cmd.return_value = (1, "", "fail_msg")
        with pytest.raises(OrchestratorError):
            cephadm_module.osd_service.find_destroyed_osds()
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test'):

            spec = DriveGroupSpec(
                service_id='foo',
                placement=PlacementSpec(
                    host_pattern='*',
                ),
                data_devices=DeviceSelection(
                    all=True
                )
            )

            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']

            inventory = Devices([
                Device(
                    '/dev/sdb',
                    available=True
                ),
            ])

            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})

            _run_cephadm.return_value = (['{}'], '', 0)

            assert CephadmServe(cephadm_module)._apply_all_services() is False

            _run_cephadm.assert_any_call(
                'test', 'osd', 'ceph-volume',
                ['--config-json', '-', '--', 'lvm', 'batch',
                    '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
            _run_cephadm.assert_called_with(
                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test'):

            spec = DriveGroupSpec(
                service_id='noncollocated',
                placement=PlacementSpec(
                    hosts=['test']
                ),
                data_devices=DeviceSelection(paths=['/dev/sdb']),
                db_devices=DeviceSelection(paths=['/dev/sdc']),
                wal_devices=DeviceSelection(paths=['/dev/sdd'])
            )

            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']

            inventory = Devices([
                Device('/dev/sdb', available=True),
                Device('/dev/sdc', available=True),
                Device('/dev/sdd', available=True)
            ])

            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})

            _run_cephadm.return_value = (['{}'], '', 0)

            assert CephadmServe(cephadm_module)._apply_all_services() is False

            _run_cephadm.assert_any_call(
                'test', 'osd', 'ceph-volume',
                ['--config-json', '-', '--', 'lvm', 'batch',
                    '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
                    '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
                error_ok=True, stdin='{"config": "", "keyring": ""}')
            _run_cephadm.assert_called_with(
                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.SpecStore.save")
    def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
        with with_host(cephadm_module, 'test'):
            json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'},
                         'service_id': 'foo', 'data_devices': {'all': True}}
            spec = ServiceSpec.from_json(json_spec)
            assert isinstance(spec, DriveGroupSpec)
            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
            _save_spec.assert_called_with(spec)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_create_osds(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            c = cephadm_module.create_osds(dg)
            out = wait(cephadm_module, c)
            assert out == "Created no osd(s) on host test; already created?"
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_create_noncollocated_osd(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            c = cephadm_module.create_osds(dg)
            out = wait(cephadm_module, c)
            assert out == "Created no osd(s) on host test; already created?"
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_prepare_drivegroup(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            out = cephadm_module.osd_service.prepare_drivegroup(dg)
            assert len(out) == 1
            f1 = out[0]
            assert f1[0] == 'test'
            assert isinstance(f1[1], DriveSelection)
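    # prepare_drivegroup() (per the asserts above) returns one (hostname, DriveSelection)
    # tuple per host matched by the drive group's placement, here just 'test'.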
    @pytest.mark.parametrize(
        "devices, preview, exp_command",
        [
            # no preview and only one disk: prepare is used due to the hack that is in place
            (['/dev/sda'], False, "lvm batch --no-auto /dev/sda --yes --no-systemd"),
            # no preview and multiple disks: uses batch
            (['/dev/sda', '/dev/sdb'], False,
             "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
            # preview and only one disk: needs to use batch again to generate the preview
            (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"),
            # preview and multiple disks: works the same
            (['/dev/sda', '/dev/sdb'], True,
             "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
                host_pattern='test'), data_devices=DeviceSelection(paths=devices))
            ds = DriveSelection(dg, Devices([Device(path) for path in devices]))

            out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
            assert out in exp_command
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='osd.0',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    @mock.patch("cephadm.services.osd.OSD.exists", True)
    @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
    def test_remove_osds(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            wait(cephadm_module, c)

            c = cephadm_module.remove_daemons(['osd.0'])
            out = wait(cephadm_module, c)
            assert out == ["Removed osd.0 from host 'test'"]

            cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
                                                      replace=False,
                                                      force=False,
                                                      hostname='test',
                                                      process_started_at=datetime_now(),
                                                      remove_util=cephadm_module.to_remove_osds.rm_util
                                                      ))
            cephadm_module.to_remove_osds.process_removal_queue()
            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)

            c = cephadm_module.remove_osds_status()
            out = wait(cephadm_module, c)
            assert out == []
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_rgw_update(self, cephadm_module):
        with with_host(cephadm_module, 'host1'):
            with with_host(cephadm_module, 'host2'):
                with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)):
                    ps = PlacementSpec(hosts=['host1'], count=1)
                    c = cephadm_module.add_daemon(
                        RGWSpec(service_id="foo", placement=ps))
                    [out] = wait(cephadm_module, c)
                    match_glob(out, "Deployed rgw.foo.* on host 'host1'")

                    ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
                    r = CephadmServe(cephadm_module)._apply_service(
                        RGWSpec(service_id="foo", placement=ps))
                    assert r

                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1')
                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2')
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='rgw.myrgw.myhost.myid',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    def test_remove_daemon(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            wait(cephadm_module, c)
            c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
            out = wait(cephadm_module, c)
            assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]
    @pytest.mark.parametrize(
        "spec",
        [
            ServiceSpec('crash'),
            ServiceSpec('prometheus'),
            ServiceSpec('grafana'),
            ServiceSpec('node-exporter'),
            ServiceSpec('alertmanager'),
            ServiceSpec('rbd-mirror'),
            ServiceSpec('cephfs-mirror'),
            ServiceSpec('mds', service_id='fsname'),
            RGWSpec(rgw_realm='realm', rgw_zone='zone'),
            RGWSpec(service_id="foo"),
            ServiceSpec('cephadm-exporter'),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
        unmanaged_spec = ServiceSpec.from_json(spec.to_json())
        unmanaged_spec.unmanaged = True
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, unmanaged_spec):
                with with_daemon(cephadm_module, spec, 'test'):
                    pass
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_daemon_add_fail(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            spec = ServiceSpec(
                service_type='mgr',
                placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
                unmanaged=True
            )
            with with_service(cephadm_module, spec):
                _run_cephadm.side_effect = OrchestratorError('fail')
                with pytest.raises(OrchestratorError):
                    wait(cephadm_module, cephadm_module.add_daemon(spec))
                cephadm_module.assert_issued_mon_command({
                    'prefix': 'auth rm',
                    'entity': 'mgr.x',
                })
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
    def test_nfs(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test'], count=1)
            spec = NFSServiceSpec(
                service_id='name',
                pool='pool',
                namespace='namespace',
                placement=ps)
            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
            unmanaged_spec.unmanaged = True
            with with_service(cephadm_module, unmanaged_spec):
                c = cephadm_module.add_daemon(spec)
                [out] = wait(cephadm_module, c)
                match_glob(out, "Deployed nfs.name.* on host 'test'")

                assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
    def test_iscsi(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test'], count=1)
            spec = IscsiServiceSpec(
                service_id='name',
                pool='pool',
                api_user='user',
                api_password='password',
                placement=ps)
            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
            unmanaged_spec.unmanaged = True
            with with_service(cephadm_module, unmanaged_spec):

                c = cephadm_module.add_daemon(spec)
                [out] = wait(cephadm_module, c)
                match_glob(out, "Deployed iscsi.name.* on host 'test'")

                assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
    @pytest.mark.parametrize(
        "on_bool",
        [
            True,
            False
        ]
    )
    @pytest.mark.parametrize(
        "fault_ident",
        [
            'fault',
            'ident'
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
            on_off = 'on' if on_bool else 'off'
            assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}']
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
            c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
            assert wait(cephadm_module, c) == ['Set ident light for test: on']
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'echo', 'hello'], error_ok=True)
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'mgr0'):
            cephadm_module.set_store('mgr0/blink_device_light_cmd',
                                     'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
            c = cephadm_module.blink_device_light(
                'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')])
            assert wait(cephadm_module, c) == [
                'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on']
            _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [
                '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784'
            ], error_ok=True)
    @pytest.mark.parametrize(
        "spec, meth",
        [
            (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr),
            (ServiceSpec('crash'), CephadmOrchestrator.apply_crash),
            (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus),
            (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana),
            (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter),
            (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
            (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
            (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror),
            (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
            (ServiceSpec(
                'mds', service_id='fsname',
                placement=PlacementSpec(
                    hosts=[HostPlacementSpec(
                        hostname='test',
                        name='fsname',
                        network=''
                    )]
                )
            ), CephadmOrchestrator.apply_mds),
            (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw),
            (RGWSpec(
                service_id='foo',
                rgw_realm='realm', rgw_zone='zone',
                placement=PlacementSpec(
                    hosts=[HostPlacementSpec(
                        hostname='test',
                        name='foo',
                        network=''
                    )]
                )
            ), CephadmOrchestrator.apply_rgw),
            (NFSServiceSpec(
                service_id='name',
                pool='pool',
                namespace='namespace'
            ), CephadmOrchestrator.apply_nfs),
            (IscsiServiceSpec(
                service_id='name',
                pool='pool',
                api_user='user',
                api_password='password'
            ), CephadmOrchestrator.apply_iscsi),
            (CustomContainerSpec(
                service_id='hello-world',
                image='docker.io/library/hello-world:latest',
                uid=65534,
                gid=65534,
                dirs=['foo/bar'],
                files={
                    'foo/bar/xyz.conf': 'aaa\nbbb'
                },
                bind_mounts=[[
                    'type=bind',
                    'source=lib/modules',
                    'destination=/lib/modules',
                    'ro=true'
                ]],
                volume_mounts={
                    'foo/bar': '/foo/bar:Z'
                },
                args=['--no-healthcheck'],
                envs=['SECRET=password'],
                ports=[8080, 8443]
            ), CephadmOrchestrator.apply_container),
            (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, spec, meth, 'test'):
                pass
    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
        spec = ServiceSpec('mds', service_id='fsname')
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, spec, host='test'):
                ret, out, err = cephadm_module.check_mon_command({
                    'prefix': 'config get',
                    'who': spec.service_name(),
                    'key': 'mds_join_fs',
                })
                assert out == 'fsname'
            ret, out, err = cephadm_module.check_mon_command({
                'prefix': 'config get',
                'who': spec.service_name(),
                'key': 'mds_join_fs',
            })
            assert not out
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
    def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
        spec = ServiceSpec(
            'mds',
            service_id='fsname',
            placement=PlacementSpec(hosts=['host1', 'host2'])
        )
        with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
            c = cephadm_module.apply_mds(spec)
            out = wait(cephadm_module, c)
            match_glob(out, "Scheduled mds.fsname update...")
            CephadmServe(cephadm_module)._apply_all_services()

            [daemon] = cephadm_module.cache.daemons['host1'].keys()

            spec.placement.set_hosts(['host2'])

            ok_to_stop.side_effect = False

            c = cephadm_module.apply_mds(spec)
            out = wait(cephadm_module, c)
            match_glob(out, "Scheduled mds.fsname update...")
            CephadmServe(cephadm_module)._apply_all_services()

            ok_to_stop.assert_called_with([daemon[4:]], force=True)

            assert_rm_daemon(cephadm_module, spec.service_name(), 'host1')  # verifies ok-to-stop
            assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')
    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
    @mock.patch("remoto.process.check")
    def test_offline(self, _check, _get_connection, cephadm_module):
        _check.return_value = '{}', '', 0
        _get_connection.return_value = mock.Mock(), mock.Mock()
        with with_host(cephadm_module, 'test'):
            _get_connection.side_effect = HostNotFound
            code, out, err = cephadm_module.check_host('test')
            assert out == ''
            assert "Host 'test' not found" in err

            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
            assert out == HostSpec('test', 'test', status='Offline').to_json()

            _get_connection.side_effect = None
            assert CephadmServe(cephadm_module)._check_host('test') is None
            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
            assert out == HostSpec('test', 'test').to_json()
    def test_stale_connections(self, cephadm_module):
        class Connection(object):
            """
            A mocked connection class that only allows the use of the connection
            once. If you attempt to use it again via a _check, it'll explode (go
            boom!).

            The old code triggers the boom. The new code checks the has_connection
            and will recreate the connection.
            """
            ok = True

            def has_connection(self):
                return self.ok

            def import_module(self, *args, **kargs):
                return mock.Mock()

            def exit(self):
                self.ok = False

        def _check(conn, *args, **kargs):
            if conn.ok:
                conn.exit()
                return '', '', 0
            raise Exception("boom: connection is dead")

        with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
            with mock.patch("remoto.process.check", _check):
                with with_host(cephadm_module, 'test', refresh_hosts=False):
                    code, out, err = cephadm_module.check_host('test')
                    # First should succeed.
                    assert err == ''

                    # On the second call it should attempt to reuse the connection; the
                    # connection is "down", so it will recreate the connection. The old
                    # code would blow up here, triggering the boom!
                    code, out, err = cephadm_module.check_host('test')
                    assert err == ''
    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
    @mock.patch("remoto.process.check")
    def test_etc_ceph(self, _check, _get_connection, cephadm_module):
        _get_connection.return_value = mock.Mock(), mock.Mock()
        _check.return_value = '{}', '', 0

        assert cephadm_module.manage_etc_ceph_ceph_conf is False

        with with_host(cephadm_module, 'test'):
            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

        with with_host(cephadm_module, 'test'):
            cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
            cephadm_module.config_notify()
            assert cephadm_module.manage_etc_ceph_ceph_conf is True

            CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
            _check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')

            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

            # set extra config and expect that we deploy another ceph.conf
            cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
            CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
            _check.assert_called_with(
                ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'\n\n[mon]\nk=v\n')

            # reload
            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
            cephadm_module.cache.load()

            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

            # Make sure _check_daemons does a redeploy due to the monmap change:
            cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            })
            cephadm_module.notify('mon_map', mock.MagicMock())
            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
            cephadm_module.cache.load()
            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
    def test_etc_ceph_init(self):
        with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
            assert m.manage_etc_ceph_ceph_conf is True
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        def check_registry_credentials(url, username, password):
            assert cephadm_module.get_module_option('registry_url') == url
            assert cephadm_module.get_module_option('registry_username') == username
            assert cephadm_module.get_module_option('registry_password') == password

        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            # test successful login with valid args
            code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
            assert out == 'registry login scheduled'
            assert err == ''
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test bad login attempt with invalid args
            code, out, err = cephadm_module.registry_login('bad-args')
            assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
                           "or -i <login credentials json file>")
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test bad login using invalid json file
            code, out, err = cephadm_module.registry_login(
                None, None, None, '{"bad-json": "bad-json"}')
            assert err == ("json provided for custom registry login did not include all necessary fields. "
                           "Please setup json file as\n"
                           "{\n"
                           " \"url\": \"REGISTRY_URL\",\n"
                           " \"username\": \"REGISTRY_USERNAME\",\n"
                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                           "}\n")
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test good login using valid json file
            good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
                         " \"password\": \"" + "json-pass" + "\"}")
            code, out, err = cephadm_module.registry_login(None, None, None, good_json)
            assert out == 'registry login scheduled'
            assert err == ''
            check_registry_credentials('json-url', 'json-user', 'json-pass')

            # test bad login where args are valid but the login command fails
            _run_cephadm.return_value = '{}', 'error', 1
            code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
            assert err == 'Host test failed to login to fail-url as fail-user with given password'
            check_registry_credentials('json-url', 'json-user', 'json-pass')
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
        'image_id': 'image_id',
        'repo_digests': ['image@repo_digest'],
    })))
    @pytest.mark.parametrize("use_repo_digest",
                             [
                                 False,
                                 True
                             ])
    def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
        cephadm_module.use_repo_digest = use_repo_digest

        with with_host(cephadm_module, 'test', refresh_hosts=False):
            cephadm_module.set_container_image('global', 'image')

            if use_repo_digest:
                CephadmServe(cephadm_module).convert_tags_to_repo_digest()

            _, image, _ = cephadm_module.check_mon_command({
                'prefix': 'config get',
                'who': 'global',
                'key': 'container_image',
            })
            if use_repo_digest:
                assert image == 'image@repo_digest'
            else:
                assert image == 'image'
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)

        error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
Traceback (most recent call last):
  File "<stdin>", line 6112, in <module>
  File "<stdin>", line 1299, in _infer_fsid
  File "<stdin>", line 1382, in _infer_image
  File "<stdin>", line 3612, in command_ceph_volume
  File "<stdin>", line 1061, in call_throws"""

        with with_host(cephadm_module, 'test'):
            _run_cephadm.reset_mock()
            _run_cephadm.side_effect = OrchestratorError(error_message)

            s = CephadmServe(cephadm_module)._refresh_host_devices('test')
            assert s == 'host test `cephadm ceph-volume` failed: ' + error_message

            assert _run_cephadm.mock_calls == [
                mock.call('test', 'osd', 'ceph-volume',
                          ['--', 'inventory', '--format=json', '--filter-for-batch'], image='',
                          no_fsid=False),
                mock.call('test', 'osd', 'ceph-volume',
                          ['--', 'inventory', '--format=json'], image='',
                          no_fsid=False),
            ]
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_osd_activate(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test', refresh_hosts=False):
            cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
                'osds': [
                    {
                        'osd': 1,
                        'up_from': 0,
                        'uuid': 'uuid'
                    }
                ]
            })

            ceph_volume_lvm_list = {
                '1': [{
                    'tags': {
                        'ceph.cluster_fsid': cephadm_module._cluster_fsid,
                        'ceph.osd_fsid': 'uuid'
                    }
                }]
            }
            _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
            assert cephadm_module._osd_activate(
                ['test']).stdout == "Created osd(s) 1 on host 'test'"