"""
rgw multisite testing
"""
+import importlib.util
import logging
import nose.core
import nose.config
+import sys
+from teuthology.config import config as teuth_config
from teuthology.exceptions import ConfigError
+from teuthology.repo_utils import fetch_repo
from teuthology.task import Task
from teuthology import misc
-from tasks.rgw_multi import multisite, tests, tests_ps
-
log = logging.getLogger(__name__)
"""
Runs the rgw_multi tests against a multisite configuration created by the
rgw-multisite task. Tests are run with nose, using any additional 'args'
- provided. Overrides for tests.Config can be set in 'config'.
+ provided. Overrides for tests.Config can be set in 'config'. The 'branch'
+ and 'repo' can be overridden to clone the rgw_multi tests from another
+ release.
- rgw-multisite-tests:
args:
- - tasks.rgw_multi.tests:test_object_sync
+ - tests.py:test_object_sync
config:
reconfigure_delay: 60
+ branch: octopus
+ repo: https://github.com/ceph/ceph.git
"""
def __init__(self, ctx, config):
realm = self.ctx.rgw_multisite.realm
master_zone = realm.meta_master_zone()
+ branch = self.config.get('branch')
+ if not branch:
+ # run from suite_path
+ suite_path = self.ctx.config.get('suite_path')
+ self.module_path = suite_path + '/../src/test/rgw/rgw_multi'
+ else:
+ # clone the qa branch
+ repo = self.config.get('repo', teuth_config.get_ceph_qa_suite_git_url())
+ log.info("cloning suite branch %s from %s...", branch, repo)
+ clonedir = fetch_repo(repo, branch)
+ # import its version of rgw_multi
+ self.module_path = clonedir + '/src/test/rgw/rgw_multi'
+
+ log.info("importing tests from %s", self.module_path)
+ spec = importlib.util.spec_from_file_location('rgw_multi', self.module_path + '/__init__.py')
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[spec.name] = module
+ spec.loader.exec_module(module)
+
+ from rgw_multi import multisite, tests
+
# create the test user
log.info('creating test user..')
user = multisite.User('rgw-multisite-test-user')
argv = [__name__] + extra_args
log.info("running rgw multisite tests on '%s' with args=%r",
- tests.__name__, extra_args)
-
- # run nose tests in the rgw_multi.tests module
- conf = nose.config.Config(stream=get_log_stream(), verbosity=2)
- error_msg = ''
- result = nose.run(defaultTest=tests.__name__, argv=argv, config=conf)
- if not result:
- error_msg += 'rgw multisite, '
- result = nose.run(defaultTest=tests_ps.__name__, argv=argv, config=conf)
- if not result:
- error_msg += 'rgw multisite pubsub, '
- if error_msg:
- raise RuntimeError(error_msg + 'test failures')
+ self.module_path, extra_args)
+
+ # run nose tests in the module path
+ conf = nose.config.Config(stream=get_log_stream(), verbosity=2, workingDir=self.module_path)
+ assert nose.run(argv=argv, config=conf), 'rgw multisite test failures'
def get_log_stream():