]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/tests/fixtures.py
2 from contextlib
import contextmanager
4 from ceph
.deployment
.service_spec
import PlacementSpec
, ServiceSpec
5 from ceph
.utils
import datetime_to_str
, datetime_now
6 from cephadm
.serve
import CephadmServe
9 from typing
import Any
, Iterator
, List
13 from cephadm
import CephadmOrchestrator
14 from orchestrator
import raise_if_exception
, OrchResult
, HostSpec
15 from tests
import mock
18 def get_ceph_option(_
, key
):
22 def _run_cephadm(ret
):
23 def foo(s
, host
, entity
, cmd
, e
, **kwargs
):
24 if cmd
== 'gather-facts':
def match_glob(val, pat):
    """Run a case-sensitive glob match of ``val`` against ``pat``.

    :param val: string under test
    :param pat: ``fnmatch``-style pattern
    """
    # Imported locally: no module-level ``import fnmatch`` is visible in this
    # extract of the file.
    import fnmatch

    ok = fnmatch.fnmatchcase(val, pat)
    # NOTE(review): nothing visible consumes ``ok``; the statements that acted
    # on the match result appear to be missing from this extract -- confirm
    # against the upstream file before relying on this helper.
def with_cephadm_module(module_options=None, store=None):
    """
    Build a CephadmOrchestrator while the patches listed below are active.

    :param module_options: Set opts as if they were set before module.__init__ is called
    :param store: Set the store before module.__init__ is called
    """
    with mock.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option), \
            mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
            mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
            mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity",
                       return_value='test_spec'), \
            mock.patch("cephadm.module.CephadmOrchestrator.remote"):

        # Allocate the instance without running __init__ so that options and
        # store entries can be seeded before initialisation.
        m = CephadmOrchestrator.__new__(CephadmOrchestrator)
        if module_options is not None:
            for k, v in module_options.items():
                m._ceph_set_module_option('cephadm', k, v)

        # ``store`` defaults to None; normalise it so the membership test and
        # the ``.items()`` loop below do not raise TypeError/AttributeError.
        if store is None:
            store = {}
        if '_ceph_get/mon_map' not in store:
            # NOTE(review): only the 'modified' key is visible in this extract;
            # further keys may have been dropped -- confirm against upstream.
            m.mock_store_set('_ceph_get', 'mon_map', {
                'modified': datetime_to_str(datetime_now()),
            })
        for k, v in store.items():
            m._ceph_set_store(k, v)

        m.__init__('cephadm', 0, 0)
        m._cluster_fsid = "fsid"
        # NOTE(review): hand the module to the caller while the patches are
        # still active. The yield is not visible in this extract and is
        # restored on the assumption that this function is used as a
        # generator-based context manager -- confirm the decorator upstream.
        yield m
def wait(m, c):
    # type: (CephadmOrchestrator, OrchResult) -> Any
    """Unwrap an orchestrator result via ``raise_if_exception``.

    NOTE(review): the ``def`` line is missing from this extract; the header is
    rebuilt from the two-argument type comment and from the call sites in this
    file, which all pass the module first and the result second. Confirm the
    first parameter's name against the upstream file.

    :param m: the orchestrator module (not used by the body)
    :param c: the result object handed to ``raise_if_exception``
    :return: whatever ``raise_if_exception(c)`` returns
    """
    return raise_if_exception(c)
def with_host(m: CephadmOrchestrator, name, addr='1.2.3.4', refresh_hosts=True):
    # type: (CephadmOrchestrator, str) -> None
    """Add host ``name`` to the module, then remove it again afterwards.

    :param m: orchestrator module under test
    :param name: hostname passed to ``HostSpec`` / ``remove_host``
    :param addr: value ``cephadm.utils.resolve_ip`` is patched to return
    :param refresh_hosts: when true, run ``_refresh_hosts_and_daemons`` once
        after the host has been added
    """
    with mock.patch("cephadm.utils.resolve_ip", return_value=addr):
        wait(m, m.add_host(HostSpec(hostname=name)))
        # ``refresh_hosts`` is otherwise unread; it gates the refresh call.
        if refresh_hosts:
            CephadmServe(m)._refresh_hosts_and_daemons()
        # NOTE(review): the yield is not visible in this extract. It is
        # restored so the host exists for the caller's body before teardown,
        # mirroring the setup / yield / teardown shape of with_service --
        # confirm the context-manager decorator upstream.
        yield
        wait(m, m.remove_host(name))
