2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing,
13 # software distributed under the License is distributed on an
14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 # KIND, either express or implied. See the License for the
16 # specific language governing permissions and limitations
21 import multiprocessing
22 import multiprocessing
.managers
31 from .compat
import str_join
32 from .report
import ExecReporter
, SummaryReporter
33 from .test
import TestEntry
34 from .util
import domain_socket_path
46 class ExecutionContext(object):
47 def __init__(self
, cmd
, cwd
, env
, stop_signal
, is_server
, report
):
48 self
._log
= multiprocessing
.get_logger()
52 self
.stop_signal
= stop_signal
53 self
.is_server
= is_server
59 def _popen_args(self
):
63 'stdout': self
.report
.out
,
64 'stderr': subprocess
.STDOUT
,
66 # make sure child processes don't remain after killing
67 if platform
.system() == 'Windows':
68 DETACHED_PROCESS
= 0x00000008
69 args
.update(creationflags
=DETACHED_PROCESS | subprocess
.CREATE_NEW_PROCESS_GROUP
)
71 args
.update(preexec_fn
=os
.setsid
)
75 joined
= str_join(' ', self
.cmd
)
76 self
._log
.debug('COMMAND: %s', joined
)
77 self
._log
.debug('WORKDIR: %s', self
.cwd
)
78 self
._log
.debug('LOGFILE: %s', self
.report
.logpath
)
80 self
.proc
= subprocess
.Popen(self
.cmd
, **self
._popen
_args
())
81 self
._log
.debug(' PID: %d', self
.proc
.pid
)
82 self
._log
.debug(' PGID: %d', os
.getpgid(self
.proc
.pid
))
85 @contextlib.contextmanager
89 # the server is supposed to run until we stop it
90 if self
.returncode
is not None:
93 if self
.stop_signal
!= SIGNONE
:
94 if self
.sigwait(self
.stop_signal
):
95 self
.report
.end(self
.returncode
)
101 # the client is supposed to exit normally
102 if self
.returncode
is not None:
103 self
.report
.end(self
.returncode
)
105 self
.sigwait(SIGKILL
)
107 self
._log
.debug('[{0}] exited with return code {1}'.format(self
.proc
.pid
, self
.returncode
))
109 # Send a signal to the process and then wait for it to end.
110 # If the signal requested is SIGNONE, no signal is sent, and
111 # instead we just wait for the process to end; further, if it
112 # does not end normally with SIGNONE, we mark it as expired.
113 # If the process fails to end and the signal is not SIGKILL,
114 # it re-runs with SIGKILL so that a real process kill occurs.
115 # Returns True if the process ended, False if it may not have.
116 def sigwait(self
, sig
=SIGKILL
, timeout
=2):
119 self
._log
.debug('[{0}] send signal {1}'.format(self
.proc
.pid
, sig
))
123 if platform
.system() != 'Windows':
124 os
.killpg(os
.getpgid(self
.proc
.pid
), sig
)
126 self
.proc
.send_signal(sig
)
128 self
._log
.info('[{0}] Failed to kill process'.format(self
.proc
.pid
), exc_info
=sys
.exc_info())
129 self
._log
.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self
.proc
.pid
, timeout
))
130 self
.proc
.communicate(timeout
=timeout
)
131 self
._log
.debug('[{0}] process ended with return code {1}'.format(self
.proc
.pid
, self
.returncode
))
132 self
.report
.end(self
.returncode
)
134 except subprocess
.TimeoutExpired
:
135 self
._log
.info('[{0}] timeout waiting for process to end'.format(self
.proc
.pid
))
138 return False if sig
== SIGKILL
else self
.sigwait(SIGKILL
, 1)
140 # called on the client process to wait for it to end naturally
def wait(self, timeout):
    """Wait for the process to finish by itself, for at most ``timeout``.

    Delegates to ``sigwait`` with ``SIGNONE`` as the signal and the given
    timeout; the value returned by ``sigwait`` is discarded, so this
    method always returns ``None``.
    """
    self.sigwait(sig=SIGNONE, timeout=timeout)
def returncode(self):
    """Return the ``returncode`` of ``self.proc``.

    Yields ``None`` when ``self.proc`` is falsy (no process object is
    set); otherwise the value is whatever ``self.proc.returncode``
    currently holds, which may itself be ``None``.
    """
    child = self.proc
    if not child:
        return None
    return child.returncode
