# Source: ceph/src/pybind/mgr/test_orchestrator/module.py
# (ceph.git, nautilus 14.2.4 point release; extracted from a git-blame view)
1import json
2import re
3import os
4import threading
5import functools
6import uuid
7from subprocess import check_output, CalledProcessError
8
494da23a 9from mgr_module import MgrModule, PersistentStoreDict
11fdf7f2
TL
10
11import orchestrator
12
13
14
15
class TestCompletionMixin(object):
    """Shared state and behaviour for the test orchestrator's completions.

    A completion wraps a zero-argument callback that is executed later by
    ``TestOrchestrator.wait()``.  Every instance registers itself in the
    class-wide ``all_completions`` list so ``serve()`` can kick pending
    completions periodically.
    """

    all_completions = []  # Hacky global

    def __init__(self, cb, message, *args, **kwargs):
        super(TestCompletionMixin, self).__init__(*args, **kwargs)
        self.cb = cb            # callable run (once) by execute()
        self._result = None
        self._complete = False
        # BUGFIX: previously these attributes were created lazily
        # (`executed` only in execute(), `exception` only in wait()'s error
        # path), so reading `is_persistent` or calling str() on a
        # not-yet-executed completion raised AttributeError.
        self.executed = False
        self.exception = None

        self.message = message
        self.id = str(uuid.uuid4())

        TestCompletionMixin.all_completions.append(self)

    @property
    def result(self):
        """Value returned by the callback, or None before execution."""
        return self._result

    @property
    def is_complete(self):
        """True once execute() has run (or wait() recorded a failure)."""
        return self._complete

    def execute(self):
        """Run the callback and mark this completion executed and complete."""
        self._result = self.cb()
        self.executed = True
        self._complete = True

    def __str__(self):
        return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
                                                               self.message, self.exception)
46
47
class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
    """Read-side completion; all reads share the same fixed message."""

    def __init__(self, cb):
        message = "<read op>"
        super(TestReadCompletion, self).__init__(cb, message)
51
52
class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
    """Write-side completion for the test orchestrator.

    Wraps a callback together with a human-readable message; it is driven
    to completion by TestOrchestrator.wait() via execute().
    """

    def __init__(self, cb, message):
        super(TestWriteCompletion, self).__init__(cb, message)

    @property
    def is_persistent(self):
        # Persistent once the callback has run without erroring.
        if self.is_errored:
            return False
        return self.executed

    @property
    def is_effective(self):
        # Effective as soon as the completion has finished running.
        return self._complete
64
65
def deferred_write(message):
    """Decorator factory: defer the decorated method into a write completion.

    The wrapped method does not run immediately; its call (with arguments
    captured) executes when the returned TestWriteCompletion is driven by
    wait().
    """
    def wrapper(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            def run():
                return f(*args, **kwargs)
            description = '{}, args={}, kwargs={}'.format(message, args, kwargs)
            return TestWriteCompletion(run, description)
        return inner
    return wrapper
74
75
def deferred_read(f):
    """Decorator making a TestOrchestrator method return a TestReadCompletion
    that executes the method lazily (when driven by wait())."""
    def call_deferred(*args, **kwargs):
        def run():
            return f(*args, **kwargs)
        return TestReadCompletion(run)
    return functools.wraps(f)(call_deferred)
87
88
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

    def wait(self, completions):
        """Synchronously execute every not-yet-complete completion.

        Returns True when all supplied completions are complete.  A failing
        callback is logged, its exception stored on the completion, and the
        completion is force-marked complete.
        """
        self.log.info("wait: completions={0}".format(completions))

        # Our `wait` implementation is very simple because everything's
        # just an API call.
        for c in completions:
            if not isinstance(c, TestReadCompletion) and \
                    not isinstance(c, TestWriteCompletion):
                raise TypeError(
                    "wait() requires list of completions, not {0}".format(
                        c.__class__
                    ))

            if c.is_complete:
                continue

            try:
                c.execute()
            except Exception as e:
                self.log.exception("Completion {0} threw an exception:".format(
                    c.message
                ))
                # Record the failure on the completion and stop retrying it.
                c.exception = e
                c._complete = True

        return all(c.is_complete for c in completions)

    def available(self):
        # This backend is always available and has no status detail.
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()

    def shutdown(self):
        # Signal serve() to exit its polling loop.
        self._shutdown.set()

    def serve(self):
        """Main module loop: periodically drive all pending completions."""

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.wait(TestCompletionMixin.all_completions)
            # Drop finished completions so the global list doesn't grow forever.
            TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
                                                   not c.is_complete]

            self._shutdown.wait(5)

    @deferred_read
    def get_inventory(self, node_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if node_filter and node_filter.nodes is not None:
            assert isinstance(node_filter.nodes, list)
        try:
            # Prefer a ceph-volume binary on PATH.
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            # Fall back to a ceph-volume installed in a dev virtualenv
            # (vstart-style environments), first under $TMPDIR, then '.'.
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'),shell=True)

        # NOTE(review): returns from the first iteration of splitlines(),
        # i.e. only the first output line is ever parsed.  On Python 3
        # check_output returns bytes; json.loads accepts bytes (3.6+), but
        # the log line will be a bytes object — confirm intended interpreter.
        for out in c_v_out.splitlines():
            self.log.error(out)
            devs = []
            for device in json.loads(out):
                dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
                devs.append(dev)
            return [orchestrator.InventoryNode('localhost', devs)]
        # Reached only if ceph-volume produced no output at all.
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    @deferred_read
    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
        """
        There is no guarantee which daemons are returned by describe_service, except that
        it returns the mgr we're running in.
        """
        if service_type:
            assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"

        # NOTE(review): on Python 3, str() of a bytes line yields "b'...'";
        # the 'ceph-' substring match still works on the repr, but
        # service_instance may carry repr artifacts — confirm.
        out = map(str, check_output(['ps', 'aux']).splitlines())
        types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
        processes = [p for p in out if any([('ceph-' + t in p) for t in types])]

        result = []
        for p in processes:
            sd = orchestrator.ServiceDescription()
            sd.nodename = 'localhost'
            sd.service_instance = re.search('ceph-[^ ]+', p).group()
            result.append(sd)

        return result

    @deferred_write("Adding stateless service")
    def add_stateless_service(self, service_type, spec):
        # No-op: this orchestrator only records that the call happened.
        pass

    @deferred_write("create_osds")
    def create_osds(self, drive_group, all_hosts):
        # Only validates the drive group; no OSDs are actually created.
        drive_group.validate(all_hosts)

    @deferred_write("remove_osds")
    def remove_osds(self, osd_ids):
        assert isinstance(osd_ids, list)

    @deferred_write("service_action")
    def service_action(self, action, service_type, service_name=None, service_id=None):
        pass

    @deferred_write("remove_stateless_service")
    def remove_stateless_service(self, service_type, id_):
        pass

    @deferred_write("update_stateless_service")
    def update_stateless_service(self, service_type, spec):
        pass

    @deferred_read
    def get_hosts(self):
        # Single pretend host with an empty device list.
        return [orchestrator.InventoryNode('localhost', [])]

    @deferred_write("add_host")
    def add_host(self, host):
        # Magic host names trigger specific failures so integration tests
        # can exercise each error-propagation path.
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, str)

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, str)

    @deferred_write("update_mgrs")
    def update_mgrs(self, num, hosts):
        # hosts, when given, must name exactly `num` placement hosts.
        assert not hosts or len(hosts) == num
        assert all([isinstance(h, str) for h in hosts])

    @deferred_write("update_mons")
    def update_mons(self, num, hosts):
        # hosts entries are (hostname, network) pairs; network may be None.
        assert not hosts or len(hosts) == num
        assert all([isinstance(h[0], str) for h in hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in hosts])