]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import json |
2 | import re | |
3 | import os | |
4 | import threading | |
5 | import functools | |
6 | import uuid | |
7 | from subprocess import check_output, CalledProcessError | |
8 | ||
9 | from mgr_module import MgrModule | |
10 | ||
11 | import orchestrator | |
12 | ||
13 | ||
14 | ||
15 | ||
class TestCompletionMixin(object):
    """
    Mixin that wraps a callback in a lazily executed completion.

    Every instance appends itself to the class-level ``all_completions``
    list when constructed; the callback itself only runs when ``execute()``
    is called.
    """
    all_completions = []  # Hacky global

    def __init__(self, cb, message, *args, **kwargs):
        """
        :param cb: zero-argument callable whose return value becomes ``result``
        :param message: description of the operation
        :param args: forwarded to the next ``__init__`` in the MRO
        :param kwargs: forwarded to the next ``__init__`` in the MRO
        """
        super(TestCompletionMixin, self).__init__(*args, **kwargs)
        self.cb = cb
        self._result = None
        self._complete = False
        # Set up front so `executed` can be read before execute() has run
        # (or after the callback raised) without an AttributeError.
        self.executed = False

        self.message = message
        self.id = str(uuid.uuid4())

        TestCompletionMixin.all_completions.append(self)

    @property
    def result(self):
        """Return value of the callback, or None if it has not run yet."""
        return self._result

    @property
    def is_complete(self):
        """True once the completion has been marked as finished."""
        return self._complete

    def execute(self):
        """Run the callback, store its result and mark the completion done."""
        self._result = self.cb()
        self.executed = True
        self._complete = True

    def __str__(self):
        # `exception` is not assigned by this mixin; fall back to None when
        # no other class in the hierarchy provides it.
        return "{}(result={} message={}, exception={})".format(
            self.__class__.__name__, self.result,
            self.message, getattr(self, 'exception', None))
46 | ||
47 | ||
class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
    """Read completion; its message is always ``"<read op>"``."""

    def __init__(self, cb):
        # Read operations carry a fixed description rather than a
        # caller-supplied one.
        super(TestReadCompletion, self).__init__(cb, message="<read op>")
51 | ||
52 | ||
class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
    """Write completion that executes a callback and carries a message."""

    def __init__(self, cb, message):
        """
        :param cb: zero-argument callable performing the write
        :param message: description of the write operation
        """
        super(TestWriteCompletion, self).__init__(cb, message)

    @property
    def is_persistent(self):
        """True when the completion is not errored and its callback has run."""
        # `executed` is only assigned by execute(); a completion that has
        # not been executed yet is reported as not persistent instead of
        # raising AttributeError.
        return (not self.is_errored) and getattr(self, 'executed', False)

    @property
    def is_effective(self):
        """True once the completion has been marked complete."""
        return self._complete
64 | ||
65 | ||
def deferred_write(message):
    """
    Decorator factory: the decorated function is not run when called.

    Calling it returns a TestWriteCompletion whose callback invokes the
    original function with the captured arguments, and whose message is
    ``message`` followed by the call's args and kwargs.
    """
    def decorate(func):
        @functools.wraps(func)
        def make_completion(*args, **kwargs):
            description = '{}, args={}, kwargs={}'.format(message, args, kwargs)

            def run():
                return func(*args, **kwargs)

            return TestWriteCompletion(run, description)
        return make_completion
    return decorate
74 | ||
75 | ||
def deferred_read(f):
    """
    Decorator that defers the wrapped function behind a TestReadCompletion.

    Calling the decorated function does not run ``f``; it returns a
    completion whose callback invokes ``f`` with the captured arguments.
    """

    @functools.wraps(f)
    def make_completion(*args, **kwargs):
        def run():
            return f(*args, **kwargs)

        return TestReadCompletion(run)

    return make_completion
