import errno
import json
import re
import os
import threading
import functools
import itertools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec

try:
    from typing import Callable, List, Sequence, Tuple
except ImportError:
    pass  # type checking

import six

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule

import orchestrator


class TestCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def deferred_read(f):
    # type: (Callable) -> Callable[..., TestCompletion]
    """
    Decorator to make methods return a completion object that executes them.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper
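
# A minimal usage sketch (hypothetical method, not part of this module):
# calling a @deferred_read-decorated method does not run its body. It returns
# a TestCompletion whose on_complete callback invokes the original function,
# so the body only runs once process() calls evaluate() on the completion
# (assuming Completion.finalize() fires the on_complete chain):
#
#     @deferred_read
#     def get_things(self):
#         return ['a', 'b']
#
#     c = self.get_things()  # nothing has executed yet
#     c.evaluate()           # now get_things() runs and c carries its result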


def deferred_write(message):
    def inner(f):
        # type: (Callable) -> Callable[..., TestCompletion]

        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            return TestCompletion.with_progress(
                message=message,
                mgr=self,
                on_complete=lambda _: f(self, *args, **kwargs),
            )

        return wrapper
    return inner
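
# Sketch of the difference from deferred_read (an assumption drawn from the
# decorator bodies above): deferred_write builds the completion via
# TestCompletion.with_progress(), which additionally registers a
# ProgressReference carrying `message` with the mgr, so deferred writes are
# tracked by the progress machinery that serve() kicks periodically.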


class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

    def process(self, completions):
        # type: (List[TestCompletion]) -> None
        if completions:
            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
    def _load_data(self, inbuf):
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
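
    # Illustrative invocation (a sketch: the top-level keys mirror what
    # _init_data() reads below; the fields inside each entry must match the
    # from_json() formats of InventoryHost / ServiceDescription /
    # DaemonDescription):
    #
    #     echo '{"inventory": [], "services": [], "daemons": []}' > data.json
    #     ceph test_orchestrator load_data -i data.json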

    def available(self):
        return True, ""

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})
        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

    def shutdown(self):
        self._shutdown.set()

    def serve(self):

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion

            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
            for p in self.all_progress_references:
                p.update()

            self._shutdown.wait(5)

    def _init_data(self, data=None):
        data = data or {}  # guard the default: data.get() would crash on None
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @deferred_read
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        for out in c_v_out.splitlines():
            self.log.error(out)
            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')
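
    # Fallback behavior (as implemented above): ceph-volume is tried from PATH
    # first, then from a virtualenv under $TMPDIR, then from the current
    # directory. Only the first output line is parsed; it is expected to hold
    # the JSON device list, hence the early return inside the for loop.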

    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return the Ceph daemons running on the local host."""
        types = ("mds", "osd", "mon", "rgw", "mgr")
        out = check_output(['ps', 'aux']).decode().splitlines()
        processes = [p for p in out if any(
            [('ceph-' + t in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
            daemon_id = None
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:
                # for/else: this branch runs only if no pattern matched (no break)
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)
        return daemons
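
    # Worked example (hypothetical ps line, for illustration): given
    #   "ceph  1234 ... /usr/bin/ceph-osd -f --cluster ceph --id 3 ..."
    # the 'ceph-([^ ]+)' regex extracts daemon_type 'osd' and the
    # `--id[\s=](\w+)` pattern extracts daemon_id '3', yielding
    # DaemonDescription(daemon_type='osd', daemon_id='3', hostname='localhost').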

    @deferred_read
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            # Can't deduce service type from dummy data (no daemons).
            # Assume service_type is service_name.
            if service_type is not None:
                services = list(filter(lambda s: s.service_name == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
            # groupby() only groups consecutive items, so sort by daemon type first
            for daemon_type, daemons in itertools.groupby(
                    sorted(all_daemons, key=lambda d: d.daemon_type),
                    key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    service_name=daemon_type, size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.service_name:
                return False
            return True

        return list(filter(_filter_func, services))

    @deferred_read
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
        it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> TestCompletion
        """ Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if drive_group.placement.host_pattern:
                if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
                    raise orchestrator.OrchestratorValidationError('failed to match')
        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='create_osds',
                mgr=self,
            )
        )
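
    # An illustrative drive group spec file (a sketch; the layout follows the
    # ceph.deployment.drive_group.DriveGroupSpec service spec format, and the
    # exact fields available depend on the Ceph release):
    #
    #     service_type: osd
    #     service_id: example_dg
    #     placement:
    #       host_pattern: '*'
    #     data_devices:
    #       all: true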

    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> TestCompletion
        drive_group = specs[0]

        def run(all_hosts):
            # type: (List[orchestrator.HostSpec]) -> None
            drive_group.validate()
            if drive_group.placement.host_pattern:
                if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
                    raise orchestrator.OrchestratorValidationError('failed to match')
        return self.get_hosts().then(run).then(
            on_complete=orchestrator.ProgressReference(
                message='apply_drivegroups',
                mgr=self,
            )
        )
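
    # Note: only specs[0] is validated above; this test implementation ignores
    # any additional drive group specs passed to apply_drivegroups().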

    @deferred_write("remove_daemons")
    def remove_daemons(self, names):
        assert isinstance(names, list)

    @deferred_write("remove_service")
    def remove_service(self, service_name):
        assert isinstance(service_name, str)

    @deferred_write("blink_device_light")
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @deferred_write("service_action")
    def service_action(self, action, service_name):
        pass

    @deferred_write("daemon_action")
    def daemon_action(self, action, daemon_type, daemon_id):
        pass

    @deferred_write("Adding NFS service")
    def add_nfs(self, spec):
        # type: (NFSServiceSpec) -> None
        assert isinstance(spec.pool, str)

    @deferred_write("apply_nfs")
    def apply_nfs(self, spec):
        pass

    @deferred_write("add_mds")
    def add_mds(self, spec):
        pass

    @deferred_write("add_rgw")
    def add_rgw(self, spec):
        pass

    @deferred_read
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @deferred_write("add_host")
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> None
        host = spec.hostname
        if host == 'raise_no_support':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, six.string_types)
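
    # The magic hostnames above are sentinels that let tests exercise specific
    # error paths (an assumption drawn from the exception types raised); any
    # other hostname is accepted as an ordinary string.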

    @deferred_write("remove_host")
    def remove_host(self, host):
        assert isinstance(host, six.string_types)

    @deferred_write("apply_mgr")
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])

    @deferred_write("apply_mon")
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> None

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])