# ceph/src/pybind/mgr/cephadm/tests/test_cephadm.py
import json
from contextlib import contextmanager
from unittest.mock import ANY

import pytest

from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
from cephadm.services.osd import OSD, OSDRemovalQueue

try:
    from typing import List
except ImportError:
    pass

from execnet.gateway_bootstrap import HostNotFound

from ceph.deployment.service_spec import ServiceSpec, PlacementSpec, RGWSpec, \
    NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from ceph.utils import datetime_to_str, datetime_now
from orchestrator import DaemonDescription, InventoryHost, \
    HostSpec, OrchestratorError
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
    with_cephadm_module, with_service, _deploy_cephadm_binary
from cephadm.module import CephadmOrchestrator
30 """
31 TODOs:
32 There is really room for improvement here. I just quickly assembled theses tests.
33 I general, everything should be testes in Teuthology as well. Reasons for
34 also testing this here is the development roundtrip time.
35 """
36
37
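# Helper used by several tests below: removes the single daemon whose name starts
# with `prefix` from `host` and checks the orchestrator's confirmation message.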
def assert_rm_daemon(cephadm: CephadmOrchestrator, prefix, host):
    dds: List[DaemonDescription] = wait(cephadm, cephadm.list_daemons(host=host))
    d_names = [dd.name() for dd in dds if dd.name().startswith(prefix)]
    assert d_names
    # there should only be one daemon (if not, match_glob will throw a mismatch)
    assert len(d_names) == 1

    c = cephadm.remove_daemons(d_names)
    [out] = wait(cephadm, c)
    # Picking the first element (rather than passing the whole list) is needed when the
    # daemon name contains a '-' char. Otherwise the '-' is treated as a range, i.e.
    # cephadm-exporter is treated like an m-e range, which is invalid. rbd-mirror (d-m)
    # and node-exporter (e-e) are valid, so they pass without incident. Also, match_glob
    # acts on strings anyway!
    match_glob(out, f"Removed {d_names[0]}* from host '{host}'")


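# Context manager used by the tests below: deploys exactly one daemon for `spec`
# on `host`, yields its daemon_id, and removes it again on exit via assert_rm_daemon.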
@contextmanager
def with_daemon(cephadm_module: CephadmOrchestrator, spec: ServiceSpec, host: str):
    spec.placement = PlacementSpec(hosts=[host], count=1)

    c = cephadm_module.add_daemon(spec)
    [out] = wait(cephadm_module, c)
    match_glob(out, f"Deployed {spec.service_name()}.* on host '{host}'")

    dds = cephadm_module.cache.get_daemons_by_service(spec.service_name())
    for dd in dds:
        if dd.hostname == host:
            yield dd.daemon_id
            assert_rm_daemon(cephadm_module, spec.service_name(), host)
            return

    assert False, 'Daemon not found'


class TestCephadm(object):

    def test_get_unique_name(self, cephadm_module):
        # type: (CephadmOrchestrator) -> None
        existing = [
            DaemonDescription(daemon_type='mon', daemon_id='a')
        ]
        new_mon = cephadm_module.get_unique_name('mon', 'myhost', existing)
        match_glob(new_mon, 'myhost')
        new_mgr = cephadm_module.get_unique_name('mgr', 'myhost', existing)
        match_glob(new_mgr, 'myhost.*')

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_host(self, cephadm_module):
        assert wait(cephadm_module, cephadm_module.get_hosts()) == []
        with with_host(cephadm_module, 'test'):
            assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]

            # Be careful with backward compatibility when changing things here:
            assert json.loads(cephadm_module.get_store('inventory')) == \
                {"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}

            with with_host(cephadm_module, 'second'):
                assert wait(cephadm_module, cephadm_module.get_hosts()) == [
                    HostSpec('test', 'test'),
                    HostSpec('second', 'second')
                ]

            assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
        assert wait(cephadm_module, cephadm_module.get_hosts()) == []

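    # `service ls` should report both the mds service (deployed as an unmanaged spec
    # plus one daemon) and the rgw service applied to 'test', including running/size
    # counts and ports.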
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_service_ls(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.list_daemons(refresh=True)
            assert wait(cephadm_module, c) == []
            with with_service(cephadm_module, ServiceSpec('mds', 'name', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, ServiceSpec('mds', 'name'), 'test') as _:

                c = cephadm_module.list_daemons()

                def remove_id_events(dd):
                    out = dd.to_json()
                    del out['daemon_id']
                    del out['events']
                    return out

                assert [remove_id_events(dd) for dd in wait(cephadm_module, c)] == [
                    {
                        'daemon_type': 'mds',
                        'hostname': 'test',
                        'status': 1,
                        'status_desc': 'starting',
                        'is_active': False,
                        'ports': [],
                    }
                ]

                with with_service(cephadm_module, ServiceSpec('rgw', 'r.z'), CephadmOrchestrator.apply_rgw, 'test'):

                    c = cephadm_module.describe_service()
                    out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
                    expected = [
                        {
                            'placement': {'count': 2},
                            'service_id': 'name',
                            'service_name': 'mds.name',
                            'service_type': 'mds',
                            'status': {'created': mock.ANY, 'running': 1, 'size': 2},
                            'unmanaged': True
                        },
                        {
                            'placement': {
                                'count': 1,
                                'hosts': ["test"]
                            },
                            'service_id': 'r.z',
                            'service_name': 'rgw.r.z',
                            'service_type': 'rgw',
                            'status': {'created': mock.ANY, 'running': 1, 'size': 1,
                                       'ports': [80]},
                        }
                    ]
                    for o in out:
                        if 'events' in o:
                            del o['events']  # delete it, as it contains a timestamp
                    assert out == expected

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_device_ls(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.get_inventory()
            assert wait(cephadm_module, c) == [InventoryHost('test')]

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='rgw.myrgw.foobar',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    def test_list_daemons(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            assert wait(cephadm_module, c)[0].name() == 'rgw.myrgw.foobar'

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:

                c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
                assert wait(cephadm_module,
                            c) == f"Scheduled to redeploy rgw.{daemon_id} on host 'test'"

                for what in ('start', 'stop', 'restart'):
                    c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
                    assert wait(cephadm_module,
                                c) == f"Scheduled to {what} rgw.{daemon_id} on host 'test'"

                # Make sure _check_daemons does a redeploy due to the monmap change:
                cephadm_module._store['_ceph_get/mon_map'] = {
                    'modified': datetime_to_str(datetime_now()),
                    'fsid': 'foobar',
                }
                cephadm_module.notify('mon_map', None)

                CephadmServe(cephadm_module)._check_daemons()

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
        cephadm_module.service_cache_timeout = 10
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
                    with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), 'test') as daemon_id:
                with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:

                    _ceph_send_command.side_effect = Exception("myerror")

                    # Make sure _check_daemons does a redeploy due to the monmap change:
                    cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                        'modified': datetime_to_str(datetime_now()),
                        'fsid': 'foobar',
                    })
                    cephadm_module.notify('mon_map', None)

                    CephadmServe(cephadm_module)._check_daemons()

                    evs = [e.message for e in cephadm_module.events.get_for_daemon(
                        f'rgw.{daemon_id}')]

                    assert 'myerror' in ''.join(evs)

    @pytest.mark.parametrize(
        "action",
        [
            'start',
            'stop',
            'restart',
            'reconfig',
            'redeploy'
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_check(self, cephadm_module: CephadmOrchestrator, action):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._schedule_daemon_action(daemon_name, action)

                assert cephadm_module.cache.get_scheduled_daemon_action(
                    'test', daemon_name) == action

                CephadmServe(cephadm_module)._check_daemons()

                assert cephadm_module.cache.get_scheduled_daemon_action('test', daemon_name) is None

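    # Setting extra ceph.conf content should make _check_daemons reconfigure the mon:
    # the expected `deploy --reconfig` call below carries the merged config on stdin.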
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_daemon_check_extra_config(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)

        with with_host(cephadm_module, 'test'):

            # Also testing deploying mons without explicit network placement
            cephadm_module.check_mon_command({
                'prefix': 'config set',
                'who': 'mon',
                'name': 'public_network',
                'value': '127.0.0.0/8'
            })

            cephadm_module.cache.update_host_devices_networks(
                'test',
                [],
                {
                    "127.0.0.0/8": [
                        "127.0.0.1"
                    ],
                }
            )

            with with_service(cephadm_module, ServiceSpec(service_type='mon'), CephadmOrchestrator.apply_mon, 'test') as d_names:
                [daemon_name] = d_names

                cephadm_module._set_extra_ceph_conf('[mon]\nk=v')

                CephadmServe(cephadm_module)._check_daemons()

                _run_cephadm.assert_called_with(
                    'test', 'mon.test', 'deploy', [
                        '--name', 'mon.test',
                        '--meta-json', '{"service_name": "mon", "ports": [], "ip": null, "deployed_by": []}',
                        '--config-json', '-',
                        '--reconfig',
                    ],
                    stdin='{"config": "\\n\\n[mon]\\nk=v\\n[mon.test]\\npublic network = 127.0.0.0/8\\n", '
                          + '"keyring": "", "files": {"config": "[mon.test]\\npublic network = 127.0.0.0/8\\n"}}',
                    image='')

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_check_post(self, cephadm_module: CephadmOrchestrator):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='grafana'), CephadmOrchestrator.apply_grafana, 'test'):

                # Make sure _check_daemons does a redeploy due to the monmap change:
                cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                    'modified': datetime_to_str(datetime_now()),
                    'fsid': 'foobar',
                })
                cephadm_module.notify('mon_map', None)
                cephadm_module.mock_store_set('_ceph_get', 'mgr_map', {
                    'modules': ['dashboard']
                })

                with mock.patch("cephadm.module.CephadmOrchestrator.mon_command") as _mon_cmd:
                    CephadmServe(cephadm_module)._check_daemons()
                    _mon_cmd.assert_any_call(
                        {'prefix': 'dashboard set-grafana-api-url', 'value': 'https://test:3000'},
                        None)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_mon_add(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
                ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
                c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
                assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]

                with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
                    ps = PlacementSpec(hosts=['test'], count=1)
                    c = cephadm_module.add_daemon(ServiceSpec('mon', placement=ps))
                    wait(cephadm_module, c)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
    def test_mgr_update(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
            r = CephadmServe(cephadm_module)._apply_service(ServiceSpec('mgr', placement=ps))
            assert r

            assert_rm_daemon(cephadm_module, 'mgr.a', 'test')

    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
    def test_find_destroyed_osds(self, _mon_cmd, cephadm_module):
        dict_out = {
            "nodes": [
                {
                    "id": -1,
                    "name": "default",
                    "type": "root",
                    "type_id": 11,
                    "children": [
                        -3
                    ]
                },
                {
                    "id": -3,
                    "name": "host1",
                    "type": "host",
                    "type_id": 1,
                    "pool_weights": {},
                    "children": [
                        0
                    ]
                },
                {
                    "id": 0,
                    "device_class": "hdd",
                    "name": "osd.0",
                    "type": "osd",
                    "type_id": 0,
                    "crush_weight": 0.0243988037109375,
                    "depth": 2,
                    "pool_weights": {},
                    "exists": 1,
                    "status": "destroyed",
                    "reweight": 1,
                    "primary_affinity": 1
                }
            ],
            "stray": []
        }
        json_out = json.dumps(dict_out)
        _mon_cmd.return_value = (0, json_out, '')
        out = cephadm_module.osd_service.find_destroyed_osds()
        assert out == {'host1': ['0']}

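    # Parametrization: `ceph_services` is what the cluster reports via list_servers(),
    # `cephadm_daemons` is what cephadm believes it deployed, `strays_expected` is the
    # difference that should surface as CEPHADM_STRAY_DAEMON, and `metadata` maps
    # rgw-nfs service ids to their nfs daemon names (see the tracker issue referenced
    # in the cases below).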
    @pytest.mark.parametrize(
        "ceph_services, cephadm_daemons, strays_expected, metadata",
        # [ ( [(daemon_type, daemon_id), ...], [...], [...], {...} ), ... ]
        [
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [],
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                {},
            ),
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [],
                {},
            ),
            (
                [('mds', 'a'), ('osd', '0'), ('mgr', 'x')],
                [('mds', 'a'), ('osd', '0')],
                [('mgr', 'x')],
                {},
            ),
            # https://tracker.ceph.com/issues/49573
            (
                [('rgw-nfs', '14649')],
                [],
                [('nfs', 'foo-rgw.host1')],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}},
            ),
            (
                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
                [('nfs', 'foo-rgw.host1'), ('nfs', 'foo2.host2')],
                [],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
            ),
            (
                [('rgw-nfs', '14649'), ('rgw-nfs', '14650')],
                [('nfs', 'foo-rgw.host1')],
                [('nfs', 'foo2.host2')],
                {'14649': {'id': 'nfs.foo-rgw.host1-rgw'}, '14650': {'id': 'nfs.foo2.host2-rgw'}},
            ),
        ]
    )
    def test_check_for_stray_daemons(
            self,
            cephadm_module,
            ceph_services,
            cephadm_daemons,
            strays_expected,
            metadata
    ):
        # mock ceph service-map
        services = []
        for service in ceph_services:
            s = {'type': service[0], 'id': service[1]}
            services.append(s)
        ls = [{'hostname': 'host1', 'services': services}]

        with mock.patch.object(cephadm_module, 'list_servers', mock.MagicMock()) as list_servers:
            list_servers.return_value = ls
            list_servers.__iter__.side_effect = ls.__iter__

            # populate cephadm daemon cache
            dm = {}
            for daemon_type, daemon_id in cephadm_daemons:
                dd = DaemonDescription(daemon_type=daemon_type, daemon_id=daemon_id)
                dm[dd.name()] = dd
            cephadm_module.cache.update_host_daemons('host1', dm)

            def get_metadata_mock(svc_type, svc_id, default):
                return metadata[svc_id]

            with mock.patch.object(cephadm_module, 'get_metadata', new_callable=lambda: get_metadata_mock):

                # test
                CephadmServe(cephadm_module)._check_for_strays()

                # verify
                strays = cephadm_module.health_checks.get('CEPHADM_STRAY_DAEMON')
                if not strays:
                    assert len(strays_expected) == 0
                else:
                    for dt, di in strays_expected:
                        name = '%s.%s' % (dt, di)
                        for detail in strays['detail']:
                            if name in detail:
                                strays['detail'].remove(detail)
                                break
                        assert name in detail
                    assert len(strays['detail']) == 0
                    assert strays['count'] == len(strays_expected)

    @mock.patch("cephadm.module.CephadmOrchestrator.mon_command")
    def test_find_destroyed_osds_cmd_failure(self, _mon_cmd, cephadm_module):
        _mon_cmd.return_value = (1, "", "fail_msg")
        with pytest.raises(OrchestratorError):
            cephadm_module.osd_service.find_destroyed_osds()

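    # apply() only schedules the drive group; once the host inventory contains /dev/sdb,
    # _apply_all_services() should drive `ceph-volume lvm batch` with the OSDSPEC
    # affinity env var, followed by `lvm list` to pick up the result.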
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test'):

            spec = DriveGroupSpec(
                service_id='foo',
                placement=PlacementSpec(
                    host_pattern='*',
                ),
                data_devices=DeviceSelection(
                    all=True
                )
            )

            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']

            inventory = Devices([
                Device(
                    '/dev/sdb',
                    available=True
                ),
            ])

            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})

            _run_cephadm.return_value = (['{}'], '', 0)

            assert CephadmServe(cephadm_module)._apply_all_services() is False

            _run_cephadm.assert_any_call(
                'test', 'osd', 'ceph-volume',
                ['--config-json', '-', '--', 'lvm', 'batch',
                    '--no-auto', '/dev/sdb', '--yes', '--no-systemd'],
                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=foo'], error_ok=True, stdin='{"config": "", "keyring": ""}')
            _run_cephadm.assert_called_with(
                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_apply_osd_save_non_collocated(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test'):

            spec = DriveGroupSpec(
                service_id='noncollocated',
                placement=PlacementSpec(
                    hosts=['test']
                ),
                data_devices=DeviceSelection(paths=['/dev/sdb']),
                db_devices=DeviceSelection(paths=['/dev/sdc']),
                wal_devices=DeviceSelection(paths=['/dev/sdd'])
            )

            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.noncollocated update...']

            inventory = Devices([
                Device('/dev/sdb', available=True),
                Device('/dev/sdc', available=True),
                Device('/dev/sdd', available=True)
            ])

            cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})

            _run_cephadm.return_value = (['{}'], '', 0)

            assert CephadmServe(cephadm_module)._apply_all_services() is False

            _run_cephadm.assert_any_call(
                'test', 'osd', 'ceph-volume',
                ['--config-json', '-', '--', 'lvm', 'batch',
                    '--no-auto', '/dev/sdb', '--db-devices', '/dev/sdc',
                    '--wal-devices', '/dev/sdd', '--yes', '--no-systemd'],
                env_vars=['CEPH_VOLUME_OSDSPEC_AFFINITY=noncollocated'],
                error_ok=True, stdin='{"config": "", "keyring": ""}')
            _run_cephadm.assert_called_with(
                'test', 'osd', 'ceph-volume', ['--', 'lvm', 'list', '--format', 'json'], image='', no_fsid=False)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.SpecStore.save")
    def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
        with with_host(cephadm_module, 'test'):
            json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'},
                         'service_id': 'foo', 'data_devices': {'all': True}}
            spec = ServiceSpec.from_json(json_spec)
            assert isinstance(spec, DriveGroupSpec)
            c = cephadm_module.apply([spec])
            assert wait(cephadm_module, c) == ['Scheduled osd.foo update...']
            _save_spec.assert_called_with(spec)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_create_osds(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            c = cephadm_module.create_osds(dg)
            out = wait(cephadm_module, c)
            assert out == "Created no osd(s) on host test; already created?"

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_create_noncollocated_osd(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            c = cephadm_module.create_osds(dg)
            out = wait(cephadm_module, c)
            assert out == "Created no osd(s) on host test; already created?"

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_prepare_drivegroup(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'),
                                data_devices=DeviceSelection(paths=['']))
            out = cephadm_module.osd_service.prepare_drivegroup(dg)
            assert len(out) == 1
            f1 = out[0]
            assert f1[0] == 'test'
            assert isinstance(f1[1], DriveSelection)

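    # Translating a DriveSelection into the ceph-volume command line; the expected
    # commands for each case are spelled out in the parametrize list below.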
    @pytest.mark.parametrize(
        "devices, preview, exp_command",
        [
            # no preview and only one disk: prepare is used due to the hack that is in place
            (['/dev/sda'], False, "lvm batch --no-auto /dev/sda --yes --no-systemd"),
            # no preview and multiple disks: uses batch
            (['/dev/sda', '/dev/sdb'], False,
             "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd"),
            # preview and only one disk: needs to use batch again to generate the preview
            (['/dev/sda'], True, "lvm batch --no-auto /dev/sda --yes --no-systemd --report --format json"),
            # preview and multiple disks work the same
            (['/dev/sda', '/dev/sdb'], True,
             "CEPH_VOLUME_OSDSPEC_AFFINITY=test.spec lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd --report --format json"),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
        with with_host(cephadm_module, 'test'):
            dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(
                host_pattern='test'), data_devices=DeviceSelection(paths=devices))
            ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
            out = cephadm_module.osd_service.driveselection_to_ceph_volume(ds, [], preview)
            assert out in exp_command

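    # Daemon removal plus the OSD removal queue: with OSD.exists and get_pg_count
    # mocked, processing the queue should drain it back to an empty OSDRemovalQueue.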
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='osd.0',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    @mock.patch("cephadm.services.osd.OSD.exists", True)
    @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
    def test_remove_osds(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            wait(cephadm_module, c)

            c = cephadm_module.remove_daemons(['osd.0'])
            out = wait(cephadm_module, c)
            assert out == ["Removed osd.0 from host 'test'"]

            cephadm_module.to_remove_osds.enqueue(OSD(osd_id=0,
                                                      replace=False,
                                                      force=False,
                                                      hostname='test',
                                                      process_started_at=datetime_now(),
                                                      remove_util=cephadm_module.to_remove_osds.rm_util
                                                      ))
            cephadm_module.to_remove_osds.process_removal_queue()
            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)

            c = cephadm_module.remove_osds_status()
            out = wait(cephadm_module, c)
            assert out == []

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_rgw_update(self, cephadm_module):
        with with_host(cephadm_module, 'host1'):
            with with_host(cephadm_module, 'host2'):
                with with_service(cephadm_module, RGWSpec(service_id="foo", unmanaged=True)):
                    ps = PlacementSpec(hosts=['host1'], count=1)
                    c = cephadm_module.add_daemon(
                        RGWSpec(service_id="foo", placement=ps))
                    [out] = wait(cephadm_module, c)
                    match_glob(out, "Deployed rgw.foo.* on host 'host1'")

                    ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
                    r = CephadmServe(cephadm_module)._apply_service(
                        RGWSpec(service_id="foo", placement=ps))
                    assert r

                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host1')
                    assert_rm_daemon(cephadm_module, 'rgw.foo', 'host2')

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
        json.dumps([
            dict(
                name='rgw.myrgw.myhost.myid',
                style='cephadm',
                fsid='fsid',
                container_id='container_id',
                version='version',
                state='running',
            )
        ])
    ))
    def test_remove_daemon(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            CephadmServe(cephadm_module)._refresh_host_daemons('test')
            c = cephadm_module.list_daemons()
            wait(cephadm_module, c)
            c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
            out = wait(cephadm_module, c)
            assert out == ["Removed rgw.myrgw.myhost.myid from host 'test'"]

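    # Each spec is first applied as an unmanaged service, then a single daemon is
    # added and torn down again via the with_daemon() helper.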
    @pytest.mark.parametrize(
        "spec",
        [
            ServiceSpec('crash'),
            ServiceSpec('prometheus'),
            ServiceSpec('grafana'),
            ServiceSpec('node-exporter'),
            ServiceSpec('alertmanager'),
            ServiceSpec('rbd-mirror'),
            ServiceSpec('cephfs-mirror'),
            ServiceSpec('mds', service_id='fsname'),
            RGWSpec(rgw_realm='realm', rgw_zone='zone'),
            RGWSpec(service_id="foo"),
            ServiceSpec('cephadm-exporter'),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
        unmanaged_spec = ServiceSpec.from_json(spec.to_json())
        unmanaged_spec.unmanaged = True
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, unmanaged_spec):
                with with_daemon(cephadm_module, spec, 'test'):
                    pass

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_daemon_add_fail(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            spec = ServiceSpec(
                service_type='mgr',
                placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
                unmanaged=True
            )
            with with_service(cephadm_module, spec):
                _run_cephadm.side_effect = OrchestratorError('fail')
                with pytest.raises(OrchestratorError):
                    wait(cephadm_module, cephadm_module.add_daemon(spec))
                cephadm_module.assert_issued_mon_command({
                    'prefix': 'auth rm',
                    'entity': 'mgr.x',
                })

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
    def test_nfs(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test'], count=1)
            spec = NFSServiceSpec(
                service_id='name',
                pool='pool',
                namespace='namespace',
                placement=ps)
            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
            unmanaged_spec.unmanaged = True
            with with_service(cephadm_module, unmanaged_spec):
                c = cephadm_module.add_daemon(spec)
                [out] = wait(cephadm_module, c)
                match_glob(out, "Deployed nfs.name.* on host 'test'")

                assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
    def test_iscsi(self, cephadm_module):
        with with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test'], count=1)
            spec = IscsiServiceSpec(
                service_id='name',
                pool='pool',
                api_user='user',
                api_password='password',
                placement=ps)
            unmanaged_spec = ServiceSpec.from_json(spec.to_json())
            unmanaged_spec.unmanaged = True
            with with_service(cephadm_module, unmanaged_spec):

                c = cephadm_module.add_daemon(spec)
                [out] = wait(cephadm_module, c)
                match_glob(out, "Deployed iscsi.name.* on host 'test'")

                assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')

    @pytest.mark.parametrize(
        "on_bool",
        [
            True,
            False
        ]
    )
    @pytest.mark.parametrize(
        "fault_ident",
        [
            'fault',
            'ident'
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light(self, _run_cephadm, on_bool, fault_ident, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            c = cephadm_module.blink_device_light(fault_ident, on_bool, [('test', '', 'dev')])
            on_off = 'on' if on_bool else 'off'
            assert wait(cephadm_module, c) == [f'Set {fault_ident} light for test: {on_off}']
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'lsmcli', f'local-disk-{fault_ident}-led-{on_off}', '--path', 'dev'], error_ok=True)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light_custom(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            cephadm_module.set_store('blink_device_light_cmd', 'echo hello')
            c = cephadm_module.blink_device_light('ident', True, [('test', '', '/dev/sda')])
            assert wait(cephadm_module, c) == ['Set ident light for test: on']
            _run_cephadm.assert_called_with('test', 'osd', 'shell', [
                '--', 'echo', 'hello'], error_ok=True)

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_blink_device_light_custom_per_host(self, _run_cephadm, cephadm_module):
        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'mgr0'):
            cephadm_module.set_store('mgr0/blink_device_light_cmd',
                                     'xyz --foo --{{ ident_fault }}={{\'on\' if on else \'off\'}} \'{{ path or dev }}\'')
            c = cephadm_module.blink_device_light(
                'fault', True, [('mgr0', 'SanDisk_X400_M.2_2280_512GB_162924424784', '')])
            assert wait(cephadm_module, c) == [
                'Set fault light for mgr0:SanDisk_X400_M.2_2280_512GB_162924424784 on']
            _run_cephadm.assert_called_with('mgr0', 'osd', 'shell', [
                '--', 'xyz', '--foo', '--fault=on', 'SanDisk_X400_M.2_2280_512GB_162924424784'
            ], error_ok=True)

    @pytest.mark.parametrize(
        "spec, meth",
        [
            (ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr),
            (ServiceSpec('crash'), CephadmOrchestrator.apply_crash),
            (ServiceSpec('prometheus'), CephadmOrchestrator.apply_prometheus),
            (ServiceSpec('grafana'), CephadmOrchestrator.apply_grafana),
            (ServiceSpec('node-exporter'), CephadmOrchestrator.apply_node_exporter),
            (ServiceSpec('alertmanager'), CephadmOrchestrator.apply_alertmanager),
            (ServiceSpec('rbd-mirror'), CephadmOrchestrator.apply_rbd_mirror),
            (ServiceSpec('cephfs-mirror'), CephadmOrchestrator.apply_rbd_mirror),
            (ServiceSpec('mds', service_id='fsname'), CephadmOrchestrator.apply_mds),
            (ServiceSpec(
                'mds', service_id='fsname',
                placement=PlacementSpec(
                    hosts=[HostPlacementSpec(
                        hostname='test',
                        name='fsname',
                        network=''
                    )]
                )
            ), CephadmOrchestrator.apply_mds),
            (RGWSpec(service_id='foo'), CephadmOrchestrator.apply_rgw),
            (RGWSpec(
                service_id='bar',
                rgw_realm='realm', rgw_zone='zone',
                placement=PlacementSpec(
                    hosts=[HostPlacementSpec(
                        hostname='test',
                        name='bar',
                        network=''
                    )]
                )
            ), CephadmOrchestrator.apply_rgw),
            (NFSServiceSpec(
                service_id='name',
                pool='pool',
                namespace='namespace'
            ), CephadmOrchestrator.apply_nfs),
            (IscsiServiceSpec(
                service_id='name',
                pool='pool',
                api_user='user',
                api_password='password'
            ), CephadmOrchestrator.apply_iscsi),
            (CustomContainerSpec(
                service_id='hello-world',
                image='docker.io/library/hello-world:latest',
                uid=65534,
                gid=65534,
                dirs=['foo/bar'],
                files={
                    'foo/bar/xyz.conf': 'aaa\nbbb'
                },
                bind_mounts=[[
                    'type=bind',
                    'source=lib/modules',
                    'destination=/lib/modules',
                    'ro=true'
                ]],
                volume_mounts={
                    'foo/bar': '/foo/bar:Z'
                },
                args=['--no-healthcheck'],
                envs=['SECRET=password'],
                ports=[8080, 8443]
            ), CephadmOrchestrator.apply_container),
            (ServiceSpec('cephadm-exporter'), CephadmOrchestrator.apply_cephadm_exporter),
        ]
    )
    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, spec, meth, 'test'):
                pass

    @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
        spec = ServiceSpec('mds', service_id='fsname')
        with with_host(cephadm_module, 'test'):
            with with_service(cephadm_module, spec, host='test'):
                ret, out, err = cephadm_module.check_mon_command({
                    'prefix': 'config get',
                    'who': spec.service_name(),
                    'key': 'mds_join_fs',
                })
                assert out == 'fsname'
            ret, out, err = cephadm_module.check_mon_command({
                'prefix': 'config get',
                'who': spec.service_name(),
                'key': 'mds_join_fs',
            })
            assert not out

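    # Shrinking the mds placement from two hosts to one should trigger an ok_to_stop
    # check for the daemon on host1 before it is removed.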
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
    @mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
    def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
        spec = ServiceSpec(
            'mds',
            service_id='fsname',
            placement=PlacementSpec(hosts=['host1', 'host2'])
        )
        with with_host(cephadm_module, 'host1'), with_host(cephadm_module, 'host2'):
            c = cephadm_module.apply_mds(spec)
            out = wait(cephadm_module, c)
            match_glob(out, "Scheduled mds.fsname update...")
            CephadmServe(cephadm_module)._apply_all_services()

            [daemon] = cephadm_module.cache.daemons['host1'].keys()

            spec.placement.set_hosts(['host2'])

            ok_to_stop.side_effect = False

            c = cephadm_module.apply_mds(spec)
            out = wait(cephadm_module, c)
            match_glob(out, "Scheduled mds.fsname update...")
            CephadmServe(cephadm_module)._apply_all_services()

            ok_to_stop.assert_called_with([daemon[4:]], force=True)

            assert_rm_daemon(cephadm_module, spec.service_name(), 'host1')  # verifies ok-to-stop
            assert_rm_daemon(cephadm_module, spec.service_name(), 'host2')

    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
    @mock.patch("remoto.process.check")
    def test_offline(self, _check, _get_connection, cephadm_module):
        _check.return_value = '{}', '', 0
        _get_connection.return_value = mock.Mock(), mock.Mock()
        with with_host(cephadm_module, 'test'):
            _get_connection.side_effect = HostNotFound
            code, out, err = cephadm_module.check_host('test')
            assert out == ''
            assert "Host 'test' not found" in err

            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
            assert out == HostSpec('test', 'test', status='Offline').to_json()

            _get_connection.side_effect = None
            assert CephadmServe(cephadm_module)._check_host('test') is None
            out = wait(cephadm_module, cephadm_module.get_hosts())[0].to_json()
            assert out == HostSpec('test', 'test').to_json()

    def test_stale_connections(self, cephadm_module):
        class Connection(object):
            """
            A mocked connection class that only allows the use of the connection
            once. If you attempt to use it again via a _check, it'll explode (go
            boom!).

            The old code triggers the boom. The new code checks has_connection
            and will recreate the connection.
            """
            fuse = False

            @staticmethod
            def has_connection():
                return False

            def import_module(self, *args, **kargs):
                return mock.Mock()

            @staticmethod
            def exit():
                pass

        def _check(conn, *args, **kargs):
            if conn.fuse:
                raise Exception("boom: connection is dead")
            else:
                conn.fuse = True
            return '{}', [], 0
        with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
            with mock.patch("remoto.process.check", _check):
                with with_host(cephadm_module, 'test', refresh_hosts=False):
                    code, out, err = cephadm_module.check_host('test')
                    # First call should succeed.
                    assert err == ''

                    # On the second call it attempts to reuse the connection; since the
                    # connection is "down", it recreates the connection. The old code
                    # would blow up here, triggering the BOOM!
                    code, out, err = cephadm_module.check_host('test')
                    assert err == ''

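    # /etc/ceph/ceph.conf distribution: with manage_etc_ceph_ceph_conf enabled the
    # refresh should push the conf via remoto `dd`, and the per-host "needs new conf"
    # flag should react to extra config and monmap changes and survive a cache reload.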
    @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
    @mock.patch("remoto.process.check")
    def test_etc_ceph(self, _check, _get_connection, cephadm_module):
        _get_connection.return_value = mock.Mock(), mock.Mock()
        _check.return_value = '{}', '', 0

        assert cephadm_module.manage_etc_ceph_ceph_conf is False

        with with_host(cephadm_module, 'test'):
            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

        with with_host(cephadm_module, 'test'):
            cephadm_module.set_module_option('manage_etc_ceph_ceph_conf', True)
            cephadm_module.config_notify()
            assert cephadm_module.manage_etc_ceph_ceph_conf is True

            CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
            _check.assert_called_with(ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'')

            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

            # set extra config and expect that we deploy another ceph.conf
            cephadm_module._set_extra_ceph_conf('[mon]\nk=v')
            CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
            _check.assert_called_with(
                ANY, ['dd', 'of=/etc/ceph/ceph.conf'], stdin=b'\n\n[mon]\nk=v\n')

            # reload
            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
            cephadm_module.cache.load()

            assert not cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

            # Make sure _check_daemons does a redeploy due to the monmap change:
            cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
                'modified': datetime_to_str(datetime_now()),
                'fsid': 'foobar',
            })
            cephadm_module.notify('mon_map', mock.MagicMock())
            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')
            cephadm_module.cache.last_etc_ceph_ceph_conf = {}
            cephadm_module.cache.load()
            assert cephadm_module.cache.host_needs_new_etc_ceph_ceph_conf('test')

    def test_etc_ceph_init(self):
        with with_cephadm_module({'manage_etc_ceph_ceph_conf': True}) as m:
            assert m.manage_etc_ceph_ceph_conf is True

    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_registry_login(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        def check_registry_credentials(url, username, password):
            assert cephadm_module.get_module_option('registry_url') == url
            assert cephadm_module.get_module_option('registry_username') == username
            assert cephadm_module.get_module_option('registry_password') == password

        _run_cephadm.return_value = '{}', '', 0
        with with_host(cephadm_module, 'test'):
            # test successful login with valid args
            code, out, err = cephadm_module.registry_login('test-url', 'test-user', 'test-password')
            assert out == 'registry login scheduled'
            assert err == ''
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test bad login attempt with invalid args
            code, out, err = cephadm_module.registry_login('bad-args')
            assert err == ("Invalid arguments. Please provide arguments <url> <username> <password> "
                           "or -i <login credentials json file>")
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test bad login using invalid json file
            code, out, err = cephadm_module.registry_login(
                None, None, None, '{"bad-json": "bad-json"}')
            assert err == ("json provided for custom registry login did not include all necessary fields. "
                           "Please setup json file as\n"
                           "{\n"
                           " \"url\": \"REGISTRY_URL\",\n"
                           " \"username\": \"REGISTRY_USERNAME\",\n"
                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                           "}\n")
            check_registry_credentials('test-url', 'test-user', 'test-password')

            # test good login using valid json file
            good_json = ("{\"url\": \"" + "json-url" + "\", \"username\": \"" + "json-user" + "\", "
                         " \"password\": \"" + "json-pass" + "\"}")
            code, out, err = cephadm_module.registry_login(None, None, None, good_json)
            assert out == 'registry login scheduled'
            assert err == ''
            check_registry_credentials('json-url', 'json-user', 'json-pass')

            # test bad login where args are valid but login command fails
            _run_cephadm.return_value = '{}', 'error', 1
            code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
            assert err == 'Host test failed to login to fail-url as fail-user with given password'
            check_registry_credentials('json-url', 'json-user', 'json-pass')

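    # With use_repo_digest enabled, convert_tags_to_repo_digest() should rewrite the
    # global container_image setting from the tag to the repo digest reported by the
    # mocked _run_cephadm call; without it the tag is kept as-is.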
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
        'image_id': 'image_id',
        'repo_digests': ['image@repo_digest'],
    })))
    @pytest.mark.parametrize("use_repo_digest",
                             [
                                 False,
                                 True
                             ])
    def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
        cephadm_module.use_repo_digest = use_repo_digest

        with with_host(cephadm_module, 'test', refresh_hosts=False):
            cephadm_module.set_container_image('global', 'image')

            if use_repo_digest:

                CephadmServe(cephadm_module).convert_tags_to_repo_digest()

            _, image, _ = cephadm_module.check_mon_command({
                'prefix': 'config get',
                'who': 'global',
                'key': 'container_image',
            })
            if use_repo_digest:
                assert image == 'image@repo_digest'
            else:
                assert image == 'image'

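    # Older ceph-volume binaries do not understand --filter-for-batch (see the error
    # message below); _refresh_host_devices should retry the inventory without it.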
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_ceph_volume_no_filter_for_batch(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)

        error_message = """cephadm exited with an error code: 1, stderr:/usr/bin/podman:stderr usage: ceph-volume inventory [-h] [--format {plain,json,json-pretty}] [path]/usr/bin/podman:stderr ceph-volume inventory: error: unrecognized arguments: --filter-for-batch
Traceback (most recent call last):
  File "<stdin>", line 6112, in <module>
  File "<stdin>", line 1299, in _infer_fsid
  File "<stdin>", line 1382, in _infer_image
  File "<stdin>", line 3612, in command_ceph_volume
  File "<stdin>", line 1061, in call_throws"""

        with with_host(cephadm_module, 'test'):
            _run_cephadm.reset_mock()
            _run_cephadm.side_effect = OrchestratorError(error_message)

            s = CephadmServe(cephadm_module)._refresh_host_devices('test')
            assert s == 'host test `cephadm ceph-volume` failed: ' + error_message

            assert _run_cephadm.mock_calls == [
                mock.call('test', 'osd', 'ceph-volume',
                          ['--', 'inventory', '--format=json', '--filter-for-batch'], image='',
                          no_fsid=False),
                mock.call('test', 'osd', 'ceph-volume',
                          ['--', 'inventory', '--format=json'], image='',
                          no_fsid=False),
            ]

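    # The mocked osd_map and `ceph-volume lvm list` output describe an OSD whose
    # cluster fsid matches this cluster, so _osd_activate should report it as created.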
    @mock.patch("cephadm.serve.CephadmServe._run_cephadm")
    def test_osd_activate(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
        _run_cephadm.return_value = ('{}', '', 0)
        with with_host(cephadm_module, 'test', refresh_hosts=False):
            cephadm_module.mock_store_set('_ceph_get', 'osd_map', {
                'osds': [
                    {
                        'osd': 1,
                        'up_from': 0,
                        'uuid': 'uuid'
                    }
                ]
            })

            ceph_volume_lvm_list = {
                '1': [{
                    'tags': {
                        'ceph.cluster_fsid': cephadm_module._cluster_fsid,
                        'ceph.osd_fsid': 'uuid'
                    }
                }]
            }
            _run_cephadm.return_value = (json.dumps(ceph_volume_lvm_list), '', 0)
            assert cephadm_module._osd_activate(
                ['test']).stdout == "Created osd(s) 1 on host 'test'"