import errno
import json
import re
import os
import threading
import functools
import itertools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec

try:
    from typing import Callable, List, Sequence, Tuple
except ImportError:
    pass  # type checking

import six

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule

import orchestrator


class TestCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., TestCompletion]
    """
    Decorator to make a method return a completion
    object that executes the method when evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

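# Note: methods decorated with @deferred_read / @deferred_write below do not run
# when called; they return a TestCompletion that invokes the wrapped function
# only once the completion is evaluated, e.g. (illustrative):
#
#   completion = orch.get_hosts()   # nothing has executed yet
#   orch.process([completion])      # evaluate() runs the wrapped method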

def deferred_write(message):
    def inner(f):
        # type: (Callable) -> Callable[..., TestCompletion]

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return TestCompletion.with_progress(
                message=message,
                mgr=self,
                on_complete=lambda _: f(self, *args, **kwargs),
            )

        return wrapper
    return inner


class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """
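    # A typical way to exercise this module in a dev cluster (illustrative,
    # assuming the standard mgr/orchestrator CLI):
    #
    #   ceph mgr module enable test_orchestrator
    #   ceph orch set backend test_orchestrator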

    def process(self, completions):
        # type: (List[TestCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
    def _load_data(self, inbuf):
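        # Illustrative shape of the expected JSON -- it mirrors what
        # _init_data() below consumes, three optional top-level lists:
        #   {"inventory": [...], "services": [...], "daemons": [...]}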
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))

    def available(self):
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    def serve(self):

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def _init_data(self, data=None):
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            self.log.error(out)
            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """ Return ceph daemons on the running host."""
        types = ("mds", "osd", "mon", "rgw", "mgr")
        out = map(str, check_output(['ps', 'aux']).splitlines())
        processes = [p for p in out if any(
            [('ceph-{} '.format(t) in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
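            # e.g. a ps line such as "/usr/bin/ceph-osd -f --cluster ceph --id 1 ..."
            # (illustrative) yields _daemon_type == "osd" and daemon_id == "1".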
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)
        return daemons

    @deferred_read
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            if service_type is not None:
                services = list(filter(lambda s: s.spec.service_type == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
            for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type=daemon_type,
                    ),
                    size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.spec.service_name():
                return False
            return True

        return list(filter(_filter_func, services))

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
        it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> TestCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
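        # A minimal drive group file for the command above might look like this
        # (illustrative sketch; field names follow DriveGroupSpec):
        #
        #   service_type: osd
        #   service_id: example_drive_group
        #   placement:
        #     host_pattern: '*'
        #   data_devices:
        #     all: true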

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='create_osds',
                mgr=self,
            )
        )

    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> TestCompletion
        drive_group = specs[0]

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')
        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='apply_drivegroups',
                mgr=self,
            )
        )

    @deferred_write("remove_daemons")
    def remove_daemons(self, names):
        assert isinstance(names, list)

    @deferred_write("remove_service")
    def remove_service(self, service_name):
        assert isinstance(service_name, str)

    @deferred_write("blink_device_light")
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @deferred_write("service_action")
    def service_action(self, action, service_name):
        pass

    @deferred_write("daemon_action")
    def daemon_action(self, action, daemon_type, daemon_id):
        pass

    @deferred_write("Adding NFS service")
    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> None
        assert isinstance(spec.pool, str)

    @deferred_write("apply_nfs")
    def apply_nfs(self, spec):
        pass

    @deferred_write("add_mds")
    def add_mds(self, spec):
        pass

    @deferred_write("add_rgw")
    def add_rgw(self, spec):
        pass

    @deferred_read
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @deferred_write("add_host")
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> None
        host = spec.hostname
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, six.string_types)

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, six.string_types)

    @deferred_write("apply_mgr")
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])

    @deferred_write("apply_mon")
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])