]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/test/crossrunner/run.py
bb06d25efc57d6d3c022def1b18a9ffa77ec48e1
[ceph.git] / ceph / src / jaegertracing / thrift / test / crossrunner / run.py
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
17 # under the License.
18 #
19
20 import contextlib
21 import multiprocessing
22 import multiprocessing.managers
23 import os
24 import platform
25 import random
26 import socket
27 import subprocess
28 import sys
29 import time
30
31 from .compat import str_join
32 from .report import ExecReporter, SummaryReporter
33 from .test import TestEntry
34 from .util import domain_socket_path
35
# Result codes reported for a test run. RESULT_ERROR is OR-ed into the
# client's return code by run_test(); RESULT_TIMEOUT is used when the client
# did not finish within its timeout.
RESULT_ERROR = 64
RESULT_TIMEOUT = 128
# Pseudo-signal: ExecutionContext.sigwait() sends nothing and only waits.
SIGNONE = 0
# Signal number used for the forced stop. NOTE(review): 15 is numerically
# SIGTERM on POSIX (SIGKILL is 9) - confirm the name/value pairing is intended.
SIGKILL = 15

# globals
# Both are assigned by TestDispatcher (directly in synchronous mode, or in
# each pool worker's initializer) before run_test() reads them.
ports = None
stop = None
44
45
class ExecutionContext(object):
    """Launch one test program (server or client) and supervise its shutdown.

    The command is started with ``subprocess.Popen``; its stdout/stderr go to
    ``report.out``. Lifecycle events are forwarded to ``report`` (``begin``,
    ``end``, ``died``, ``killed``).

    Attributes:
        expired: set to True when a signal-less wait (``SIGNONE``) timed out.
        killed: set to True once ``SIGKILL`` has been sent to the process.
        proc: the ``subprocess.Popen`` object, or None before ``start()``.
    """

    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
        self._log = multiprocessing.get_logger()
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.stop_signal = stop_signal
        self.is_server = is_server
        self.report = report
        self.expired = False
        self.killed = False
        self.proc = None

    def _popen_args(self):
        """Return the keyword arguments passed to ``subprocess.Popen``."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # make sure child processes don't remain after killing
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            # Own session/process group so sigwait() can signal the whole group.
            args.update(preexec_fn=os.setsid)
        return args

    def start(self):
        """Start the process and return a context manager that stops it.

        Returns:
            The ``_scoped()`` context manager; leaving its ``with`` block
            runs the shutdown/reporting logic.
        """
        joined = str_join(' ', self.cmd)
        self._log.debug('COMMAND: %s', joined)
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
        self._log.debug(' PID: %d', self.proc.pid)
        # os.getpgid does not exist on Windows; only log the group id where
        # process groups are actually used (same platform test as sigwait()).
        if platform.system() != 'Windows':
            self._log.debug(' PGID: %d', os.getpgid(self.proc.pid))
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield self, then stop the process and report how it ended.

        The shutdown runs in a ``finally`` clause so the child process is not
        left running when the ``with`` body raises.
        """
        try:
            yield self
        finally:
            if self.is_server:
                # the server is supposed to run until we stop it
                if self.returncode is not None:
                    self.report.died()
                else:
                    if self.stop_signal != SIGNONE:
                        if self.sigwait(self.stop_signal):
                            self.report.end(self.returncode)
                        else:
                            self.report.killed()
                    else:
                        self.sigwait(SIGKILL)
            else:
                # the client is supposed to exit normally
                if self.returncode is not None:
                    self.report.end(self.returncode)
                else:
                    self.sigwait(SIGKILL)
                    self.report.killed()
            self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))

    # Send a signal to the process and then wait for it to end
    # If the signal requested is SIGNONE, no signal is sent, and
    # instead we just wait for the process to end; further if it
    # does not end normally with SIGNONE, we mark it as expired.
    # If the process fails to end and the signal is not SIGKILL,
    # it re-runs with SIGKILL so that a real process kill occurs
    # returns True if the process ended, False if it may not have
    def sigwait(self, sig=SIGKILL, timeout=2):
        try:
            if sig != SIGNONE:
                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
                if sig == SIGKILL:
                    self.killed = True
                try:
                    if platform.system() != 'Windows':
                        # Signal the whole process group created via setsid.
                        os.killpg(os.getpgid(self.proc.pid), sig)
                    else:
                        self.proc.send_signal(sig)
                except Exception:
                    # Best effort: the wait below still decides the outcome.
                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
            self.proc.communicate(timeout=timeout)
            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
            self.report.end(self.returncode)
            return True
        except subprocess.TimeoutExpired:
            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
            if sig == SIGNONE:
                self.expired = True
            # Escalate once to SIGKILL with a shorter wait; give up after that.
            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)

    # called on the client process to wait for it to end naturally
    def wait(self, timeout):
        self.sigwait(SIGNONE, timeout)

    @property
    def returncode(self):
        """The process return code, or None if not started or not yet reaped."""
        return self.proc.returncode if self.proc else None
