import errno
import json
import re
import os
import threading
import functools
import itertools
from subprocess import check_output, CalledProcessError

from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, IscsiServiceSpec

try:
    from typing import Callable, List, Sequence, Tuple
except ImportError:
    pass  # type checking

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import CLICommand, HandleCommandResult
from mgr_module import MgrModule

import orchestrator
from orchestrator import handle_orch_error, raise_if_exception


class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    This is an orchestrator implementation used for internal testing. It's meant for
    development environments and integration testing.

    It does not actually do anything.

    The implementation is similar to the Rook orchestrator, but simpler.
    """

    @CLICommand('test_orchestrator load_data', perm='w')
    def _load_data(self, inbuf):
        """
        load dummy data into test orchestrator
        """
        try:
            data = json.loads(inbuf)
            self._init_data(data)
            return HandleCommandResult()
        except json.decoder.JSONDecodeError as e:
            msg = 'Invalid JSON file: {}'.format(e)
            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
        except orchestrator.OrchestratorValidationError as e:
            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))

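    # A minimal sketch of a payload for `load_data` above (hypothetical values;
    # the top-level keys map directly onto _init_data() below):
    #
    #   $ ceph test_orchestrator load_data -i payload.json
    #
    # where payload.json contains, e.g.:
    #
    #   {
    #       "inventory": [{"name": "host0", "devices": []}],
    #       "services": [],
    #       "daemons": []
    #   }
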
    def available(self):
        return True, "", {}

    def __init__(self, *args, **kwargs):
        super(TestOrchestrator, self).__init__(*args, **kwargs)

        self._initialized = threading.Event()
        self._shutdown = threading.Event()
        self._init_data({})

    def shutdown(self):
        self._shutdown.set()

    def serve(self):
        self._initialized.set()

        while not self._shutdown.is_set():
            self._shutdown.wait(5)

    def _init_data(self, data=None):
        data = data or {}  # guard against None so the .get() calls below are safe
        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
                           for inventory_host in data.get('inventory', [])]
        self._services = [orchestrator.ServiceDescription.from_json(service)
                          for service in data.get('services', [])]
        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                         for daemon in data.get('daemons', [])]

    @handle_orch_error
    def get_inventory(self, host_filter=None, refresh=False):
        """
        There is no guarantee which devices are returned by get_inventory.
        """
        if host_filter and host_filter.hosts is not None:
            assert isinstance(host_filter.hosts, list)

        if self._inventory:
            if host_filter:
                return list(filter(lambda host: host.name in host_filter.hosts,
                                   self._inventory))
            return self._inventory

        try:
            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
        except OSError:
            cmd = """
            . {tmpdir}/ceph-volume-virtualenv/bin/activate
            ceph-volume inventory --format json
            """
            try:
                c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
            except (OSError, CalledProcessError):
                c_v_out = check_output(cmd.format(tmpdir='.'), shell=True)

        # return on the first line of output that parses as a device list
        for out in c_v_out.splitlines():
            self.log.error(out)
            devs = inventory.Devices.from_json(json.loads(out))
            return [orchestrator.InventoryHost('localhost', devs)]
        self.log.error('c-v failed: ' + str(c_v_out))
        raise Exception('c-v failed')

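    # Sketch of the dummy-inventory filtering above (hypothetical host name;
    # InventoryFilter comes from the orchestrator interface):
    #
    #   >>> hf = orchestrator.InventoryFilter(hosts=['host0'])
    #   >>> self.get_inventory(host_filter=hf)  # hosts not in hf.hosts are dropped
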
    def _get_ceph_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return the Ceph daemons running on the local host."""
        types = ("mds", "osd", "mon", "rgw", "mgr", "nfs", "iscsi")
        out = check_output(['ps', 'aux']).decode().splitlines()
        processes = [p for p in out if any(
            [('ceph-{} '.format(t) in p) for t in types])]

        daemons = []
        for p in processes:
            # parse daemon type
            m = re.search('ceph-([^ ]+)', p)
            if m:
                _daemon_type = m.group(1)
            else:
                raise AssertionError('Failed to determine daemon type from {}'.format(p))

            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
            for pattern in patterns:
                m = re.search(pattern, p)
                if m:
                    daemon_id = m.group(1)
                    break
            else:  # the loop finished without `break`: no pattern matched
                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
            daemon = orchestrator.DaemonDescription(
                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
            daemons.append(daemon)
        return daemons

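    # For illustration, a (hypothetical) `ps aux` line such as
    #   "ceph  1234 ... /usr/bin/ceph-osd -i 3 --fsid ..."
    # yields daemon_type='osd' and daemon_id='3'; the second ID pattern covers
    # both the `--id=<id>` and `--id <id>` forms:
    #
    #   >>> re.search(r'--id[\s=](\w+)', 'ceph-mds --id=a').group(1)
    #   'a'
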
    @handle_orch_error
    def describe_service(self, service_type=None, service_name=None, refresh=False):
        if self._services:
            # Dummy data
            services = self._services
            if service_type is not None:
                services = list(filter(lambda s: s.spec.service_type == service_type, services))
        else:
            # Deduce services from daemons running on localhost
            all_daemons = self._get_ceph_daemons()
            services = []
            # sort first: itertools.groupby only merges adjacent equal keys
            for daemon_type, daemons in itertools.groupby(
                    sorted(all_daemons, key=lambda d: d.daemon_type),
                    key=lambda d: d.daemon_type):
                if service_type is not None and service_type != daemon_type:
                    continue
                daemon_size = len(list(daemons))
                services.append(orchestrator.ServiceDescription(
                    spec=ServiceSpec(
                        service_type=daemon_type,  # type: ignore
                    ),
                    size=daemon_size, running=daemon_size))

        def _filter_func(svc):
            if service_name is not None and service_name != svc.spec.service_name():
                return False
            return True

        return list(filter(_filter_func, services))

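    # E.g. two running ceph-mon daemons and one ceph-mgr daemon are deduced as
    # two services: ServiceDescription(spec=ServiceSpec('mon'), size=2, running=2)
    # and ServiceDescription(spec=ServiceSpec('mgr'), size=1, running=1).
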
    @handle_orch_error
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
        """
        There is no guarantee which daemons are returned by list_daemons, except that
        it returns the mgr we're running in.
        """
        if daemon_type:
            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash", "nfs")
            assert daemon_type in daemon_types, daemon_type + " unsupported"

        daemons = self._daemons if self._daemons else self._get_ceph_daemons()

        def _filter_func(d):
            if service_name is not None and service_name != d.service_name():
                return False
            if daemon_type is not None and daemon_type != d.daemon_type:
                return False
            if daemon_id is not None and daemon_id != d.daemon_id:
                return False
            if host is not None and host != d.hostname:
                return False
            return True

        return list(filter(_filter_func, daemons))

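    # Sketch: with this module acting as the orchestrator backend, a call such
    # as `ceph orch ps --daemon_type mgr` lands here with daemon_type='mgr' and
    # returns only the mgr daemon(s).
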
    def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
        return [{}]

    @handle_orch_error
    def create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str
        """Creates OSDs from a drive group specification.

        $: ceph orch osd create -i <dg.file>

        The drivegroup file must only contain one spec at a time.
        """
        return self._create_osds(drive_group)

    def _create_osds(self, drive_group):
        # type: (DriveGroupSpec) -> str

        drive_group.validate()
        all_hosts = raise_if_exception(self.get_hosts())
        if not drive_group.placement.filter_matching_hostspecs(all_hosts):
            raise orchestrator.OrchestratorValidationError('failed to match')
        return ''

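    # A minimal drive group spec that would pass _create_osds() (hypothetical
    # service_id; in Pacific such specs are applied via `ceph orch apply osd -i <file>`):
    #
    #   service_type: osd
    #   service_id: example_dg
    #   placement:
    #     host_pattern: '*'
    #   data_devices:
    #     all: true
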
    @handle_orch_error
    def apply_drivegroups(self, specs):
        # type: (List[DriveGroupSpec]) -> List[str]
        return [self._create_osds(dg) for dg in specs]

    @handle_orch_error
    def remove_daemons(self, names):
        assert isinstance(names, list)
        return 'done'

    @handle_orch_error
    def remove_service(self, service_name):
        assert isinstance(service_name, str)
        return 'done'

    @handle_orch_error
    def blink_device_light(self, ident_fault, on, locations):
        assert ident_fault in ("ident", "fault")
        assert len(locations)
        return ''

    @handle_orch_error
    def service_action(self, action, service_name):
        return 'done'

    @handle_orch_error
    def daemon_action(self, action, daemon_name, image=None):
        return 'done'

    @handle_orch_error
    def add_daemon(self, spec: ServiceSpec):
        return [spec.one_line_str()]

    @handle_orch_error
    def apply_nfs(self, spec):
        return spec.one_line_str()

    @handle_orch_error
    def apply_iscsi(self, spec):
        # type: (IscsiServiceSpec) -> str
        return spec.one_line_str()

    @handle_orch_error
    def get_hosts(self):
        if self._inventory:
            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
        return [orchestrator.HostSpec('localhost')]

    @handle_orch_error
    def add_host(self, spec):
        # type: (orchestrator.HostSpec) -> str
        host = spec.hostname
        if host == 'raise_validation_error':
            raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
        if host == 'raise_error':
            raise orchestrator.OrchestratorError("host address is empty")
        if host == 'raise_bug':
            raise ZeroDivisionError()
        if host == 'raise_not_implemented':
            raise NotImplementedError()
        if host == 'raise_no_orchestrator':
            raise orchestrator.NoOrchestrator()
        if host == 'raise_import_error':
            raise ImportError("test_orchestrator not enabled")
        assert isinstance(host, str)
        return ''

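    # The magic hostnames above let tests exercise error propagation, e.g. a
    # (hypothetical) `ceph orch host add raise_error`: the exception is captured
    # by @handle_orch_error and re-raised on the caller's side.
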
    @handle_orch_error
    def remove_host(self, host):
        assert isinstance(host, str)
        return 'done'

    @handle_orch_error
    def apply_mgr(self, spec):
        # type: (ServiceSpec) -> str

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h, str) for h in spec.placement.hosts])
        return spec.one_line_str()

    @handle_orch_error
    def apply_mon(self, spec):
        # type: (ServiceSpec) -> str

        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])
        return spec.one_line_str()
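    # Note: placement hosts are HostPlacementSpec tuples, e.g. a (hypothetical)
    # ('host0', '10.0.0.0/24', ''), where h[0] is the hostname and h[1] the
    # network; that is what the index-based asserts above check.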