]>
Commit | Line | Data |
---|---|---|
9f95a23c | 1 | import errno |
11fdf7f2 TL |
2 | import json |
3 | import re | |
4 | import os | |
5 | import threading | |
6 | import functools | |
9f95a23c | 7 | import itertools |
11fdf7f2 TL |
8 | from subprocess import check_output, CalledProcessError |
9 | ||
f6b5b4d7 | 10 | from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, IscsiServiceSpec |
11fdf7f2 | 11 | |
9f95a23c TL |
12 | try: |
13 | from typing import Callable, List, Sequence, Tuple | |
14 | except ImportError: | |
15 | pass # type checking | |
11fdf7f2 | 16 | |
9f95a23c | 17 | import six |
11fdf7f2 | 18 | |
9f95a23c TL |
19 | from ceph.deployment import inventory |
20 | from ceph.deployment.drive_group import DriveGroupSpec | |
21 | from mgr_module import CLICommand, HandleCommandResult | |
22 | from mgr_module import MgrModule | |
11fdf7f2 | 23 | |
9f95a23c | 24 | import orchestrator |
11fdf7f2 | 25 | |
11fdf7f2 | 26 | |
9f95a23c TL |
27 | class TestCompletion(orchestrator.Completion): |
28 | def evaluate(self): | |
29 | self.finalize(None) | |
11fdf7f2 TL |
30 | |
31 | ||
32 | def deferred_read(f): | |
9f95a23c | 33 | # type: (Callable) -> Callable[..., TestCompletion] |
11fdf7f2 | 34 | """ |
9f95a23c | 35 | Decorator to make methods return |
11fdf7f2 TL |
36 | a completion object that executes themselves. |
37 | """ | |
38 | ||
39 | @functools.wraps(f) | |
40 | def wrapper(*args, **kwargs): | |
9f95a23c | 41 | return TestCompletion(on_complete=lambda _: f(*args, **kwargs)) |
11fdf7f2 TL |
42 | |
43 | return wrapper | |
44 | ||
45 | ||
9f95a23c TL |
46 | def deferred_write(message): |
47 | def inner(f): | |
48 | # type: (Callable) -> Callable[..., TestCompletion] | |
49 | ||
50 | @functools.wraps(f) | |
51 | def wrapper(self, *args, **kwargs): | |
52 | return TestCompletion.with_progress( | |
53 | message=message, | |
54 | mgr=self, | |
55 | on_complete=lambda _: f(self, *args, **kwargs), | |
56 | ) | |
57 | ||
58 | return wrapper | |
59 | return inner | |
60 | ||
61 | ||
11fdf7f2 TL |
62 | class TestOrchestrator(MgrModule, orchestrator.Orchestrator): |
63 | """ | |
64 | This is an orchestrator implementation used for internal testing. It's meant for | |
65 | development environments and integration testing. | |
66 | ||
67 | It does not actually do anything. | |
68 | ||
69 | The implementation is similar to the Rook orchestrator, but simpler. | |
70 | """ | |
11fdf7f2 | 71 | |
9f95a23c TL |
72 | def process(self, completions): |
73 | # type: (List[TestCompletion]) -> None | |
74 | if completions: | |
75 | self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions))) | |
11fdf7f2 | 76 | |
9f95a23c TL |
77 | for p in completions: |
78 | p.evaluate() | |
11fdf7f2 | 79 | |
9f95a23c TL |
80 | @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w') |
81 | def _load_data(self, inbuf): | |
82 | try: | |
83 | data = json.loads(inbuf) | |
84 | self._init_data(data) | |
85 | return HandleCommandResult() | |
86 | except json.decoder.JSONDecodeError as e: | |
87 | msg = 'Invalid JSON file: {}'.format(e) | |
88 | return HandleCommandResult(retval=-errno.EINVAL, stderr=msg) | |
89 | except orchestrator.OrchestratorValidationError as e: | |
90 | return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e)) | |
11fdf7f2 TL |
91 | |
92 | def available(self): | |
93 | return True, "" | |
94 | ||
95 | def __init__(self, *args, **kwargs): | |
96 | super(TestOrchestrator, self).__init__(*args, **kwargs) | |
97 | ||
98 | self._initialized = threading.Event() | |
99 | self._shutdown = threading.Event() | |
9f95a23c TL |
100 | self._init_data({}) |
101 | self.all_progress_references = list() # type: List[orchestrator.ProgressReference] | |
11fdf7f2 TL |
102 | |
103 | def shutdown(self): | |
104 | self._shutdown.set() | |
105 | ||
106 | def serve(self): | |
107 | ||
108 | self._initialized.set() | |
109 | ||
110 | while not self._shutdown.is_set(): | |
111 | # XXX hack (or is it?) to kick all completions periodically, | |
112 | # in case we had a caller that wait()'ed on them long enough | |
113 | # to get persistence but not long enough to get completion | |
114 | ||
9f95a23c TL |
115 | self.all_progress_references = [p for p in self.all_progress_references if not p.effective] |
116 | for p in self.all_progress_references: | |
117 | p.update() | |
11fdf7f2 TL |
118 | |
119 | self._shutdown.wait(5) | |
120 | ||
9f95a23c TL |
121 | def _init_data(self, data=None): |
122 | self._inventory = [orchestrator.InventoryHost.from_json(inventory_host) | |
123 | for inventory_host in data.get('inventory', [])] | |
124 | self._services = [orchestrator.ServiceDescription.from_json(service) | |
125 | for service in data.get('services', [])] | |
126 | self._daemons = [orchestrator.DaemonDescription.from_json(daemon) | |
127 | for daemon in data.get('daemons', [])] | |
128 | ||
11fdf7f2 | 129 | @deferred_read |
9f95a23c | 130 | def get_inventory(self, host_filter=None, refresh=False): |
11fdf7f2 TL |
131 | """ |
132 | There is no guarantee which devices are returned by get_inventory. | |
133 | """ | |
9f95a23c TL |
134 | if host_filter and host_filter.hosts is not None: |
135 | assert isinstance(host_filter.hosts, list) | |
136 | ||
137 | if self._inventory: | |
138 | if host_filter: | |
139 | return list(filter(lambda host: host.name in host_filter.hosts, | |
140 | self._inventory)) | |
141 | return self._inventory | |
142 | ||
11fdf7f2 TL |
143 | try: |
144 | c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json']) | |
145 | except OSError: | |
146 | cmd = """ | |
147 | . {tmpdir}/ceph-volume-virtualenv/bin/activate | |
148 | ceph-volume inventory --format json | |
149 | """ | |
150 | try: | |
151 | c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True) | |
152 | except (OSError, CalledProcessError): | |
153 | c_v_out = check_output(cmd.format(tmpdir='.'),shell=True) | |
154 | ||
155 | for out in c_v_out.splitlines(): | |
494da23a | 156 | self.log.error(out) |
9f95a23c TL |
157 | devs = inventory.Devices.from_json(json.loads(out)) |
158 | return [orchestrator.InventoryHost('localhost', devs)] | |
11fdf7f2 TL |
159 | self.log.error('c-v failed: ' + str(c_v_out)) |
160 | raise Exception('c-v failed') | |
161 | ||
9f95a23c TL |
162 | def _get_ceph_daemons(self): |
163 | # type: () -> List[orchestrator.DaemonDescription] | |
164 | """ Return ceph daemons on the running host.""" | |
f6b5b4d7 | 165 | types = ("mds", "osd", "mon", "rgw", "mgr", "nfs", "iscsi") |
9f95a23c TL |
166 | out = map(str, check_output(['ps', 'aux']).splitlines()) |
167 | processes = [p for p in out if any( | |
e306af50 | 168 | [('ceph-{} '.format(t) in p) for t in types])] |
9f95a23c TL |
169 | |
170 | daemons = [] | |
171 | for p in processes: | |
9f95a23c TL |
172 | # parse daemon type |
173 | m = re.search('ceph-([^ ]+)', p) | |
174 | if m: | |
175 | _daemon_type = m.group(1) | |
176 | else: | |
177 | raise AssertionError('Fail to determine daemon type from {}'.format(p)) | |
178 | ||
179 | # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>` | |
1911f103 | 180 | patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)'] |
9f95a23c TL |
181 | for pattern in patterns: |
182 | m = re.search(pattern, p) | |
183 | if m: | |
184 | daemon_id = m.group(1) | |
185 | break | |
186 | else: | |
187 | raise AssertionError('Fail to determine daemon ID from {}'.format(p)) | |
188 | daemon = orchestrator.DaemonDescription( | |
189 | daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost') | |
190 | daemons.append(daemon) | |
191 | return daemons | |
192 | ||
11fdf7f2 | 193 | @deferred_read |
9f95a23c TL |
194 | def describe_service(self, service_type=None, service_name=None, refresh=False): |
195 | if self._services: | |
196 | # Dummy data | |
197 | services = self._services | |
9f95a23c | 198 | if service_type is not None: |
1911f103 | 199 | services = list(filter(lambda s: s.spec.service_type == service_type, services)) |
9f95a23c TL |
200 | else: |
201 | # Deduce services from daemons running on localhost | |
202 | all_daemons = self._get_ceph_daemons() | |
203 | services = [] | |
204 | for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type): | |
205 | if service_type is not None and service_type != daemon_type: | |
206 | continue | |
207 | daemon_size = len(list(daemons)) | |
208 | services.append(orchestrator.ServiceDescription( | |
1911f103 TL |
209 | spec=ServiceSpec( |
210 | service_type=daemon_type, | |
211 | ), | |
212 | size=daemon_size, running=daemon_size)) | |
9f95a23c TL |
213 | |
214 | def _filter_func(svc): | |
1911f103 | 215 | if service_name is not None and service_name != svc.spec.service_name(): |
9f95a23c TL |
216 | return False |
217 | return True | |
218 | ||
219 | return list(filter(_filter_func, services)) | |
220 | ||
221 | @deferred_read | |
801d1391 | 222 | def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False): |
11fdf7f2 TL |
223 | """ |
224 | There is no guarantee which daemons are returned by describe_service, except that | |
225 | it returns the mgr we're running in. | |
226 | """ | |
9f95a23c | 227 | if daemon_type: |
f6b5b4d7 | 228 | daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash", "nfs") |
9f95a23c | 229 | assert daemon_type in daemon_types, daemon_type + " unsupported" |
11fdf7f2 | 230 | |
9f95a23c | 231 | daemons = self._daemons if self._daemons else self._get_ceph_daemons() |
11fdf7f2 | 232 | |
9f95a23c | 233 | def _filter_func(d): |
801d1391 TL |
234 | if service_name is not None and service_name != d.service_name(): |
235 | return False | |
9f95a23c TL |
236 | if daemon_type is not None and daemon_type != d.daemon_type: |
237 | return False | |
238 | if daemon_id is not None and daemon_id != d.daemon_id: | |
239 | return False | |
240 | if host is not None and host != d.hostname: | |
241 | return False | |
242 | return True | |
11fdf7f2 | 243 | |
9f95a23c | 244 | return list(filter(_filter_func, daemons)) |
11fdf7f2 | 245 | |
1911f103 TL |
246 | def preview_drivegroups(self, drive_group_name=None, dg_specs=None): |
247 | return [{}] | |
248 | ||
9f95a23c TL |
249 | def create_osds(self, drive_group): |
250 | # type: (DriveGroupSpec) -> TestCompletion | |
251 | """ Creates OSDs from a drive group specification. | |
11fdf7f2 | 252 | |
9f95a23c TL |
253 | $: ceph orch osd create -i <dg.file> |
254 | ||
255 | The drivegroup file must only contain one spec at a time. | |
256 | """ | |
11fdf7f2 | 257 | |
9f95a23c TL |
258 | def run(all_hosts): |
259 | # type: (List[orchestrator.HostSpec]) -> None | |
260 | drive_group.validate() | |
f6b5b4d7 TL |
261 | |
262 | def get_hosts_func(label=None, as_hostspec=False): | |
263 | if as_hostspec: | |
264 | return all_hosts | |
265 | return [h.hostname for h in all_hosts] | |
266 | ||
267 | if not drive_group.placement.filter_matching_hosts(get_hosts_func): | |
e306af50 TL |
268 | raise orchestrator.OrchestratorValidationError('failed to match') |
269 | ||
9f95a23c TL |
270 | return self.get_hosts().then(run).then( |
271 | on_complete=orchestrator.ProgressReference( | |
272 | message='create_osds', | |
273 | mgr=self, | |
274 | ) | |
275 | ) | |
276 | ||
277 | def apply_drivegroups(self, specs): | |
278 | # type: (List[DriveGroupSpec]) -> TestCompletion | |
279 | drive_group = specs[0] | |
e306af50 | 280 | |
9f95a23c TL |
281 | def run(all_hosts): |
282 | # type: (List[orchestrator.HostSpec]) -> None | |
283 | drive_group.validate() | |
f6b5b4d7 TL |
284 | |
285 | def get_hosts_func(label=None, as_hostspec=False): | |
286 | if as_hostspec: | |
287 | return all_hosts | |
288 | return [h.hostname for h in all_hosts] | |
289 | ||
290 | if not drive_group.placement.filter_matching_hosts(get_hosts_func): | |
e306af50 | 291 | raise orchestrator.OrchestratorValidationError('failed to match') |
9f95a23c TL |
292 | return self.get_hosts().then(run).then( |
293 | on_complete=orchestrator.ProgressReference( | |
294 | message='apply_drivesgroups', | |
295 | mgr=self, | |
296 | ) | |
297 | ) | |
298 | ||
299 | @deferred_write("remove_daemons") | |
300 | def remove_daemons(self, names): | |
301 | assert isinstance(names, list) | |
302 | ||
303 | @deferred_write("remove_service") | |
304 | def remove_service(self, service_name): | |
305 | assert isinstance(service_name, str) | |
306 | ||
307 | @deferred_write("blink_device_light") | |
308 | def blink_device_light(self, ident_fault, on, locations): | |
309 | assert ident_fault in ("ident", "fault") | |
310 | assert len(locations) | |
311 | return '' | |
11fdf7f2 TL |
312 | |
313 | @deferred_write("service_action") | |
9f95a23c | 314 | def service_action(self, action, service_name): |
11fdf7f2 TL |
315 | pass |
316 | ||
9f95a23c | 317 | @deferred_write("daemon_action") |
f6b5b4d7 | 318 | def daemon_action(self, action, daemon_name, image=None): |
9f95a23c TL |
319 | pass |
320 | ||
321 | @deferred_write("Adding NFS service") | |
322 | def add_nfs(self, spec): | |
323 | # type: (NFSServiceSpec) -> None | |
324 | assert isinstance(spec.pool, str) | |
325 | ||
326 | @deferred_write("apply_nfs") | |
327 | def apply_nfs(self, spec): | |
11fdf7f2 TL |
328 | pass |
329 | ||
f6b5b4d7 TL |
330 | @deferred_write("add_iscsi") |
331 | def add_iscsi(self, spec): | |
332 | # type: (IscsiServiceSpec) -> None | |
333 | pass | |
334 | ||
335 | @deferred_write("apply_iscsi") | |
336 | def apply_iscsi(self, spec): | |
337 | # type: (IscsiServiceSpec) -> None | |
338 | pass | |
339 | ||
9f95a23c TL |
340 | @deferred_write("add_mds") |
341 | def add_mds(self, spec): | |
342 | pass | |
343 | ||
344 | @deferred_write("add_rgw") | |
345 | def add_rgw(self, spec): | |
11fdf7f2 TL |
346 | pass |
347 | ||
348 | @deferred_read | |
349 | def get_hosts(self): | |
9f95a23c TL |
350 | if self._inventory: |
351 | return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory] | |
352 | return [orchestrator.HostSpec('localhost')] | |
11fdf7f2 TL |
353 | |
354 | @deferred_write("add_host") | |
9f95a23c TL |
355 | def add_host(self, spec): |
356 | # type: (orchestrator.HostSpec) -> None | |
357 | host = spec.hostname | |
f6b5b4d7 | 358 | if host == 'raise_validation_error': |
11fdf7f2 | 359 | raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5") |
f6b5b4d7 TL |
360 | if host == 'raise_error': |
361 | raise orchestrator.OrchestratorError("host address is empty") | |
11fdf7f2 TL |
362 | if host == 'raise_bug': |
363 | raise ZeroDivisionError() | |
364 | if host == 'raise_not_implemented': | |
365 | raise NotImplementedError() | |
366 | if host == 'raise_no_orchestrator': | |
367 | raise orchestrator.NoOrchestrator() | |
368 | if host == 'raise_import_error': | |
369 | raise ImportError("test_orchestrator not enabled") | |
9f95a23c | 370 | assert isinstance(host, six.string_types) |
11fdf7f2 TL |
371 | |
372 | @deferred_write("remove_host") | |
373 | def remove_host(self, host): | |
9f95a23c TL |
374 | assert isinstance(host, six.string_types) |
375 | ||
376 | @deferred_write("apply_mgr") | |
377 | def apply_mgr(self, spec): | |
378 | # type: (ServiceSpec) -> None | |
379 | ||
380 | assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count | |
381 | assert all([isinstance(h, str) for h in spec.placement.hosts]) | |
382 | ||
383 | @deferred_write("apply_mon") | |
384 | def apply_mon(self, spec): | |
385 | # type: (ServiceSpec) -> None | |
386 | ||
387 | assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count | |
388 | assert all([isinstance(h[0], str) for h in spec.placement.hosts]) | |
389 | assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts]) |