def exec_context(port, logdir, test, prog, is_server):
    """Create the ExecutionContext for one program of a test.

    An ``ExecReporter`` is created from ``logdir``, ``test`` and ``prog``,
    then ``prog.build_command(port)`` is invoked, and finally the program's
    command, working directory, environment and stop signal are passed to
    ``ExecutionContext`` together with ``is_server`` and the reporter.

    Returns the newly constructed ``ExecutionContext``.
    """
    reporter = ExecReporter(logdir, test, prog)
    # Keep this call ahead of the prog.command read below; presumably it is
    # what populates prog.command for the given port (confirm in .test).
    prog.build_command(port)
    return ExecutionContext(
        cmd=prog.command,
        cwd=prog.workdir,
        env=prog.env,
        stop_signal=prog.stop_signal,
        is_server=is_server,
        report=reporter,
    )
155 def run_test(testdir
, logdir
, test_dict
, max_retry
, async_mode
=True):
156 logger
= multiprocessing
.get_logger()
158 def ensure_socket_open(sv
, port
, test
):
163 if slept
> test
.delay
:
164 logger
.warn('[{0}] slept for {1} seconds but server is not open'.format(sv
.proc
.pid
, slept
))
166 if test
.socket
== 'domain':
167 if not os
.path
.exists(domain_socket_path(port
)):
168 logger
.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv
.proc
.pid
, slept
))
169 time
.sleep(sleep_step
)
171 elif test
.socket
== 'abstract':
174 # Create sockets every iteration because refused sockets cannot be
175 # reused on some systems.
176 sock4
= socket
.socket()
177 sock6
= socket
.socket(family
=socket
.AF_INET6
)
179 if sock4
.connect_ex(('127.0.0.1', port
)) == 0 \
180 or sock6
.connect_ex(('::1', port
)) == 0:
182 if sv
.proc
.poll() is not None:
183 logger
.warn('[{0}] server process is exited'.format(sv
.proc
.pid
))
185 logger
.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv
.proc
.pid
, slept
))
186 time
.sleep(sleep_step
)
191 logger
.debug('[{0}] server ready - waited for {1} seconds'.format(sv
.proc
.pid
, slept
))
198 test
= TestEntry(testdir
, **test_dict
)
201 logger
.debug('Skipping because shutting down')
202 return (retry_count
, None)
203 logger
.debug('Start')
204 with PortAllocator
.alloc_port_scoped(ports
, test
.socket
) as port
:
205 logger
.debug('Start with port %d' % port
)
206 sv
= exec_context(port
, logdir
, test
, test
.server
, True)
207 cl
= exec_context(port
, logdir
, test
, test
.client
, False)
209 logger
.debug('Starting server')
211 port_ok
= ensure_socket_open(sv
, port
, test
)
213 connect_retry_count
= 0
214 max_connect_retry
= 12
215 connect_retry_wait
= 0.25
217 if sv
.proc
.poll() is not None:
218 logger
.info('not starting client because server process is absent')
220 logger
.debug('Starting client')
222 logger
.debug('Waiting client (up to %d secs)' % test
.timeout
)
223 cl
.wait(test
.timeout
)
224 if not cl
.report
.maybe_false_positive() or connect_retry_count
>= max_connect_retry
:
225 if connect_retry_count
> 0 and connect_retry_count
< max_connect_retry
:
226 logger
.info('[%s]: Connected after %d retry (%.2f sec each)' % (test
.server
.name
, connect_retry_count
, connect_retry_wait
))
227 # Wait for 50ms to see if server does not die at the end.
230 logger
.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait
)
231 time
.sleep(connect_retry_wait
)
232 connect_retry_count
+= 1
234 if sv
.report
.maybe_false_positive() and bind_retry_count
< max_bind_retry
:
235 logger
.warn('[%s]: Detected socket bind failure, retrying...', test
.server
.name
)
236 bind_retry_count
+= 1
238 result
= RESULT_TIMEOUT
if cl
.expired
else cl
.returncode
if (cl
.proc
and cl
.proc
.poll()) is not None else RESULT_ERROR
240 # For servers that handle a controlled shutdown by signal
241 # if they are killed, or return an error code, that is a
242 # problem. For servers that are not signal-aware, we simply
243 # kill them off; if we didn't kill them off, something else
244 # happened (crashed?)
245 if test
.server
.stop_signal
!= 0:
246 if sv
.killed
or sv
.returncode
> 0:
247 result |
= RESULT_ERROR
250 result |
= RESULT_ERROR
252 if result
== 0 or retry_count
>= max_retry
:
253 return (retry_count
, result
)
255 logger
.info('[%s-%s]: test failed, retrying...', test
.server
.name
, test
.client
.name
)
260 logger
.warn('Error executing [%s]', test
.name
, exc_info
=True)
261 return (retry_count
, RESULT_ERROR
)
263 logger
.info('Interrupted execution', exc_info
=True)
267 return (retry_count
, RESULT_ERROR
)
270 class PortAllocator(object):
272 self
._log
= multiprocessing
.get_logger()
273 self
._lock
= multiprocessing
.Lock()
275 self
._dom
_ports
= set()
278 def _get_tcp_port(self
):
279 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
280 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
282 port
= sock
.getsockname()[1]
285 ok
= port
not in self
._ports
287 self
._ports
.add(port
)
288 self
._last
_alloc
= time
.time()
292 return port
if ok
else self
._get
_tcp
_port
()
294 def _get_domain_port(self
):
295 port
= random
.randint(1024, 65536)
298 ok
= port
not in self
._dom
_ports
300 self
._dom
_ports
.add(port
)
303 return port
if ok
else self
._get
_domain
_port
()
305 def alloc_port(self
, socket_type
):
306 if socket_type
in ('domain', 'abstract'):
307 return self
._get
_domain
_port
()
309 return self
._get
_tcp
_port
()
311 # static method for inter-process invocation
313 @contextlib.contextmanager
314 def alloc_port_scoped(allocator
, socket_type
):
315 port
= allocator
.alloc_port(socket_type
)
317 allocator
.free_port(socket_type
, port
)
319 def free_port(self
, socket_type
, port
):
320 self
._log
.debug('free_port')
323 if socket_type
== 'domain':
324 self
._dom
_ports
.remove(port
)
325 path
= domain_socket_path(port
)
326 if os
.path
.exists(path
):
328 elif socket_type
== 'abstract':
329 self
._dom
_ports
.remove(port
)
331 self
._ports
.remove(port
)
333 self
._log
.info('Error while freeing port', exc_info
=sys
.exc_info())
338 class NonAsyncResult(object):
339 def __init__(self
, value
):
342 def get(self
, timeout
=None):
345 def wait(self
, timeout
=None):
def successful(self):
    """Return True when the stored ``_value`` compares equal to 0."""
    outcome = self._value
    return outcome == 0
