]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/test_orchestrator/module.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / test_orchestrator / module.py
1 import json
2 import re
3 import os
4 import threading
5 import functools
6 import uuid
7 from subprocess import check_output, CalledProcessError
8
9 from mgr_module import MgrModule
10
11 import orchestrator
12
13
14
15
class TestCompletionMixin(object):
    """
    Shared behaviour for the completion objects of the test orchestrator.

    A completion stores a zero-argument callable ``cb`` together with a
    descriptive ``message``. The callable is not invoked at construction
    time; :meth:`execute` runs it and stores its return value, which is
    then available through :attr:`result`.

    Every instance appends itself to the class-level ``all_completions``
    list when it is constructed.
    """
    all_completions = []  # Hacky global

    def __init__(self, cb, message, *args, **kwargs):
        """
        :param cb: zero-argument callable invoked by :meth:`execute`
        :param message: description of the operation, used by ``__str__``

        Any further arguments are forwarded to the next ``__init__`` in
        the method resolution order.
        """
        super(TestCompletionMixin, self).__init__(*args, **kwargs)
        self.cb = cb
        self._result = None
        self._complete = False
        # Set up front so the flag can be read safely before execute() ran;
        # previously it only came into existence inside execute().
        self.executed = False

        self.message = message

        TestCompletionMixin.all_completions.append(self)

    @property
    def result(self):
        """Return value of ``cb``, or ``None`` if it has not been stored yet."""
        return self._result

    @property
    def is_complete(self):
        """``True`` once the completion has been marked complete."""
        return self._complete

    def execute(self):
        """
        Run the stored callable and record its result.

        If the callable raises, the exception propagates and neither
        ``executed`` nor the completion flag is set.
        """
        self._result = self.cb()
        self.executed = True
        self._complete = True

    def __str__(self):
        # `exception` is not defined by this mixin itself; fall back to
        # None when no other class in the hierarchy supplies it.
        return "{}(result={} message={}, exception={})".format(
            self.__class__.__name__, self.result,
            self.message, getattr(self, 'exception', None))
45
46
class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
    """
    Completion combining TestCompletionMixin with
    ``orchestrator.ReadCompletion``; its message is always ``"<read op>"``.
    """
    def __init__(self, cb):
        """
        :param cb: zero-argument callable handed on to TestCompletionMixin
        """
        placeholder_message = "<read op>"
        super(TestReadCompletion, self).__init__(cb, placeholder_message)
50
51
class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
    """
    Completion combining TestCompletionMixin with
    ``orchestrator.WriteCompletion``.

    Each instance receives a random UUID (as a string) in ``id``.
    """
    def __init__(self, cb, message):
        """
        :param cb: zero-argument callable handed on to TestCompletionMixin
        :param message: description of the operation
        """
        super(TestWriteCompletion, self).__init__(cb, message)
        self.id = str(uuid.uuid4())

    @property
    def is_persistent(self):
        """``True`` when the completion is not errored and has executed."""
        # Fall back to False if `executed` has not been assigned yet, so
        # querying a pending completion does not raise AttributeError.
        return (not self.is_errored) and getattr(self, 'executed', False)

    @property
    def is_effective(self):
        """Mirrors the completion flag."""
        return self._complete
64
65
def deferred_write(message):
    """
    Decorator factory for TestOrchestrator methods.

    Calling the decorated function does not run it; instead a
    TestWriteCompletion is returned whose callback performs the call with
    the captured arguments. The completion's message is ``message``
    followed by the positional and keyword arguments.
    """
    def decorate(f):
        @functools.wraps(f)
        def make_completion(*args, **kwargs):
            def run_deferred():
                return f(*args, **kwargs)

            description = '{}, args={}, kwargs={}'.format(message, args, kwargs)
            return TestWriteCompletion(run_deferred, description)
        return make_completion
    return decorate
74
75
def deferred_read(f):
    """
    Decorator for TestOrchestrator methods.

    Calling the decorated function does not run it; a TestReadCompletion
    is returned whose callback performs the call with the captured
    arguments.
    """

    @functools.wraps(f)
    def make_completion(*args, **kwargs):
        def run_deferred():
            return f(*args, **kwargs)

        return TestReadCompletion(run_deferred)

    return make_completion
