"""
vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
quickly during development.

Simple usage (assuming teuthology and ceph checked out in ~/git):

    # Activate the teuthology virtualenv
    source ~/git/teuthology/virtualenv/bin/activate
    # Go into your ceph build directory
    cd ~/git/ceph/build
    # Invoke a test using this script
    python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan

    # Alternatively, if you use different paths, specify them as follows:
    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py

    # If you wish to drop to a python shell on failures, use --interactive:
    python ~/git/ceph/qa/tasks/vstart_runner.py --interactive

    # If you wish to run a named test case, pass it as an argument:
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan

"""

from StringIO import StringIO
from collections import defaultdict
import getpass
import signal
import tempfile
import threading
import datetime
import shutil
import re
import os
import time
import json
import sys
import errno
from unittest import suite, loader
import unittest
import platform
from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.config import config as teuth_config

import logging

log = logging.getLogger(__name__)

handler = logging.FileHandler("./vstart_runner.log")
formatter = logging.Formatter(
    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)


def respawn_in_path(lib_path, python_paths):
    execv_cmd = ['python']
    if platform.system() == "Darwin":
        lib_path_var = "DYLD_LIBRARY_PATH"
    else:
        lib_path_var = "LD_LIBRARY_PATH"

    py_binary = os.environ.get("PYTHON", "python")

    if lib_path_var in os.environ:
        if lib_path not in os.environ[lib_path_var]:
            os.environ[lib_path_var] += ':' + lib_path
            os.execvp(py_binary, execv_cmd + sys.argv)
    else:
        os.environ[lib_path_var] = lib_path
        os.execvp(py_binary, execv_cmd + sys.argv)

    for p in python_paths:
        sys.path.insert(0, p)


# Let's use some sensible defaults
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):

    # A list of candidate paths for each package we need
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.2"],
    ]

    python_paths = []

    # Up one level so that "tasks.foo.bar" imports work
    python_paths.append(os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
    ))

    for package_guesses in guesses:
        for g in package_guesses:
            g_exp = os.path.abspath(os.path.expanduser(g))
            if os.path.exists(g_exp):
                python_paths.append(g_exp)

    ld_path = os.path.join(os.getcwd(), "lib/")
    print "Using guessed paths {0} {1}".format(ld_path, python_paths)
    respawn_in_path(ld_path, python_paths)

try:
    from teuthology.exceptions import CommandFailedError
    from tasks.ceph_manager import CephManager
    from tasks.cephfs.fuse_mount import FuseMount
    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
    from mgr.mgr_test_case import MgrCluster
    from teuthology.contextutil import MaxWhileTries
    from teuthology.task import interactive
except ImportError:
    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
    raise

# Must import after teuthology because of gevent monkey patching
import subprocess

if os.path.exists("./CMakeCache.txt"):
    # Running in build dir of a cmake build
    BIN_PREFIX = "./bin/"
    SRC_PREFIX = "../src"
else:
    # Running in src/ of an autotools build
    BIN_PREFIX = "./"
    SRC_PREFIX = "./"


class LocalRemoteProcess(object):
    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        if stdout is None:
            self.stdout = StringIO()
        else:
            self.stdout = stdout

        if stderr is None:
            self.stderr = StringIO()
        else:
            self.stderr = stderr

        self.check_status = check_status
        self.exitstatus = self.returncode = None

    def wait(self):
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        self.stdout.write(out)
        self.stderr.write(err)

        self.exitstatus = self.returncode = self.subproc.returncode

        if self.exitstatus != 0:
            sys.stderr.write(out)
            sys.stderr.write(err)

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            out, err = self.subproc.communicate()
            self.stdout.write(out)
            self.stderr.write(err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True

        return False

    def kill(self):
        if self.subproc.pid and not self.finished:
            log.info("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            safe_kill(self.subproc.pid)
        else:
            log.info("kill: already terminated ({0})".format(self.args))

    @property
    def stdin(self):
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)
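
# Illustrative note (not part of the original module's API): teuthology stops a
# daemonized process by closing its stdin, so the FakeStdIn shim above maps that
# convention onto a plain kill().  A caller would do something like:
#
#   proc = remote.run(args=[...], wait=False)   # proc is a LocalRemoteProcess
#   proc.stdin.close()                          # actually SIGKILLs the subprocess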


class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
        shutil.copy(path, tmpfile)
        return tmpfile

    def put_file(self, src, dst, sudo=False):
        shutil.copy(src, dst)

    def run(self, args, check_status=True, wait=True,
            stdout=None, stderr=None, cwd=None, stdin=None,
            logger=None, label=None, env=None):
        log.info("run args={0}".format(args))

        # We don't need no stinkin' sudo
        args = [a for a in args if a != "sudo"]

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any([a for a in args if isinstance(a, Raw)])

        if shell:
            # Strip the teuthology command wrappers that only exist on real
            # test nodes ('adjust-ulimits' takes no argument; 'ceph-coverage'
            # and 'timeout' each consume one)
            filtered = []
            i = 0
            while i < len(args):
                if args[i] == 'adjust-ulimits':
                    i += 1
                elif args[i] == 'ceph-coverage':
                    i += 2
                elif args[i] == 'timeout':
                    i += 2
                else:
                    filtered.append(args[i])
                    i += 1

            args = quote(filtered)
            log.info("Running {0}".format(args))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       shell=True)
        else:
            log.info("Running {0}".format(args))

            for arg in args:
                if not isinstance(arg, basestring):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__
                    ))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            if not isinstance(stdin, basestring):
                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")

            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc
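
# Illustrative usage sketch (assumed example, never called): LocalRemote accepts
# the same calling convention as a teuthology remote, so test code is unchanged
# while commands execute as local subprocesses.
def _example_local_remote_usage():
    remote = LocalRemote()
    # Runs "ls -l" locally; any "sudo" in args would be silently stripped
    proc = remote.run(args=["ls", "-l"], wait=True)
    return proc.stdout.getvalue()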


class LocalDaemon(object):
    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        self.proc = None

    def running(self):
        return self._get_pid() is not None

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = self.controller.run(
            args=["ps", "ww", "-u"+str(os.getuid())]
        ).stdout.getvalue().strip()
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                log.info("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])

        log.info("No match for {0} {1}: {2}".format(
            self.daemon_type, self.daemon_id, ps_txt
        ))
        return None

    def wait(self, timeout):
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

    def stop(self, timeout=300):
        if not self.running():
            log.error('tried to stop a non-running daemon')
            return

        pid = self._get_pid()
        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGKILL)

        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                log.info("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGKILL)

            if new_pid is None:
                break

            if waited > timeout:
                raise MaxWhileTries(
                    "Timed out waiting for daemon {0}.{1}".format(
                        self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

        self.wait(timeout=timeout)

    def restart(self):
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])


def safe_kill(pid):
    """
    os.kill annoyingly raises exception if process already dead.  Ignore it.
    """
    try:
        return os.kill(pid, signal.SIGKILL)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # Raced with process termination
            pass
        else:
            raise
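
# Illustrative usage sketch (assumed example, never called): driving a vstart
# daemon through LocalDaemon.  "mds"/"a" are placeholders for whatever daemons
# your vstart ceph.conf actually defines.
def _example_local_daemon_restart():
    daemon = LocalDaemon("mds", "a")
    if daemon.running():
        daemon.stop()   # SIGKILL, then poll `ps` until the PID disappears
    daemon.restart()    # relaunch via BIN_PREFIX/ceph-mds -i a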


class LocalFuseMount(FuseMount):
    def __init__(self, test_dir, client_id):
        super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        return os.path.abspath("./client.{0}.keyring".format(self.client_id))

    def run_shell(self, args, wait=True):
        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
        # the "cd foo && bar" shenanigans isn't needed to begin with and
        # then we wouldn't have to special case this
        return self.client_remote.run(
            args, wait=wait, cwd=self.mountpoint
        )

    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run foreground.  When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.
        path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
        return path

    def umount(self):
        if self.is_mounted():
            super(LocalFuseMount, self).umount()

    def mount(self, mount_path=None, mount_fs_name=None):
        self.client_remote.run(
            args=['mkdir', '-p', self.mountpoint],
        )

        def list_connections():
            self.client_remote.run(
                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                check_status=False
            )
            if p.exitstatus != 0:
                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
        if os.getuid() != 0:
            prefix += ["--client-die-on-failed-remount=false"]

        if mount_path is not None:
            prefix += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]

        self.fuse_daemon = self.client_remote.run(
            args=prefix + [
                "--name",
                "client.{0}".format(self.client_id),
                self.mountpoint
            ], wait=False)

        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))

        # Wait for the connection reference to appear in /sys
        waited = 0
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > 30:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))

        self._fuse_conn = new_conns[0]

    def _run_python(self, pyscript):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[
            'python', '-c', pyscript
        ], wait=False)
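
# Illustrative usage sketch (assumed example, never called; requires a running
# vstart cluster): exec_test() below drives LocalFuseMount like this.  mount()
# blocks until a new entry appears under /sys/fs/fuse/connections, which is how
# mount success is detected.
def _example_local_fuse_mount(test_dir):
    mount = LocalFuseMount(test_dir, "0")
    mount.mount()
    # Commands run with cwd=mountpoint, mimicking teuthology's "cd foo && bar"
    mount.run_shell(["touch", "example_file"])
    mount.umount()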


class LocalCephManager(CephManager):
    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.info(x)

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'
        """
        return LocalRemote()

    def run_ceph_w(self):
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
        return proc

    def raw_cluster_cmd(self, *args):
        """
        args like ["osd", "dump"]
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
        return proc.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
        return proc.exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
            check_status=check_status
        )

    def get_mds_status(self, mds):
        """
        Run cluster commands for the mds in order to get mds information
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None

    def get_mds_status_by_rank(self, rank):
        """
        Run cluster commands for the mds in order to get mds information
        """
        j = self.get_mds_status_all()
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['rank'] == rank:
                return info
        return None

    def get_mds_status_all(self):
        """
        Run cluster command to extract all the mds status.
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        return j
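
# Illustrative usage sketch (assumed example, never called): LocalCephManager
# exposes the same helpers the teuthology CephManager does, backed by the
# locally built ceph CLI under BIN_PREFIX.
def _example_cluster_cmds():
    manager = LocalCephManager()
    dump_json = manager.raw_cluster_cmd("osd", "dump", "--format=json")
    rc = manager.raw_cluster_cmd_result("osd", "dump")  # rc only, never raises
    return dump_json, rc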


class LocalCephCluster(CephCluster):
    def __init__(self, ctx):
        # Deliberately skip calling parent constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        return LocalRemote()

    def get_config(self, key, service_type=None):
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        service_id = {'mon': 'a', 'mds': 'a', 'osd': '0'}[service_type]

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
        existing_str = open(conf_path).read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.info("Searching for existing instance {0}/{1}".format(
                    key, subsys
                ))
                existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
                    subsys
                ), existing_str, re.MULTILINE)
                if existing_section:
                    section_str = existing_str[existing_section.start():existing_section.end()]
                    existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.info("Found string to replace at {0}".format(
                            start
                        ))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        open(conf_path, "w").write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        del self._conf[subsys][key]
        self._write_conf()
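
# Illustration (hypothetical ceph.conf fragment): given a vstart-generated file
# containing "[mds]\ndebug mds = 20", a call like
# set_ceph_conf("mds", "debug mds", "1/10") comments out the old value in place
# and appends the override after the #LOCAL_TEST banner, roughly:
#
#   [mds]
#   #debug mds = 20
#   ...
#   #LOCAL_TEST
#   [mds]
#   debug mds = 1/10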


class LocalMDSCluster(LocalCephCluster, MDSCluster):
    def __init__(self, ctx):
        super(LocalMDSCluster, self).__init__(ctx)

        self.mds_ids = ctx.daemons.daemons['mds'].keys()
        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name):
        return LocalFilesystem(self._ctx, create=name)


class LocalMgrCluster(LocalCephCluster, MgrCluster):
    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])


class LocalFilesystem(Filesystem, LocalMDSCluster):
    def __init__(self, ctx, fscid=None, create=None):
        # Deliberately skip calling parent constructor
        self._ctx = ctx

        self.id = None
        self.name = None
        self.metadata_pool_name = None
        self.data_pools = None

        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
        self.mds_ids = set()
        for line in open("ceph.conf").readlines():
            match = re.match("^\[mds\.(.+)\]$", line)
            if match:
                self.mds_ids.add(match.group(1))

        if not self.mds_ids:
            raise RuntimeError("No MDSs found in ceph.conf!")

        self.mds_ids = list(self.mds_ids)

        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))

        self.mon_manager = LocalCephManager()

        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if create is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create is True:
                self.name = 'cephfs'
            else:
                self.name = create
            if not self.legacy_configured():
                self.create()
        elif fscid is not None:
            self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # debug it
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def set_clients_block(self, blocked, mds_id=None):
        raise NotImplementedError()

    def get_pgs_per_fs_pool(self):
        # FIXME: assuming there are 3 OSDs
        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))


class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behavior.
    """
    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Failure in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Error in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)


def enumerate_methods(s):
    log.info("e: {0}".format(s))
    for t in s._tests:
        if isinstance(t, suite.BaseTestSuite):
            for sub in enumerate_methods(t):
                yield sub
        else:
            yield s, t


def load_tests(modules, loader):
    if modules:
        log.info("Executing modules: {0}".format(modules))
        module_suites = []
        for mod_name in modules:
            # Test names like cephfs.test_auto_repair
            module_suites.append(loader.loadTestsFromName(mod_name))
        log.info("Loaded: {0}".format(list(module_suites)))
        return suite.TestSuite(module_suites)
    else:
        log.info("Executing all cephfs tests")
        return loader.discover(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
        )


def scan_tests(modules):
    overall_suite = load_tests(modules, loader.TestLoader())

    max_required_mds = 0
    max_required_clients = 0
    max_required_mgr = 0

    # note: loop variable renamed to avoid shadowing the unittest.suite import
    for suite_, case in enumerate_methods(overall_suite):
        max_required_mds = max(max_required_mds,
                               getattr(case, "MDSS_REQUIRED", 0))
        max_required_clients = max(max_required_clients,
                                   getattr(case, "CLIENTS_REQUIRED", 0))
        max_required_mgr = max(max_required_mgr,
                               getattr(case, "MGRS_REQUIRED", 0))

    return max_required_mds, max_required_clients, max_required_mgr
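
# Illustrative sketch (hypothetical test class): scan_tests() sizes the vstart
# cluster from class attributes that test cases declare, e.g.:
#
#   class TestExample(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 1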


class LocalCluster(object):
    def __init__(self, rolename="placeholder"):
        self.remotes = {
            LocalRemote(): [rolename]
        }

    def only(self, requested):
        return self.__class__(rolename=requested)


class LocalContext(object):
    def __init__(self):
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()

        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        for conf_line in open("ceph.conf").readlines():
            for svc_type in ["mon", "osd", "mds", "mgr"]:
                if svc_type not in self.daemons.daemons:
                    self.daemons.daemons[svc_type] = {}
                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
                if match:
                    svc_id = match.group(1)
                    self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)

    def __del__(self):
        shutil.rmtree(self.teuthology_config['test_path'])


def exec_test():
    # Parse arguments
    interactive_on_error = False
    create_cluster = False

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            interactive_on_error = True
        elif f == "--create":
            create_cluster = True
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)

    # Help developers by stopping up-front if their tree isn't built enough for all the
    # tools that the tests might want to use (add more here if needed)
    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                        "cephfs-table-tool", "ceph-fuse", "rados"]
    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
    if missing_binaries:
        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
        sys.exit(-1)

    max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)

    remote = LocalRemote()

    # Tolerate no MDSs or clients running at start
    ps_txt = remote.run(
        args=["ps", "-u"+str(os.getuid())]
    ).stdout.getvalue().strip()
    lines = ps_txt.split("\n")[1:]
    for line in lines:
        if 'ceph-fuse' in line or 'ceph-mds' in line:
            pid = int(line.split()[0])
            log.warn("Killing stray process {0}".format(line))
            os.kill(pid, signal.SIGKILL)

    # Fire up the Ceph cluster if the user requested it
    if create_cluster:
        log.info("Creating cluster with {0} MDS daemons".format(
            max_required_mds))
        remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
        remote.run(["rm", "-rf", "./out"])
        remote.run(["rm", "-rf", "./dev"])
        vstart_env = os.environ.copy()
        vstart_env["FS"] = "0"
        vstart_env["MDS"] = max_required_mds.__str__()
        vstart_env["OSD"] = "1"
        vstart_env["MGR"] = max(max_required_mgr, 1).__str__()

        remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
                   env=vstart_env)

        # Wait for OSD to come up so that subsequent injectargs etc will
        # definitely succeed
        LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)

    # List of client mounts, sufficient to run the selected tests
    clients = [i.__str__() for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    # Construct Mount classes
    mounts = []
    for client_id in clients:
        # Populate client keyring (it sucks to use client.admin for test clients
        # because it's awkward to find the logs later)
        client_name = "client.{0}".format(client_id)

        if client_name not in open("./keyring").read():
            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                 "osd", "allow rw",
                                 "mds", "allow",
                                 "mon", "allow r"])

            open("./keyring", "a").write(p.stdout.getvalue())

        mount = LocalFuseMount(test_dir, client_id)
        mounts.append(mount)
        if mount.is_mounted():
            log.warn("unmounting {0}".format(mount.mountpoint))
            mount.umount_wait()
        else:
            if os.path.exists(mount.mountpoint):
                os.rmdir(mount.mountpoint)

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)

    from tasks.cephfs_test_runner import DecoratingLoader

    class LogStream(object):
        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    log.info(line)
                    # sys.stderr.write(line + "\n")
                self.buffer = lines[-1]

        def flush(self):
            pass

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "ceph_cluster": ceph_cluster,
        "mds_cluster": mds_cluster,
        "mgr_cluster": mgr_cluster,
    })

    # For the benefit of polling tests like test_full -- in teuthology land we set this
    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
    ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")

    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
    # from normal IO latency.  Increase it for running tests.
    ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")

    # Make sure the filesystem created in tests has uid/gid that will let us talk to
    # it after mounting it (without having to go root).  Set in 'global' not just 'mds'
    # so that cephfs-data-scan will pick it up too.
    ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
    ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())

    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version

    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
            drop_test = True
            log.warn("Dropping test because long running: {0}".format(method.id()))

        if getattr(fn, "needs_trimming", False) is True:
            drop_test = (os.getuid() != 0)
            log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = False
            for named in modules:
                if named.endswith(method.id()):
                    is_named = True
                    break

            if not is_named:
                victims.append((case, method))

    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    if interactive_on_error:
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult
    fail_on_skip = False

    class LoggingResult(result_class):
        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            test.started_at = datetime.datetime.utcnow()
            return super(LoggingResult, self).startTest(test)

        def stopTest(self, test):
            log.info("Stopped test: {0} in {1}s".format(
                self.getDescription(test),
                (datetime.datetime.utcnow() - test.started_at).total_seconds()
            ))

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience

        bad_tests = []
        for test, error in result.errors:
            bad_tests.append(str(test))
        for test, failure in result.failures:
            bad_tests.append(str(test))

        sys.exit(-1)


if __name__ == "__main__":
    exec_test()