355 class TestDispatcher(object):
356 def __init__(self
, testdir
, basedir
, logdir_rel
, concurrency
):
357 self
._log
= multiprocessing
.get_logger()
358 self
.testdir
= testdir
359 self
._report
= SummaryReporter(basedir
, logdir_rel
, concurrency
> 1)
360 self
.logdir
= self
._report
.testdir
361 # seems needed for python 2.x to handle keyboard interrupt
362 self
._stop
= multiprocessing
.Event()
363 self
._async
= concurrency
> 1
369 ports
= PortAllocator()
371 self
._m
= multiprocessing
.managers
.BaseManager()
372 self
._m
.register('ports', PortAllocator
)
374 self
._pool
= multiprocessing
.Pool(concurrency
, self
._pool
_init
, (self
._m
.address
,))
376 'TestDispatcher started with %d concurrent jobs' % concurrency
)
378 def _pool_init(self
, address
):
383 m
= multiprocessing
.managers
.BaseManager(address
)
387 def _dispatch_sync(self
, test
, cont
, max_retry
):
388 r
= run_test(self
.testdir
, self
.logdir
, test
, max_retry
, async_mode
=False)
390 return NonAsyncResult(r
)
def _dispatch_async(self, test, cont, max_retry):
    """Submit one test to the worker pool and return the pending result.

    ``run_test`` is scheduled through ``self._pool.apply_async`` with the
    dispatcher's test and log directories, the test and ``max_retry`` as
    positional arguments; ``cont`` is registered as the completion
    callback. The object returned by ``apply_async`` is handed back to
    the caller unchanged.
    """
    self._log.debug('_dispatch_async')
    job_args = (self.testdir, self.logdir, test, max_retry)
    pending = self._pool.apply_async(func=run_test, args=job_args, callback=cont)
    return pending
396 def dispatch(self
, test
, max_retry
):
397 index
= self
._report
.add_test(test
)
400 if not self
._stop
.is_set():
401 if result
and len(result
) == 2:
402 retry_count
, returncode
= result
405 returncode
= RESULT_ERROR
406 self
._log
.debug('freeing port')
407 self
._log
.debug('adding result')
408 self
._report
.add_result(index
, returncode
, returncode
== RESULT_TIMEOUT
, retry_count
)
409 self
._log
.debug('finish continuation')
410 fn
= self
._dispatch
_async
if self
._async
else self
._dispatch
_sync
411 return fn(test
, cont
, max_retry
)
418 return self
._report
.end()
423 self
._pool
.terminate()