def assert_rm_service(cephadm: CephadmOrchestrator, srv_name):
    """Remove service ``srv_name`` and assert that the removal goes through.

    For ``mon`` and ``mgr`` services the removal request is expected to be
    refused (the result contains ``'Unable'``) and nothing further is checked.

    :param cephadm: orchestrator module under test
    :param srv_name: key of the service in ``cephadm.spec_store``
    """
    mon_or_mgr = cephadm.spec_store[srv_name].spec.service_type in ('mon', 'mgr')
    if mon_or_mgr:
        # The two remove_service expectations are mutually exclusive, so the
        # refusal case has to stop here.
        assert 'Unable' in wait(cephadm, cephadm.remove_service(srv_name))
        return

    assert wait(cephadm, cephadm.remove_service(srv_name)) == f'Removed service {srv_name}'
    assert cephadm.spec_store[srv_name].deleted is not None

    CephadmServe(cephadm)._check_daemons()
    CephadmServe(cephadm)._apply_all_services()
    assert cephadm.spec_store[srv_name].deleted

    # Read the flag before purging: the spec may be gone from the store afterwards.
    unmanaged = cephadm.spec_store[srv_name].spec.unmanaged
    CephadmServe(cephadm)._purge_deleted_services()
    if not unmanaged:  # an unmanaged service does not get its daemons deleted
        assert srv_name not in cephadm.spec_store, f'{cephadm.spec_store[srv_name]!r}'
def with_service(cephadm_module: CephadmOrchestrator,
                 spec: ServiceSpec,
                 meth=None,
                 host: str = '') -> Iterator[List[str]]:
    """Apply ``spec``, yield the names of its daemons, then remove the service.

    :param cephadm_module: orchestrator module under test
    :param spec: service spec to apply; an empty placement is replaced by a
        single-host placement when ``host`` is given
    :param meth: optional callable ``meth(cephadm_module, spec)`` used to apply
        the spec; when omitted, ``cephadm_module.apply([spec])`` is used
    :param host: host to place the service on when ``spec`` has no placement
    :return: yields the list of names of the daemons whose service name
        matches ``spec.service_name()``
    """
    if spec.placement.is_empty() and host:
        spec.placement = PlacementSpec(hosts=[host], count=1)

    # ``meth`` defaults to None, so it can only be called when supplied. The
    # two paths report differently: a single message vs. a list of messages.
    if meth is not None:
        c = meth(cephadm_module, spec)
        assert wait(cephadm_module, c) == f'Scheduled {spec.service_name()} update...'
    else:
        c = cephadm_module.apply([spec])
        assert wait(cephadm_module, c) == [f'Scheduled {spec.service_name()} update...']

    specs = [d.spec for d in wait(cephadm_module, cephadm_module.describe_service())]
    # NOTE(review): nothing visible consumes ``specs``; a check on it appears
    # to be missing from this extract -- confirm against the upstream file.

    CephadmServe(cephadm_module)._apply_all_services()

    dds = wait(cephadm_module, cephadm_module.list_daemons())
    own_dds = [dd for dd in dds if dd.service_name() == spec.service_name()]

    yield [dd.name() for dd in own_dds]

    # Teardown: runs once the consumer resumes the generator.
    assert_rm_service(cephadm_module, spec.service_name())
125 def _deploy_cephadm_binary(host
):
126 def foo(*args
, **kwargs
):