]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/test_orchestrator/module.py
7 from subprocess
import check_output
, CalledProcessError
9 from mgr_module
import MgrModule
16 class TestCompletionMixin(object):
17 all_completions
= [] # Hacky global
19 def __init__(self
, cb
, message
, *args
, **kwargs
):
20 super(TestCompletionMixin
, self
).__init
__(*args
, **kwargs
)
23 self
._complete
= False
25 self
.message
= message
27 TestCompletionMixin
.all_completions
.append(self
)
34 def is_complete(self
):
38 self
._result
= self
.cb()
43 return "{}(result={} message={}, exception={})".format(self
.__class
__.__name
__, self
.result
,
44 self
.message
, self
.exception
)
47 class TestReadCompletion(TestCompletionMixin
, orchestrator
.ReadCompletion
):
48 def __init__(self
, cb
):
49 super(TestReadCompletion
, self
).__init
__(cb
, "<read op>")
52 class TestWriteCompletion(TestCompletionMixin
, orchestrator
.WriteCompletion
):
53 def __init__(self
, cb
, message
):
54 super(TestWriteCompletion
, self
).__init
__(cb
, message
)
55 self
.id = str(uuid
.uuid4())
58 def is_persistent(self
):
59 return (not self
.is_errored
) and self
.executed
62 def is_effective(self
):
66 def deferred_write(message
):
69 def inner(*args
, **kwargs
):
70 return TestWriteCompletion(lambda: f(*args
, **kwargs
),
71 '{}, args={}, kwargs={}'.format(message
, args
, kwargs
))
78 Decorator to make TestOrchestrator methods return
79 a completion object that executes themselves.
83 def wrapper(*args
, **kwargs
):
84 return TestReadCompletion(lambda: f(*args
, **kwargs
))
89 class TestOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
91 This is an orchestrator implementation used for internal testing. It's meant for
92 development environments and integration testing.
94 It does not actually do anything.
96 The implementation is similar to the Rook orchestrator, but simpler.
98 def _progress(self
, *args
, **kwargs
):
100 self
.remote("progress", *args
, **kwargs
)
102 # If the progress module is disabled that's fine,
103 # they just won't see the output.
106 def wait(self
, completions
):
107 self
.log
.info("wait: completions={0}".format(completions
))
111 # Our `wait` implementation is very simple because everything's
113 for c
in completions
:
114 if not isinstance(c
, TestReadCompletion
) and \
115 not isinstance(c
, TestWriteCompletion
):
117 "wait() requires list of completions, not {0}".format(
125 self
._progress
("update", c
.id, c
.message
, 0.5)
129 except Exception as e
:
130 self
.log
.exception("Completion {0} threw an exception:".format(
136 self
._progress
("complete", c
.id)
140 self
._progress
("complete", c
.id)
142 if not c
.is_complete
:
145 return not incomplete
150 def __init__(self
, *args
, **kwargs
):
151 super(TestOrchestrator
, self
).__init
__(*args
, **kwargs
)
153 self
._initialized
= threading
.Event()
154 self
._shutdown
= threading
.Event()
161 self
._initialized
.set()
163 while not self
._shutdown
.is_set():
164 # XXX hack (or is it?) to kick all completions periodically,
165 # in case we had a caller that wait()'ed on them long enough
166 # to get persistence but not long enough to get completion
168 self
.wait(TestCompletionMixin
.all_completions
)
169 TestCompletionMixin
.all_completions
= [c
for c
in TestCompletionMixin
.all_completions
if
172 self
._shutdown
.wait(5)
175 def get_inventory(self
, node_filter
=None, refresh
=False):
177 There is no guarantee which devices are returned by get_inventory.
179 if node_filter
and node_filter
.nodes
is not None:
180 assert isinstance(node_filter
.nodes
, list)
182 c_v_out
= check_output(['ceph-volume', 'inventory', '--format', 'json'])
185 . {tmpdir}/ceph-volume-virtualenv/bin/activate
186 ceph-volume inventory --format json
189 c_v_out
= check_output(cmd
.format(tmpdir
=os
.environ
.get('TMPDIR', '/tmp')), shell
=True)
190 except (OSError, CalledProcessError
):
191 c_v_out
= check_output(cmd
.format(tmpdir
='.'),shell
=True)
193 for out
in c_v_out
.splitlines():
194 if not out
.startswith(b
'-->') and not out
.startswith(b
' stderr'):
197 for device
in json
.loads(out
):
198 dev
= orchestrator
.InventoryDevice
.from_ceph_volume_inventory(device
)
200 return [orchestrator
.InventoryNode('localhost', devs
)]
201 self
.log
.error('c-v failed: ' + str(c_v_out
))
202 raise Exception('c-v failed')
205 def describe_service(self
, service_type
=None, service_id
=None, node_name
=None):
207 There is no guarantee which daemons are returned by describe_service, except that
208 it returns the mgr we're running in.
211 assert service_type
in ("mds", "osd", "mon", "rgw", "mgr"), service_type
+ " unsupported"
213 out
= map(str, check_output(['ps', 'aux']).splitlines())
214 types
= [service_type
] if service_type
else ("mds", "osd", "mon", "rgw", "mgr")
215 processes
= [p
for p
in out
if any([('ceph-' + t
in p
) for t
in types
])]
219 sd
= orchestrator
.ServiceDescription()
220 sd
.nodename
= 'localhost'
221 sd
.service_instance
= re
.search('ceph-[^ ]+', p
).group()
226 @deferred_write("Adding stateless service")
227 def add_stateless_service(self
, service_type
, spec
):
230 @deferred_write("create_osds")
231 def create_osds(self
, drive_group
, all_hosts
):
232 drive_group
.validate(all_hosts
)
234 @deferred_write("remove_osds")
235 def remove_osds(self
, osd_ids
):
236 assert isinstance(osd_ids
, list)
238 @deferred_write("service_action")
239 def service_action(self
, action
, service_type
, service_name
=None, service_id
=None):
242 @deferred_write("remove_stateless_service")
243 def remove_stateless_service(self
, service_type
, id_
):
246 @deferred_write("update_stateless_service")
247 def update_stateless_service(self
, service_type
, spec
):
252 return [orchestrator
.InventoryNode('localhost', [])]
254 @deferred_write("add_host")
255 def add_host(self
, host
):
256 if host
== 'raise_no_support':
257 raise orchestrator
.OrchestratorValidationError("MON count must be either 1, 3 or 5")
258 if host
== 'raise_bug':
259 raise ZeroDivisionError()
260 if host
== 'raise_not_implemented':
261 raise NotImplementedError()
262 if host
== 'raise_no_orchestrator':
263 raise orchestrator
.NoOrchestrator()
264 if host
== 'raise_import_error':
265 raise ImportError("test_orchestrator not enabled")
266 assert isinstance(host
, str)
268 @deferred_write("remove_host")
269 def remove_host(self
, host
):
270 assert isinstance(host
, str)
272 @deferred_write("update_mgrs")
273 def update_mgrs(self
, num
, hosts
):
274 assert not hosts
or len(hosts
) == num
275 assert all([isinstance(h
, str) for h
in hosts
])
277 @deferred_write("update_mons")
278 def update_mons(self
, num
, hosts
):
279 assert not hosts
or len(hosts
) == num
280 assert all([isinstance(h
[0], str) for h
in hosts
])
281 assert all([isinstance(h
[1], str) or h
[1] is None for h
in hosts
])