import errno
import json
import re
import os
import threading
import functools
import itertools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, IscsiServiceSpec

try:
    from typing import Callable, List, Sequence, Tuple
except ImportError:
    pass  # type checking

import six

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule

import orchestrator


class TestCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., TestCompletion]
    """
    Decorator that makes a method return a completion object which runs the
    method when evaluated.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper


def deferred_write(message):
    def inner(f):
        # type: (Callable) -> Callable[..., TestCompletion]

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return TestCompletion.with_progress(
                message=message,
                mgr=self,
                on_complete=lambda _: f(self, *args, **kwargs),
            )

        return wrapper
    return inner

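
# Illustrative sketch of how the two decorators above behave (not part of the
# module itself; `orch` stands for a TestOrchestrator instance and the exact
# Completion attribute names follow the orchestrator API of this Ceph release):
#
#     @deferred_read
#     def get_hosts(self):
#         return [orchestrator.HostSpec('localhost')]
#
#     completion = orch.get_hosts()    # nothing has executed yet
#     orch.process([completion])       # evaluate() runs the wrapped method body
#     hosts = completion.result        # roughly: the method's return value
#
# deferred_write() works the same way but additionally attaches a progress
# message via TestCompletion.with_progress().

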
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

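    # Note: in a dev/vstart cluster this backend is typically enabled with
    # something like the following (commands taken from the orchestrator docs,
    # not from this file, so treat them as an assumption):
    #
    #     ceph mgr module enable test_orchestrator
    #     ceph orch set backend test_orchestrator
    #     ceph orch status
    #
    # after which the generic `ceph orch ...` commands are served by this class.
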
    def process(self, completions):
        # type: (List[TestCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
    def _load_data(self, inbuf):
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))

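    # The payload mirrors what _init_data() below consumes: a JSON object with
    # optional "inventory", "services" and "daemons" lists. An illustrative
    # invocation (only the top-level keys are taken from _init_data; the file
    # name is made up):
    #
    #     echo '{"inventory": [], "services": [], "daemons": []}' > dummy.json
    #     ceph test_orchestrator load_data -i dummy.json
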
    def available(self):
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    def serve(self):

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def _init_data(self, data=None):
        data = data or {}
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

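        # No dummy inventory was loaded: fall back to probing the local node with
        # ceph-volume. If the binary is not on PATH, retry via a virtualenv under
        # $TMPDIR (assumption: this matches how dev environments ship ceph-volume;
        # the activate path is taken verbatim from the command below).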
        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            self.log.error(out)

            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """ Return ceph daemons on the running host."""
        types = ("mds", "osd", "mon", "rgw", "mgr", "nfs", "iscsi")
        out = map(str, check_output(['ps', 'aux']).splitlines())
        processes = [p for p in out if any(
            [('ceph-{} '.format(t) in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)
        return daemons

    @deferred_read
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            if service_type is not None:
                services = list(filter(lambda s: s.spec.service_type == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
            for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type=daemon_type,
                    ),
                    size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.spec.service_name():
                return False
            return True

        return list(filter(_filter_func, services))

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
        it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash", "nfs")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

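    # Note (assumption about the CLI wiring, which lives in the generic
    # orchestrator module rather than in this file): with the orchestrator CLI
    # enabled, `ceph orch ls` ends up calling describe_service() and
    # `ceph orch ps` ends up calling list_daemons().
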
    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> TestCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()

            def get_hosts_func(label=None, as_hostspec=False):
                if as_hostspec:
                    return all_hosts
                return [h.hostname for h in all_hosts]

            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
                raise orchestrator.OrchestratorValidationError('failed to match')

        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='create_osds',
                mgr=self,
            )
        )

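    # A minimal drive group spec of the kind fed to create_osds() and
    # apply_drivegroups() might look like the YAML below (a sketch based on the
    # documented DriveGroupSpec format, not taken from this file):
    #
    #     service_type: osd
    #     service_id: example_dg
    #     placement:
    #       host_pattern: '*'
    #     data_devices:
    #       all: true
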
    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> TestCompletion
        drive_group = specs[0]

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()

            def get_hosts_func(label=None, as_hostspec=False):
                if as_hostspec:
                    return all_hosts
                return [h.hostname for h in all_hosts]

            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
                raise orchestrator.OrchestratorValidationError('failed to match')
        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='apply_drivegroups',
                mgr=self,
            )
        )

    @deferred_write("remove_daemons")
    def remove_daemons(self, names):
        assert isinstance(names, list)

    @deferred_write("remove_service")
    def remove_service(self, service_name):
        assert isinstance(service_name, str)

    @deferred_write("blink_device_light")
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @deferred_write("service_action")
    def service_action(self, action, service_name):
        pass

    @deferred_write("daemon_action")
    def daemon_action(self, action, daemon_name, image=None):
        pass

    @deferred_write("Adding NFS service")
    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> None
        assert isinstance(spec.pool, str)

    @deferred_write("apply_nfs")
    def apply_nfs(self, spec):
        pass

    @deferred_write("add_iscsi")
    def add_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> None
        pass

    @deferred_write("apply_iscsi")
    def apply_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> None
        pass

    @deferred_write("add_mds")
    def add_mds(self, spec):
        pass

    @deferred_write("add_rgw")
    def add_rgw(self, spec):
        pass

    @deferred_read
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @deferred_write("add_host")
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> None
        host = spec.hostname
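        # Magic hostnames: each of the names below deliberately raises a specific
        # exception so callers (e.g. tests driving the orchestrator CLI) can
        # exercise their error-handling paths.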
        if host == 'raise_validation_error':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_error':
            raise orchestrator.OrchestratorError("host address is empty")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, six.string_types)

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, six.string_types)

    @deferred_write("apply_mgr")
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])

    @deferred_write("apply_mon")
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])