# ceph/src/pybind/mgr/test_orchestrator/module.py (as of ceph v15.2.4)
import errno
import json
import re
import os
import threading
import functools
import itertools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec

try:
    from typing import Callable, List, Sequence, Tuple
except ImportError:
    pass  # type checking

import six

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule

import orchestrator


class TestCompletion(orchestrator.Completion):
    def evaluate(self):
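        # finalize(None) resolves the completion immediately, firing its
        # on_complete callback chain -- which is what actually runs the
        # deferred method bodies created by the decorators below.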
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., TestCompletion]
    """
    Decorator that makes a method return a completion
    object which runs the method when evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

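# A sketch of how deferred_read behaves (names below are illustrative):
#
#     class MyOrch(TestOrchestrator):
#         @deferred_read
#         def get_things(self):
#             return ['a', 'b']
#
#     c = my_orch.get_things()   # nothing runs yet; c is a TestCompletion
#     my_orch.process([c])       # evaluate() finalizes c, running get_things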

def deferred_write(message):
    def inner(f):
        # type: (Callable) -> Callable[..., TestCompletion]

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return TestCompletion.with_progress(
                message=message,
                mgr=self,
                on_complete=lambda _: f(self, *args, **kwargs),
            )

        return wrapper
    return inner

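# deferred_write differs from deferred_read only in wrapping the call with
# TestCompletion.with_progress, so the given message is shown by the mgr
# progress module while the completion is pending. Sketch (hypothetical
# method name):
#
#     @deferred_write("doing the thing")
#     def do_thing(self, spec):
#         ...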

class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

    def process(self, completions):
        # type: (List[TestCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
    def _load_data(self, inbuf):
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
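
    # The JSON accepted by `load_data` mirrors _init_data() below: optional
    # top-level "inventory", "services" and "daemons" arrays whose entries are
    # in the from_json() format of the corresponding orchestrator classes.
    # A minimal, purely illustrative invocation:
    #
    #   ceph test_orchestrator load_data -i dummy.json
    #
    # with dummy.json containing e.g.
    #   {"inventory": [], "services": [], "daemons": []}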

    def available(self):
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    def serve(self):
        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def _init_data(self, data=None):
        data = data or {}  # guard against the default of None
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

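        # No dummy inventory was loaded: fall back to running ceph-volume on
        # this host -- first from $PATH, then from a vstart-style virtualenv
        # under $TMPDIR (defaulting to /tmp), and finally under ./ .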
        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            self.log.error(out)

            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return the ceph daemons running on this host."""
        types = ("mds", "osd", "mon", "rgw", "mgr")
        out = map(str, check_output(['ps', 'aux']).splitlines())
        processes = [p for p in out if any(
            [('ceph-{} '.format(t) in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)
        return daemons

    @deferred_read
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            if service_type is not None:
                services = list(filter(lambda s: s.spec.service_type == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
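            # Note: itertools.groupby only merges *consecutive* items, so this
            # relies on _get_ceph_daemons() returning daemons grouped by type;
            # sorting by daemon_type first would make that explicit.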
            for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type=daemon_type,
                    ),
                    size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.spec.service_name():
                return False
            return True

        return list(filter(_filter_func, services))

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons,
        except that it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> TestCompletion
        """Create OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()

            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='create_osds',
                mgr=self,
            )
        )

    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> TestCompletion
        drive_group = specs[0]
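        # Only the first spec is exercised; this test orchestrator does not
        # model applying several drivegroups in one call.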

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()

            if not drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None:
                                                               [h.hostname for h in all_hosts]):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='apply_drivegroups',
                mgr=self,
            )
        )

    @deferred_write("remove_daemons")
    def remove_daemons(self, names):
        assert isinstance(names, list)

    @deferred_write("remove_service")
    def remove_service(self, service_name):
        assert isinstance(service_name, str)

    @deferred_write("blink_device_light")
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @deferred_write("service_action")
    def service_action(self, action, service_name):
        pass

    @deferred_write("daemon_action")
    def daemon_action(self, action, daemon_type, daemon_id):
        pass

    @deferred_write("Adding NFS service")
    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> None
        assert isinstance(spec.pool, str)

    @deferred_write("apply_nfs")
    def apply_nfs(self, spec):
        pass

    @deferred_write("add_mds")
    def add_mds(self, spec):
        pass

    @deferred_write("add_rgw")
    def add_rgw(self, spec):
        pass

    @deferred_read
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @deferred_write("add_host")
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> None
        host = spec.hostname
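        # Magic hostnames let integration tests provoke each failure path: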
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, six.string_types)

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, six.string_types)

    @deferred_write("apply_mgr")
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> None
        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])

    @deferred_write("apply_mon")
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> None
        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])