147
148
def exec_context(port, logdir, test, prog, is_server):
    """Create the ExecutionContext for one program of a test.

    Args:
        port: port number handed to ``prog.build_command``.
        logdir: directory passed to the ExecReporter.
        test: the test entry the program belongs to.
        prog: program description providing ``build_command``, ``command``,
            ``workdir``, ``env`` and ``stop_signal``.
        is_server: True for the server side, False for the client side.

    Returns:
        A new, not yet started ExecutionContext.
    """
    reporter = ExecReporter(logdir, test, prog)
    # Kept in the original order: the reporter is created first, then the
    # command is built, and only afterwards prog.command is read.
    prog.build_command(port)
    return ExecutionContext(
        cmd=prog.command,
        cwd=prog.workdir,
        env=prog.env,
        stop_signal=prog.stop_signal,
        is_server=is_server,
        report=reporter,
    )
153
154
def run_test(testdir, logdir, test_dict, max_retry, async_mode=True):
    """Run one server/client cross test, retrying on failure.

    Args:
        testdir: directory passed to ``TestEntry``.
        logdir: directory in which the per-process logs are written.
        test_dict: mapping of keyword arguments used to build the TestEntry.
        max_retry: maximum number of whole-test retries after a failed run.
        async_mode: when False, exceptions (including KeyboardInterrupt) are
            re-raised to the caller instead of being turned into a result.

    Returns:
        A ``(retry_count, result)`` tuple. ``result`` is None when the run
        was skipped because the global ``stop`` event is set, 0 on success,
        and otherwise a non-zero code built from the client's return code,
        ``RESULT_TIMEOUT`` and/or ``RESULT_ERROR``.
    """
    logger = multiprocessing.get_logger()

    def ensure_socket_open(sv, port, test):
        """Probe whether the server socket is reachable; False if it is not."""
        slept = 0.1
        time.sleep(slept)
        sleep_step = 0.1
        while True:
            if slept > test.delay:
                logger.warning('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
                return False
            if test.socket == 'domain':
                if not os.path.exists(domain_socket_path(port)):
                    logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
                    time.sleep(sleep_step)
                    slept += sleep_step
            elif test.socket == 'abstract':
                return True
            else:
                # Create sockets every iteration because refused sockets cannot be
                # reused on some systems.
                sock4 = socket.socket()
                sock6 = socket.socket(family=socket.AF_INET6)
                try:
                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
                            or sock6.connect_ex(('::1', port)) == 0:
                        return True
                    if sv.proc.poll() is not None:
                        logger.warning('[{0}] server process is exited'.format(sv.proc.pid))
                        return False
                    logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
                    time.sleep(sleep_step)
                    slept += sleep_step
                finally:
                    sock4.close()
                    sock6.close()
            # NOTE(review): every path that reaches this point returns, so the
            # loop body runs at most once and a not-yet-open socket is still
            # reported as ready after one extra sleep; the client connect
            # retries below cover that case. Confirm this is intended.
            logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
            return True

    # Bound before the try so the error handler can tell whether TestEntry
    # construction itself failed.
    test = None
    retry_count = 0
    try:
        max_bind_retry = 3
        bind_retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return (retry_count, None)
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, logdir, test, test.server, True)
                cl = exec_context(port, logdir, test, test.client, False)

                logger.debug('Starting server')
                with sv.start():
                    port_ok = ensure_socket_open(sv, port, test)
                    if port_ok:
                        connect_retry_count = 0
                        max_connect_retry = 12
                        connect_retry_wait = 0.25
                        while True:
                            if sv.proc.poll() is not None:
                                logger.info('not starting client because server process is absent')
                                break
                            logger.debug('Starting client')
                            # The returned context manager is not used; the
                            # client is awaited explicitly via cl.wait().
                            cl.start()
                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
                            cl.wait(test.timeout)
                            if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                # Wait for 50ms to see if server does not die at the end.
                                time.sleep(0.05)
                                break
                            logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
                            time.sleep(connect_retry_wait)
                            connect_retry_count += 1

            if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
                logger.warning('[%s]: Detected socket bind failure, retrying...', test.server.name)
                bind_retry_count += 1
            else:
                # Timeout wins; otherwise use the client's return code when the
                # client was started and has exited, else flag an error.
                result = RESULT_TIMEOUT if cl.expired else cl.returncode if (cl.proc and cl.proc.poll()) is not None else RESULT_ERROR

                # For servers that handle a controlled shutdown by signal
                # if they are killed, or return an error code, that is a
                # problem. For servers that are not signal-aware, we simply
                # kill them off; if we didn't kill them off, something else
                # happened (crashed?)
                if test.server.stop_signal != 0:
                    if sv.killed or sv.returncode > 0:
                        result |= RESULT_ERROR
                else:
                    if not sv.killed:
                        result |= RESULT_ERROR

                if result == 0 or retry_count >= max_retry:
                    return (retry_count, result)
                else:
                    logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
                    retry_count += 1
    except Exception:
        if not async_mode:
            raise
        # `test` is still None when TestEntry() itself raised.
        test_name = test.name if test is not None else test_dict.get('name')
        logger.warning('Error executing [%s]', test_name, exc_info=True)
        return (retry_count, RESULT_ERROR)
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it needs its own
        # clause; it also asks the remaining tests to stop.
        logger.info('Interrupted execution', exc_info=True)
        if not async_mode:
            raise
        stop.set()
        return (retry_count, RESULT_ERROR)