87 | ||
88 | ||
89 | class TestOrchestrator(MgrModule, orchestrator.Orchestrator): | |
90 | """ | |
91 | This is an orchestrator implementation used for internal testing. It's meant for | |
92 | development environments and integration testing. | |
93 | ||
94 | It does not actually do anything. | |
95 | ||
96 | The implementation is similar to the Rook orchestrator, but simpler. | |
97 | """ | |
11fdf7f2 TL |
98 | |
def wait(self, completions):
    """
    Execute every completion that is not complete yet.

    A callback that raises is logged; the exception is stored on the
    completion, which is then marked complete.

    :param completions: TestReadCompletion / TestWriteCompletion objects
    :raises TypeError: if an element is neither of those two types
    :return: True when every completion reports is_complete
    """
    self.log.info("wait: completions={0}".format(completions))

    # Everything is a plain in-process call, so "waiting" simply means
    # running whatever has not been run yet.
    for completion in completions:
        if not isinstance(completion, (TestReadCompletion, TestWriteCompletion)):
            raise TypeError(
                "wait() requires list of completions, not {0}".format(
                    completion.__class__
                ))

        if not completion.is_complete:
            try:
                completion.execute()
            except Exception as err:
                self.log.exception("Completion {0} threw an exception:".format(
                    completion.message
                ))
                completion.exception = err
                completion._complete = True

    for completion in completions:
        if not completion.is_complete:
            return False
    return True
11fdf7f2 TL |
125 | |
def available(self):
    """Return an (available, message) pair: always ``True`` and ``""``."""
    is_available = True
    reason = ""
    return is_available, reason
128 | ||
def __init__(self, *args, **kwargs):
    """Initialise the parent classes and create the two lifecycle events."""
    super(TestOrchestrator, self).__init__(*args, **kwargs)

    # _initialized is set by serve(); _shutdown is set by shutdown().
    self._initialized, self._shutdown = threading.Event(), threading.Event()
134 | ||
def shutdown(self):
    """Set the shutdown event so the serve() loop stops."""
    stop_event = self._shutdown
    stop_event.set()
137 | ||
def serve(self):
    """
    Signal initialisation, then periodically run all registered completions
    until the shutdown event is set.
    """
    self._initialized.set()

    stop_event = self._shutdown
    while True:
        if stop_event.is_set():
            break

        # XXX hack (or is it?) to kick all completions periodically,
        # in case we had a caller that wait()'ed on them long enough
        # to get persistence but not long enough to get completion
        self.wait(TestCompletionMixin.all_completions)

        # Drop everything that has finished from the global list.
        still_pending = []
        for completion in TestCompletionMixin.all_completions:
            if not completion.is_complete:
                still_pending.append(completion)
        TestCompletionMixin.all_completions = still_pending

        # Sleep up to 5 seconds, waking early if shutdown is requested.
        stop_event.wait(5)
152 | ||
@deferred_read
def get_inventory(self, node_filter=None, refresh=False):
    """
    There is no guarantee which devices are returned by get_inventory.

    Runs ``ceph-volume inventory --format json`` and parses the first output
    line that does not start with ``-->`` or `` stderr`` as JSON. The result
    is a single InventoryNode named 'localhost'. ``refresh`` is not used.

    :raises Exception: if no output line qualifies as inventory data
    """
    if node_filter and node_filter.nodes is not None:
        assert isinstance(node_filter.nodes, list)

    try:
        c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
    except OSError:
        # The direct invocation raised OSError: retry through a virtualenv,
        # first below $TMPDIR (default /tmp), then below the current
        # directory.
        cmd = """
        . {tmpdir}/ceph-volume-virtualenv/bin/activate
        ceph-volume inventory --format json
        """
        venv_parent = os.environ.get('TMPDIR', '/tmp')
        try:
            c_v_out = check_output(cmd.format(tmpdir=venv_parent), shell=True)
        except (OSError, CalledProcessError):
            c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

    skipped_prefixes = (b'-->', b' stderr')
    for out in c_v_out.splitlines():
        if out.startswith(skipped_prefixes):
            continue
        self.log.error(out)
        devs = [orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
                for device in json.loads(out)]
        return [orchestrator.InventoryNode('localhost', devs)]

    self.log.error('c-v failed: ' + str(c_v_out))
    raise Exception('c-v failed')