87
88
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """
    def _progress(self, *args, **kwargs):
        """Forward the arguments to the "progress" module via ``self.remote``."""
        try:
            self.remote("progress", *args, **kwargs)
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    def wait(self, completions):
        """
        Execute every completion in ``completions`` that is not complete yet.

        Exceptions raised by a completion are logged, stored on the
        completion as ``exception`` and the completion is marked complete.
        Progress events are emitted for completions that are not reads.

        :param completions: iterable of TestReadCompletion /
            TestWriteCompletion objects
        :return: ``True`` if all completions are complete after this pass
        :raises TypeError: if an element is neither of the two test
            completion classes
        """
        self.log.info("wait: completions={0}".format(completions))

        incomplete = False

        # Our `wait` implementation is very simple because everything's
        # just an API call.
        for c in completions:
            if not isinstance(c, (TestReadCompletion, TestWriteCompletion)):
                raise TypeError(
                    "wait() requires list of completions, not {0}".format(
                        c.__class__
                    ))

            if c.is_complete:
                continue

            if not c.is_read:
                self._progress("update", c.id, c.message, 0.5)

            try:
                c.execute()
            except Exception as e:
                self.log.exception("Completion {0} threw an exception:".format(
                    c.message
                ))
                c.exception = e
                c._complete = True
                if not c.is_read:
                    self._progress("complete", c.id)
            else:
                if c.is_complete:
                    if not c.is_read:
                        self._progress("complete", c.id)

            if not c.is_complete:
                incomplete = True

        return not incomplete

    def available(self):
        """Report availability as the tuple ``(True, "")``."""
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        # Set by serve() on start-up and by shutdown() respectively.
        self._initialized = threading.Event()
        self._shutdown = threading.Event()

    def shutdown(self):
        """Ask the serve() loop to stop."""
        self._shutdown.set()

    def serve(self):
        """
        Loop until shutdown() is called, kicking all registered completions
        and dropping the completed ones, then waiting up to 5 seconds.
        """

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.wait(TestCompletionMixin.all_completions)
            TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
                                                   not c.is_complete]

            self._shutdown.wait(5)

    @deferred_read
    def get_inventory(self, node_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.

        Runs ``ceph-volume inventory --format json`` (falling back to a
        virtualenv under ``$TMPDIR``, ``/tmp`` or the current directory)
        and parses the first output line that is not a ``-->`` or
        `` stderr`` marker line.

        :return: single-element list with an InventoryNode for 'localhost'
        :raises Exception: if no parseable output line was found
        """
        if node_filter and node_filter.nodes is not None:
            assert isinstance(node_filter.nodes, list)
        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            if not out.startswith(b'-->') and not out.startswith(b' stderr'):
                # NOTE(review): the regular output is logged at error
                # level here; presumably a debugging aid -- confirm.
                self.log.error(out)
                devs = []
                for device in json.loads(out):
                    dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
                    devs.append(dev)
                return [orchestrator.InventoryNode('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    @deferred_read
    def describe_service(self, service_type=None, service_id=None, node_name=None):
        """
        There is no guarantee which daemons are returned by describe_service, except that
        it returns the mgr we're running in.

        Scans the output of ``ps aux`` for ``ceph-<type>`` processes.

        :param service_type: optional; one of "mds", "osd", "mon", "rgw", "mgr"
        :return: list of ServiceDescription objects with ``nodename`` and
            ``service_instance`` filled in
        """
        if service_type:
            assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"

        # check_output() returns bytes on Python 3, and str() on bytes gives
        # the "b'...'" repr whose closing quote could leak into the regex
        # match below. Decode instead; native str lines are left as they are.
        out = [line if isinstance(line, str) else line.decode('utf-8', 'replace')
               for line in check_output(['ps', 'aux']).splitlines()]
        types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
        processes = [p for p in out if any([('ceph-' + t in p) for t in types])]

        result = []
        for p in processes:
            sd = orchestrator.ServiceDescription()
            sd.nodename = 'localhost'
            sd.service_instance = re.search('ceph-[^ ]+', p).group()
            result.append(sd)

        return result

    @deferred_write("Adding stateless service")
    def add_stateless_service(self, service_type, spec):
        """No-op."""
        pass

    @deferred_write("create_osds")
    def create_osds(self, drive_group, all_hosts):
        """Only validates ``drive_group`` against ``all_hosts``."""
        drive_group.validate(all_hosts)

    @deferred_write("remove_osds")
    def remove_osds(self, osd_ids):
        """Only checks that ``osd_ids`` is a list."""
        assert isinstance(osd_ids, list)

    @deferred_write("service_action")
    def service_action(self, action, service_type, service_name=None, service_id=None):
        """No-op."""
        pass

    @deferred_write("remove_stateless_service")
    def remove_stateless_service(self, service_type, id_):
        """No-op."""
        pass

    @deferred_write("update_stateless_service")
    def update_stateless_service(self, service_type, spec):
        """No-op."""
        pass

    @deferred_read
    def get_hosts(self):
        """Return a single InventoryNode for 'localhost' without devices."""
        return [orchestrator.InventoryNode('localhost', [])]

    @deferred_write("add_host")
    def add_host(self, host):
        """
        Only checks that ``host`` is a string. A few magic host names make
        the call raise a specific exception instead.
        """
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, str)

    @deferred_write("remove_host")
    def remove_host(self, host):
        """Only checks that ``host`` is a string."""
        assert isinstance(host, str)

    @deferred_write("update_mgrs")
    def update_mgrs(self, num, hosts):
        """
        Only checks the arguments: ``hosts`` is empty/None or has ``num``
        entries, and every entry is a string.
        """
        assert not hosts or len(hosts) == num
        # `hosts or []`: a None value passes the check above and must not
        # blow up with a TypeError when iterated here.
        assert all([isinstance(h, str) for h in hosts or []])

    @deferred_write("update_mons")
    def update_mons(self, num, hosts):
        """
        Only checks the arguments: ``hosts`` is empty/None or has ``num``
        entries, each entry's first item is a string and its second item
        is a string or None.
        """
        assert not hosts or len(hosts) == num
        # `hosts or []`: a None value passes the check above and must not
        # blow up with a TypeError when iterated here.
        assert all([isinstance(h[0], str) for h in hosts or []])
        assert all([isinstance(h[1], str) or h[1] is None for h in hosts or []])