268
269
class PortAllocator(object):
    """Hand out port numbers that are unique among currently running tests.

    TCP ports are obtained by binding a socket to port 0; "domain" and
    "abstract" socket tests get a random number in the TCP port range. Both
    kinds are tracked in separate sets guarded by a lock until freed.
    """

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()
        self._dom_ports = set()
        self._last_alloc = 0

    def _get_tcp_port(self):
        """Return a TCP port chosen by the OS that is not already handed out."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind(('', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # Closed only after the port is registered, and also when
                # anything above raises, so the socket is never leaked.
                sock.close()

    def _get_domain_port(self):
        """Return a random, not yet handed out id for a domain/abstract socket."""
        while True:
            # randint is inclusive on both ends; 65535 is the largest value
            # that is still a valid port number for the test programs.
            port = random.randint(1024, 65535)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Allocate a port for ``socket_type`` ('domain', 'abstract' or other)."""
        if socket_type in ('domain', 'abstract'):
            return self._get_domain_port()
        else:
            return self._get_tcp_port()

    # static method for inter-process invocation
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Context manager yielding a port and freeing it on exit.

        ``allocator`` is a PortAllocator or an object exposing the same
        ``alloc_port``/``free_port`` methods. The port is freed even when the
        ``with`` body raises.
        """
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Release ``port``; for 'domain' also remove the socket file if present.

        Raises:
            KeyError: if ``port`` is not currently allocated for that type.
        """
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type == 'domain':
                    self._dom_ports.remove(port)
                    path = domain_socket_path(port)
                    if os.path.exists(path):
                        os.remove(path)
                elif socket_type == 'abstract':
                    self._dom_ports.remove(port)
                else:
                    self._ports.remove(port)
            except IOError:
                # Failing to delete the socket file is logged, not fatal.
                self._log.info('Error while freeing port', exc_info=sys.exc_info())
