+++ /dev/null
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import copy
-import multiprocessing
-import os
-import sys
-from .compat import path_join
-from .util import merge_dict, domain_socket_path
-
-
-class TestProgram(object):
- def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
- extra_args=[], extra_args2=[], join_args=False, **kwargs):
-
- self.kind = kind
- self.name = name
- self.protocol = protocol
- self.transport = transport
- self.socket = socket
- self.workdir = workdir
- self.stop_signal = stop_signal
- self.command = None
- self._base_command = self._fix_cmd_path(command)
- if env:
- self.env = copy.copy(os.environ)
- self.env.update(env)
- else:
- self.env = os.environ
- self._extra_args = extra_args
- self._extra_args2 = extra_args2
- self._join_args = join_args
-
- def _fix_cmd_path(self, cmd):
- # if the arg is a file in the current directory, make it path
- def abs_if_exists(arg):
- p = path_join(self.workdir, arg)
- return p if os.path.exists(p) else arg
-
- if cmd[0] == 'python':
- cmd[0] = sys.executable
- else:
- cmd[0] = abs_if_exists(cmd[0])
- return cmd
-
- def _socket_args(self, socket, port):
- return {
- 'ip-ssl': ['--ssl'],
- 'domain': ['--domain-socket=%s' % domain_socket_path(port)],
- 'abstract': ['--abstract-namespace', '--domain-socket=%s' % domain_socket_path(port)],
- }.get(socket, None)
-
- def _transport_args(self, transport):
- return {
- 'zlib': ['--zlib'],
- }.get(transport, None)
-
- def build_command(self, port):
- cmd = copy.copy(self._base_command)
- args = copy.copy(self._extra_args2)
- args.append('--protocol=' + self.protocol)
- args.append('--transport=' + self.transport)
- transport_args = self._transport_args(self.transport)
- if transport_args:
- args += transport_args
- socket_args = self._socket_args(self.socket, port)
- if socket_args:
- args += socket_args
- args.append('--port=%d' % port)
- if self._join_args:
- cmd.append('%s' % " ".join(args))
- else:
- cmd.extend(args)
- if self._extra_args:
- cmd.extend(self._extra_args)
- self.command = cmd
- return self.command
-
-
-class TestEntry(object):
- def __init__(self, testdir, server, client, delay, timeout, **kwargs):
- self.testdir = testdir
- self._log = multiprocessing.get_logger()
- self._config = kwargs
- self.protocol = kwargs['protocol']
- self.transport = kwargs['transport']
- self.socket = kwargs['socket']
- srv_dict = self._fix_workdir(merge_dict(self._config, server))
- cli_dict = self._fix_workdir(merge_dict(self._config, client))
- cli_dict['extra_args2'] = srv_dict.pop('remote_args', [])
- srv_dict['extra_args2'] = cli_dict.pop('remote_args', [])
- self.server = TestProgram('server', **srv_dict)
- self.client = TestProgram('client', **cli_dict)
- self.delay = delay
- self.timeout = timeout
- self._name = None
- # results
- self.success = None
- self.as_expected = None
- self.returncode = None
- self.expired = False
- self.retry_count = 0
-
- def _fix_workdir(self, config):
- key = 'workdir'
- path = config.get(key, None)
- if not path:
- path = self.testdir
- if os.path.isabs(path):
- path = os.path.realpath(path)
- else:
- path = os.path.realpath(path_join(self.testdir, path))
- config.update({key: path})
- return config
-
- @classmethod
- def get_name(cls, server, client, protocol, transport, socket, *args, **kwargs):
- return '%s-%s_%s_%s-%s' % (server, client, protocol, transport, socket)
-
- @property
- def name(self):
- if not self._name:
- self._name = self.get_name(
- self.server.name, self.client.name, self.protocol, self.transport, self.socket)
- return self._name
-
- @property
- def transport_name(self):
- return '%s-%s' % (self.transport, self.socket)
-
-
-def test_name(server, client, protocol, transport, socket, **kwargs):
- return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket)