]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/test_orchestrator/module.py
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / pybind / mgr / test_orchestrator / module.py
CommitLineData
11fdf7f2
TL
1import json
2import re
3import os
4import threading
5import functools
6import uuid
7from subprocess import check_output, CalledProcessError
8
9from mgr_module import MgrModule
10
11import orchestrator
12
13
14
15
16class TestCompletionMixin(object):
17 all_completions = [] # Hacky global
18
19 def __init__(self, cb, message, *args, **kwargs):
20 super(TestCompletionMixin, self).__init__(*args, **kwargs)
21 self.cb = cb
22 self._result = None
23 self._complete = False
24
25 self.message = message
81eedcae 26 self.id = str(uuid.uuid4())
11fdf7f2
TL
27
28 TestCompletionMixin.all_completions.append(self)
29
30 @property
31 def result(self):
32 return self._result
33
34 @property
35 def is_complete(self):
36 return self._complete
37
38 def execute(self):
39 self._result = self.cb()
40 self.executed = True
41 self._complete = True
42
43 def __str__(self):
44 return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
45 self.message, self.exception)
46
47
48class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
49 def __init__(self, cb):
50 super(TestReadCompletion, self).__init__(cb, "<read op>")
51
52
53class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
54 def __init__(self, cb, message):
55 super(TestWriteCompletion, self).__init__(cb, message)
11fdf7f2
TL
56
57 @property
58 def is_persistent(self):
59 return (not self.is_errored) and self.executed
60
61 @property
62 def is_effective(self):
63 return self._complete
64
65
66def deferred_write(message):
67 def wrapper(f):
68 @functools.wraps(f)
69 def inner(*args, **kwargs):
70 return TestWriteCompletion(lambda: f(*args, **kwargs),
71 '{}, args={}, kwargs={}'.format(message, args, kwargs))
72 return inner
73 return wrapper
74
75
76def deferred_read(f):
77 """
78 Decorator to make TestOrchestrator methods return
79 a completion object that executes themselves.
80 """
81
82 @functools.wraps(f)
83 def wrapper(*args, **kwargs):
84 return TestReadCompletion(lambda: f(*args, **kwargs))
85
86 return wrapper
87
88
89class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
90 """
91 This is an orchestrator implementation used for internal testing. It's meant for
92 development environments and integration testing.
93
94 It does not actually do anything.
95
96 The implementation is similar to the Rook orchestrator, but simpler.
97 """
11fdf7f2
TL
98
99 def wait(self, completions):
100 self.log.info("wait: completions={0}".format(completions))
101
11fdf7f2
TL
102 # Our `wait` implementation is very simple because everything's
103 # just an API call.
104 for c in completions:
105 if not isinstance(c, TestReadCompletion) and \
106 not isinstance(c, TestWriteCompletion):
107 raise TypeError(
108 "wait() requires list of completions, not {0}".format(
109 c.__class__
110 ))
111
112 if c.is_complete:
113 continue
114
11fdf7f2
TL
115 try:
116 c.execute()
117 except Exception as e:
118 self.log.exception("Completion {0} threw an exception:".format(
119 c.message
120 ))
121 c.exception = e
122 c._complete = True
11fdf7f2 123
81eedcae 124 return all(c.is_complete for c in completions)
11fdf7f2
TL
125
126 def available(self):
127 return True, ""
128
129 def __init__(self, *args, **kwargs):
130 super(TestOrchestrator, self).__init__(*args, **kwargs)
131
132 self._initialized = threading.Event()
133 self._shutdown = threading.Event()
134
135 def shutdown(self):
136 self._shutdown.set()
137
138 def serve(self):
139
140 self._initialized.set()
141
142 while not self._shutdown.is_set():
143 # XXX hack (or is it?) to kick all completions periodically,
144 # in case we had a caller that wait()'ed on them long enough
145 # to get persistence but not long enough to get completion
146
147 self.wait(TestCompletionMixin.all_completions)
148 TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
149 not c.is_complete]
150
151 self._shutdown.wait(5)
152
153 @deferred_read
154 def get_inventory(self, node_filter=None, refresh=False):
155 """
156 There is no guarantee which devices are returned by get_inventory.
157 """
158 if node_filter and node_filter.nodes is not None:
159 assert isinstance(node_filter.nodes, list)
160 try:
161 c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
162 except OSError:
163 cmd = """
164 . {tmpdir}/ceph-volume-virtualenv/bin/activate
165 ceph-volume inventory --format json
166 """
167 try:
168 c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
169 except (OSError, CalledProcessError):
170 c_v_out = check_output(cmd.format(tmpdir='.'),shell=True)
171
172 for out in c_v_out.splitlines():
173 if not out.startswith(b'-->') and not out.startswith(b' stderr'):
174 self.log.error(out)
175 devs = []
176 for device in json.loads(out):
177 dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
178 devs.append(dev)
179 return [orchestrator.InventoryNode('localhost', devs)]
180 self.log.error('c-v failed: ' + str(c_v_out))
181 raise Exception('c-v failed')
182
183 @deferred_read
184 def describe_service(self, service_type=None, service_id=None, node_name=None):
185 """
186 There is no guarantee which daemons are returned by describe_service, except that
187 it returns the mgr we're running in.
188 """
189 if service_type:
190 assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
191
192 out = map(str, check_output(['ps', 'aux']).splitlines())
193 types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
194 processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
195
196 result = []
197 for p in processes:
198 sd = orchestrator.ServiceDescription()
199 sd.nodename = 'localhost'
200 sd.service_instance = re.search('ceph-[^ ]+', p).group()
201 result.append(sd)
202
203 return result
204
205 @deferred_write("Adding stateless service")
206 def add_stateless_service(self, service_type, spec):
207 pass
208
209 @deferred_write("create_osds")
210 def create_osds(self, drive_group, all_hosts):
211 drive_group.validate(all_hosts)
212
213 @deferred_write("remove_osds")
214 def remove_osds(self, osd_ids):
215 assert isinstance(osd_ids, list)
216
217 @deferred_write("service_action")
218 def service_action(self, action, service_type, service_name=None, service_id=None):
219 pass
220
221 @deferred_write("remove_stateless_service")
222 def remove_stateless_service(self, service_type, id_):
223 pass
224
225 @deferred_write("update_stateless_service")
226 def update_stateless_service(self, service_type, spec):
227 pass
228
229 @deferred_read
230 def get_hosts(self):
231 return [orchestrator.InventoryNode('localhost', [])]
232
233 @deferred_write("add_host")
234 def add_host(self, host):
235 if host == 'raise_no_support':
236 raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
237 if host == 'raise_bug':
238 raise ZeroDivisionError()
239 if host == 'raise_not_implemented':
240 raise NotImplementedError()
241 if host == 'raise_no_orchestrator':
242 raise orchestrator.NoOrchestrator()
243 if host == 'raise_import_error':
244 raise ImportError("test_orchestrator not enabled")
245 assert isinstance(host, str)
246
247 @deferred_write("remove_host")
248 def remove_host(self, host):
249 assert isinstance(host, str)
250
251 @deferred_write("update_mgrs")
252 def update_mgrs(self, num, hosts):
253 assert not hosts or len(hosts) == num
254 assert all([isinstance(h, str) for h in hosts])
255
256 @deferred_write("update_mons")
257 def update_mons(self, num, hosts):
258 assert not hosts or len(hosts) == num
259 assert all([isinstance(h[0], str) for h in hosts])
260 assert all([isinstance(h[1], str) or h[1] is None for h in hosts])