336
337
class NonAsyncResult(object):
    """Already-completed result object used when tests run synchronously.

    Offers the ``get``/``wait``/``ready``/``successful`` methods around a
    value that was computed before the object was created.
    """

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        """Return the stored value; ``timeout`` is accepted but ignored."""
        return self._value

    def wait(self, timeout=None):
        """Do nothing: the value is already available."""
        return None

    def ready(self):
        """Always True, since the value exists from construction."""
        return True

    def successful(self):
        """Return whether the stored value compares equal to 0."""
        outcome = self._value == 0
        return outcome
353
354
class TestDispatcher(object):
    """Dispatch tests to run_test(), inline or through a process pool.

    With ``concurrency > 1`` a ``multiprocessing.Pool`` runs the tests and a
    ``BaseManager`` process hosts the shared PortAllocator; otherwise tests
    run in the calling process and the module globals ``stop`` and ``ports``
    are set directly. Results are collected by a SummaryReporter.
    """

    def __init__(self, testdir, basedir, logdir_rel, concurrency):
        global stop, ports
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
        self.logdir = self._report.testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = concurrency > 1
        if self._async:
            # The manager process owns the PortAllocator; each pool worker
            # connects to it in _pool_init.
            self._m = multiprocessing.managers.BaseManager()
            self._m.register('ports', PortAllocator)
            self._m.start()
            self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
        else:
            self._pool = None
            stop = self._stop
            ports = PortAllocator()
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _pool_init(self, address):
        """Pool worker initializer: set the module globals for run_test()."""
        global stop, m, ports
        stop = self._stop
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont, max_retry):
        """Run the test inline, feed the result to ``cont`` and wrap it."""
        outcome = run_test(self.testdir, self.logdir, test, max_retry, async_mode=False)
        cont(outcome)
        return NonAsyncResult(outcome)

    def _dispatch_async(self, test, cont, max_retry):
        """Submit the test to the pool with ``cont`` as completion callback."""
        self._log.debug('_dispatch_async')
        job_args = (self.testdir, self.logdir, test, max_retry)
        return self._pool.apply_async(func=run_test, args=job_args, callback=cont)

    def dispatch(self, test, max_retry):
        """Register ``test`` with the report and start it.

        Returns:
            The pool's async result in concurrent mode, otherwise a
            NonAsyncResult holding the finished run's value.
        """
        index = self._report.add_test(test)

        def cont(result):
            # Results that arrive after a stop request are dropped.
            if not self._stop.is_set():
                if result and len(result) == 2:
                    retry_count, returncode = result
                else:
                    retry_count, returncode = 0, RESULT_ERROR
                self._log.debug('freeing port')
                self._log.debug('adding result')
                timed_out = returncode == RESULT_TIMEOUT
                self._report.add_result(index, returncode, timed_out, retry_count)
                self._log.debug('finish continuation')

        if self._async:
            return self._dispatch_async(test, cont, max_retry)
        return self._dispatch_sync(test, cont, max_retry)

    def wait(self):
        """Wait for outstanding tests and return the report's final result."""
        if self._async:
            self._pool.close()
            self._pool.join()
            self._m.shutdown()
        return self._report.end()

    def terminate(self):
        """Set the stop event and, in concurrent mode, tear down the pool."""
        self._stop.set()
        if self._async:
            self._pool.terminate()
            self._pool.join()
            self._m.shutdown()