182 | ||
@deferred_read
def describe_service(self, service_type=None, service_id=None, node_name=None):
    """
    There is no guarantee which daemons are returned by describe_service, except that
    it returns the mgr we're running in.

    Scans the output of ``ps aux`` for lines containing ``ceph-<type>`` and
    returns one ServiceDescription per matching line, with nodename
    'localhost'. ``service_id`` and ``node_name`` are not used.

    :param service_type: one of "mds", "osd", "mon", "rgw", "mgr"; a falsy
        value selects all of them
    :raises AssertionError: if service_type is truthy but not supported
    """
    known_types = ("mds", "osd", "mon", "rgw", "mgr")
    if service_type:
        assert service_type in known_types, service_type + " unsupported"

    ps_out = check_output(['ps', 'aux'])
    if not isinstance(ps_out, str):
        # On Python 3 check_output() returns bytes. Decode it instead of
        # calling str() on every line: str(bytes) yields the "b'...'" repr,
        # whose closing quote could end up in the matched instance name.
        ps_out = ps_out.decode('utf-8', 'replace')

    types = [service_type] if service_type else known_types

    result = []
    for line in ps_out.splitlines():
        if not any('ceph-' + t in line for t in types):
            continue
        sd = orchestrator.ServiceDescription()
        sd.nodename = 'localhost'
        # The filter above guarantees that 'ceph-' followed by at least one
        # non-space character is present, so the search cannot return None.
        sd.service_instance = re.search('ceph-[^ ]+', line).group()
        result.append(sd)

    return result
204 | ||
@deferred_write("Adding stateless service")
def add_stateless_service(self, service_type, spec):
    """No-op: both arguments are ignored."""
    return None
208 | ||
@deferred_write("create_osds")
def create_osds(self, drive_group, all_hosts):
    """Validate ``drive_group`` against ``all_hosts``; nothing else is done."""
    validate = drive_group.validate
    validate(all_hosts)
212 | ||
@deferred_write("remove_osds")
def remove_osds(self, osd_ids):
    """Only check that ``osd_ids`` is a list; nothing is removed."""
    ids_are_list = isinstance(osd_ids, list)
    assert ids_are_list
216 | ||
@deferred_write("service_action")
def service_action(self, action, service_type, service_name=None, service_id=None):
    """No-op: all arguments are ignored."""
    return None
220 | ||
@deferred_write("remove_stateless_service")
def remove_stateless_service(self, service_type, id_):
    """No-op: both arguments are ignored."""
    return None
224 | ||
@deferred_write("update_stateless_service")
def update_stateless_service(self, service_type, spec):
    """No-op: both arguments are ignored."""
    return None
228 | ||
@deferred_read
def get_hosts(self):
    """Return a one-element list: an InventoryNode 'localhost' with no devices."""
    local_node = orchestrator.InventoryNode('localhost', [])
    return [local_node]
232 | ||
@deferred_write("add_host")
def add_host(self, host):
    """
    Raise a specific exception for a handful of magic host names; for any
    other value only assert that ``host`` is a str.
    """
    # (magic host name, factory for the exception it triggers), checked in
    # this order.
    failure_modes = (
        ('raise_no_support',
         lambda: orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")),
        ('raise_bug', lambda: ZeroDivisionError()),
        ('raise_not_implemented', lambda: NotImplementedError()),
        ('raise_no_orchestrator', lambda: orchestrator.NoOrchestrator()),
        ('raise_import_error', lambda: ImportError("test_orchestrator not enabled")),
    )
    for trigger, make_error in failure_modes:
        if host == trigger:
            raise make_error()

    assert isinstance(host, str)
246 | ||
@deferred_write("remove_host")
def remove_host(self, host):
    """Only check that ``host`` is a str; nothing is removed."""
    host_is_str = isinstance(host, str)
    assert host_is_str
250 | ||
@deferred_write("update_mgrs")
def update_mgrs(self, num, hosts):
    """
    Sanity-check the arguments only: a non-empty ``hosts`` must contain
    exactly ``num`` entries, and every entry must be a str.
    """
    if hosts:
        assert len(hosts) == num
    assert all(isinstance(name, str) for name in hosts)
255 | ||
@deferred_write("update_mons")
def update_mons(self, num, hosts):
    """
    Sanity-check the arguments only: a non-empty ``hosts`` must contain
    exactly ``num`` entries; element 0 of every entry must be a str and
    element 1 must be a str or None.
    """
    if hosts:
        assert len(hosts) == num

    # Lists (not generators) so every entry is indexed before all() runs.
    first_ok = [isinstance(entry[0], str) for entry in hosts]
    assert all(first_ok)

    second_ok = [isinstance(entry[1], str) or entry[1] is None for entry in hosts]
    assert all(second_ok)