# Test fixtures for the cephadm mgr orchestrator module.
# Source: ceph/src/pybind/mgr/cephadm/tests/fixtures.py
4 from tempfile
import NamedTemporaryFile
5 from contextlib
import contextmanager
7 from ceph
.deployment
.service_spec
import PlacementSpec
, ServiceSpec
8 from ceph
.utils
import datetime_to_str
, datetime_now
9 from cephadm
.serve
import CephadmServe
, cephadmNoImage
12 from typing
import Any
, Iterator
, List
, Callable
, Dict
16 from cephadm
import CephadmOrchestrator
17 from orchestrator
import raise_if_exception
, OrchResult
, HostSpec
, DaemonDescriptionStatus
18 from tests
import mock
21 def async_side_effect(result
):
22 async def side_effect(*args
, **kwargs
):
27 def get_ceph_option(_
, key
):
31 def _run_cephadm(ret
):
32 async def foo(s
, host
, entity
, cmd
, e
, **kwargs
):
33 if cmd
== 'gather-facts':
39 def match_glob(val
, pat
):
40 ok
= fnmatch
.fnmatchcase(val
, pat
)
45 class MockEventLoopThread
:
46 def get_result(self
, coro
):
47 if sys
.version_info
>= (3, 7):
48 return asyncio
.run(coro
)
50 loop
= asyncio
.new_event_loop()
51 asyncio
.set_event_loop(loop
)
53 return loop
.run_until_complete(coro
)
56 asyncio
.set_event_loop(None)
59 def receive_agent_metadata(m
: CephadmOrchestrator
, host
: str, ops
: List
[str] = None) -> None:
60 to_update
: Dict
[str, Callable
[[str, Any
], None]] = {
61 'ls': m
._process
_ls
_output
,
62 'gather-facts': m
.cache
.update_host_facts
,
63 'list-networks': m
.cache
.update_host_networks
,
67 out
= m
.wait_async(CephadmServe(m
)._run
_cephadm
_json
(host
, cephadmNoImage
, op
, []))
68 to_update
[op
](host
, out
)
69 m
.cache
.last_daemon_update
[host
] = datetime_now()
70 m
.cache
.last_facts_update
[host
] = datetime_now()
71 m
.cache
.last_network_update
[host
] = datetime_now()
72 m
.cache
.metadata_up_to_date
[host
] = True
def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
    """Feed agent metadata into *m* for every host its cache knows about.

    Walks the orchestrator's host cache and delegates to
    receive_agent_metadata() once per host, so tests observe fully
    refreshed per-host metadata without a real agent running.
    """
    hostnames = m.cache.get_hosts()
    for hostname in hostnames:
        receive_agent_metadata(m, hostname)
81 def with_cephadm_module(module_options
=None, store
=None):
83 :param module_options: Set opts as if they were set before module.__init__ is called
84 :param store: Set the store before module.__init__ is called
86 with mock
.patch("cephadm.module.CephadmOrchestrator.get_ceph_option", get_ceph_option
),\
87 mock
.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
88 mock
.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
89 mock
.patch("cephadm.module.CephadmOrchestrator.remote"), \
90 mock
.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
91 mock
.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value
=False), \
92 mock
.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value
=False), \
93 mock
.patch('cephadm.agent.CherryPyThread.run'):
95 m
= CephadmOrchestrator
.__new
__(CephadmOrchestrator
)
96 if module_options
is not None:
97 for k
, v
in module_options
.items():
98 m
._ceph
_set
_module
_option
('cephadm', k
, v
)
101 if '_ceph_get/mon_map' not in store
:
102 m
.mock_store_set('_ceph_get', 'mon_map', {
103 'modified': datetime_to_str(datetime_now()),
106 if '_ceph_get/mgr_map' not in store
:
107 m
.mock_store_set('_ceph_get', 'mgr_map', {
109 'dashboard': 'http://[::1]:8080',
110 'prometheus': 'http://[::1]:8081'
112 'modules': ['dashboard', 'prometheus'],
114 for k
, v
in store
.items():
115 m
._ceph
_set
_store
(k
, v
)
117 m
.__init
__('cephadm', 0, 0)
118 m
._cluster
_fsid
= "fsid"
120 m
.event_loop
= MockEventLoopThread()
121 m
.tkey
= NamedTemporaryFile(prefix
='test-cephadm-identity-')
127 # type: (CephadmOrchestrator, OrchResult) -> Any
128 return raise_if_exception(c
)
132 def with_host(m
: CephadmOrchestrator
, name
, addr
='1::4', refresh_hosts
=True, rm_with_force
=True):
133 with mock
.patch("cephadm.utils.resolve_ip", return_value
=addr
):
134 wait(m
, m
.add_host(HostSpec(hostname
=name
)))
136 CephadmServe(m
)._refresh
_hosts
_and
_daemons
()
137 receive_agent_metadata(m
, name
)
139 wait(m
, m
.remove_host(name
, force
=rm_with_force
))
142 def assert_rm_service(cephadm
: CephadmOrchestrator
, srv_name
):
143 mon_or_mgr
= cephadm
.spec_store
[srv_name
].spec
.service_type
in ('mon', 'mgr')
145 assert 'Unable' in wait(cephadm
, cephadm
.remove_service(srv_name
))
147 assert wait(cephadm
, cephadm
.remove_service(srv_name
)) == f
'Removed service {srv_name}'
148 assert cephadm
.spec_store
[srv_name
].deleted
is not None
149 CephadmServe(cephadm
)._check
_daemons
()
150 CephadmServe(cephadm
)._apply
_all
_services
()
151 assert cephadm
.spec_store
[srv_name
].deleted
152 unmanaged
= cephadm
.spec_store
[srv_name
].spec
.unmanaged
153 CephadmServe(cephadm
)._purge
_deleted
_services
()
154 if not unmanaged
: # cause then we're not deleting daemons
155 assert srv_name
not in cephadm
.spec_store
, f
'{cephadm.spec_store[srv_name]!r}'
159 def with_service(cephadm_module
: CephadmOrchestrator
, spec
: ServiceSpec
, meth
=None, host
: str = '', status_running
=False) -> Iterator
[List
[str]]:
160 if spec
.placement
.is_empty() and host
:
161 spec
.placement
= PlacementSpec(hosts
=[host
], count
=1)
163 c
= meth(cephadm_module
, spec
)
164 assert wait(cephadm_module
, c
) == f
'Scheduled {spec.service_name()} update...'
166 c
= cephadm_module
.apply([spec
])
167 assert wait(cephadm_module
, c
) == [f
'Scheduled {spec.service_name()} update...']
169 specs
= [d
.spec
for d
in wait(cephadm_module
, cephadm_module
.describe_service())]
172 CephadmServe(cephadm_module
)._apply
_all
_services
()
175 make_daemons_running(cephadm_module
, spec
.service_name())
177 dds
= wait(cephadm_module
, cephadm_module
.list_daemons())
178 own_dds
= [dd
for dd
in dds
if dd
.service_name() == spec
.service_name()]
179 if host
and spec
.service_type
!= 'osd':
182 yield [dd
.name() for dd
in own_dds
]
184 assert_rm_service(cephadm_module
, spec
.service_name())
187 def make_daemons_running(cephadm_module
, service_name
):
188 own_dds
= cephadm_module
.cache
.get_daemons_by_service(service_name
)
190 dd
.status
= DaemonDescriptionStatus
.running
# We're changing the reference