"""
vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
quickly during development.

Simple usage (assuming teuthology and ceph checked out in ~/git):

    # Activate the teuthology virtualenv
    source ~/git/teuthology/virtualenv/bin/activate
    # Go into your ceph build directory
    # Invoke a test using this script
    python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan

Alternatively, if you use different paths, specify them as follows:

    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py

If you wish to drop to a python shell on failures, use --interactive:

    python ~/git/ceph/qa/tasks/vstart_runner.py --interactive

If you wish to run a named test case, pass it as an argument:

    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
"""
28 from StringIO
import StringIO
29 from collections
import defaultdict
42 from unittest
import suite
, loader
45 from teuthology
.orchestra
.run
import Raw
, quote
46 from teuthology
.orchestra
.daemon
import DaemonGroup
47 from teuthology
.config
import config
as teuth_config
51 log
= logging
.getLogger(__name__
)
53 handler
= logging
.FileHandler("./vstart_runner.log")
54 formatter
= logging
.Formatter(
55 fmt
=u
'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt
='%Y-%m-%dT%H:%M:%S')
57 handler
.setFormatter(formatter
)
58 log
.addHandler(handler
)
59 log
.setLevel(logging
.INFO
)
62 def respawn_in_path(lib_path
, python_paths
):
63 execv_cmd
= ['python']
64 if platform
.system() == "Darwin":
65 lib_path_var
= "DYLD_LIBRARY_PATH"
67 lib_path_var
= "LD_LIBRARY_PATH"
69 py_binary
= os
.environ
.get("PYTHON", "python")
71 if lib_path_var
in os
.environ
:
72 if lib_path
not in os
.environ
[lib_path_var
]:
73 os
.environ
[lib_path_var
] += ':' + lib_path
74 os
.execvp(py_binary
, execv_cmd
+ sys
.argv
)
76 os
.environ
[lib_path_var
] = lib_path
77 os
.execvp(py_binary
, execv_cmd
+ sys
.argv
)
79 for p
in python_paths
:
83 # Let's use some sensible defaults
84 if os
.path
.exists("./CMakeCache.txt") and os
.path
.exists("./bin"):
86 # A list of candidate paths for each package we need
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths
.append(os
.path
.abspath(
97 os
.path
.join(os
.path
.dirname(os
.path
.realpath(__file__
)), "..")
100 for package_guesses
in guesses
:
101 for g
in package_guesses
:
102 g_exp
= os
.path
.abspath(os
.path
.expanduser(g
))
103 if os
.path
.exists(g_exp
):
104 python_paths
.append(g_exp
)
106 ld_path
= os
.path
.join(os
.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path
, python_paths
)
108 respawn_in_path(ld_path
, python_paths
)
112 from teuthology
.exceptions
import CommandFailedError
113 from tasks
.ceph_manager
import CephManager
114 from tasks
.cephfs
.fuse_mount
import FuseMount
115 from tasks
.cephfs
.filesystem
import Filesystem
, MDSCluster
, CephCluster
116 from mgr
.mgr_test_case
import MgrCluster
117 from teuthology
.contextutil
import MaxWhileTries
118 from teuthology
.task
import interactive
120 sys
.stderr
.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
124 # Must import after teuthology because of gevent monkey patching
127 if os
.path
.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX
= "./bin/"
130 SRC_PREFIX
= "../src"
132 # Running in src/ of an autotools build
137 class LocalRemoteProcess(object):
138 def __init__(self
, args
, subproc
, check_status
, stdout
, stderr
):
140 self
.subproc
= subproc
142 self
.stdout
= StringIO()
147 self
.stderr
= StringIO()
151 self
.check_status
= check_status
152 self
.exitstatus
= self
.returncode
= None
156 # Avoid calling communicate() on a dead process because it'll
157 # give you stick about std* already being closed
158 if self
.exitstatus
!= 0:
159 raise CommandFailedError(self
.args
, self
.exitstatus
)
163 out
, err
= self
.subproc
.communicate()
164 self
.stdout
.write(out
)
165 self
.stderr
.write(err
)
167 self
.exitstatus
= self
.returncode
= self
.subproc
.returncode
169 if self
.exitstatus
!= 0:
170 sys
.stderr
.write(out
)
171 sys
.stderr
.write(err
)
173 if self
.check_status
and self
.exitstatus
!= 0:
174 raise CommandFailedError(self
.args
, self
.exitstatus
)
178 if self
.exitstatus
is not None:
181 if self
.subproc
.poll() is not None:
182 out
, err
= self
.subproc
.communicate()
183 self
.stdout
.write(out
)
184 self
.stderr
.write(err
)
185 self
.exitstatus
= self
.returncode
= self
.subproc
.returncode
192 if self
.subproc
.pid
and not self
.finished
:
193 log
.info("kill: killing pid {0} ({1})".format(
194 self
.subproc
.pid
, self
.args
))
195 safe_kill(self
.subproc
.pid
)
197 log
.info("kill: already terminated ({0})".format(self
.args
))
201 class FakeStdIn(object):
202 def __init__(self
, mount_daemon
):
203 self
.mount_daemon
= mount_daemon
206 self
.mount_daemon
.kill()
208 return FakeStdIn(self
)
211 class LocalRemote(object):
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
216 Run this inside your src/ dir!
221 self
.hostname
= "localhost"
222 self
.user
= getpass
.getuser()
224 def get_file(self
, path
, sudo
, dest_dir
):
225 tmpfile
= tempfile
.NamedTemporaryFile(delete
=False).name
226 shutil
.copy(path
, tmpfile
)
229 def put_file(self
, src
, dst
, sudo
=False):
230 shutil
.copy(src
, dst
)
232 def run(self
, args
, check_status
=True, wait
=True,
233 stdout
=None, stderr
=None, cwd
=None, stdin
=None,
234 logger
=None, label
=None, env
=None):
235 log
.info("run args={0}".format(args
))
237 # We don't need no stinkin' sudo
238 args
= [a
for a
in args
if a
!= "sudo"]
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell
= any([a
for a
in args
if isinstance(a
, Raw
)])
247 if args
[i
] == 'adjust-ulimits':
249 elif args
[i
] == 'ceph-coverage':
251 elif args
[i
] == 'timeout':
254 filtered
.append(args
[i
])
257 args
= quote(filtered
)
258 log
.info("Running {0}".format(args
))
260 subproc
= subprocess
.Popen(args
,
261 stdout
=subprocess
.PIPE
,
262 stderr
=subprocess
.PIPE
,
263 stdin
=subprocess
.PIPE
,
267 log
.info("Running {0}".format(args
))
270 if not isinstance(arg
, basestring
):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
275 subproc
= subprocess
.Popen(args
,
276 stdout
=subprocess
.PIPE
,
277 stderr
=subprocess
.PIPE
,
278 stdin
=subprocess
.PIPE
,
283 if not isinstance(stdin
, basestring
):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc
.stdin
.write(stdin
)
290 proc
= LocalRemoteProcess(
291 args
, subproc
, check_status
,
301 class LocalDaemon(object):
302 def __init__(self
, daemon_type
, daemon_id
):
303 self
.daemon_type
= daemon_type
304 self
.daemon_id
= daemon_id
305 self
.controller
= LocalRemote()
313 return self
._get
_pid
() is not None
317 Return PID as an integer or None if not found
319 ps_txt
= self
.controller
.run(
320 args
=["ps", "ww", "-u"+str(os
.getuid())]
321 ).stdout
.getvalue().strip()
322 lines
= ps_txt
.split("\n")[1:]
325 if line
.find("ceph-{0} -i {1}".format(self
.daemon_type
, self
.daemon_id
)) != -1:
326 log
.info("Found ps line for daemon: {0}".format(line
))
327 return int(line
.split()[0])
328 log
.info("No match for {0} {1}: {2}".format(
329 self
.daemon_type
, self
.daemon_id
, ps_txt
333 def wait(self
, timeout
):
335 while self
._get
_pid
() is not None:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self
.daemon_type
, self
.daemon_id
))
341 def stop(self
, timeout
=300):
342 if not self
.running():
343 log
.error('tried to stop a non-running daemon')
346 pid
= self
._get
_pid
()
347 log
.info("Killing PID {0} for {1}.{2}".format(pid
, self
.daemon_type
, self
.daemon_id
))
348 os
.kill(pid
, signal
.SIGKILL
)
351 while pid
is not None:
352 new_pid
= self
._get
_pid
()
353 if new_pid
is not None and new_pid
!= pid
:
354 log
.info("Killing new PID {0}".format(new_pid
))
356 os
.kill(pid
, signal
.SIGKILL
)
363 "Timed out waiting for daemon {0}.{1}".format(
364 self
.daemon_type
, self
.daemon_id
))
368 self
.wait(timeout
=timeout
)
371 if self
._get
_pid
() is not None:
374 self
.proc
= self
.controller
.run([os
.path
.join(BIN_PREFIX
, "./ceph-{0}".format(self
.daemon_type
)), "-i", self
.daemon_id
])
376 def signal(self
, sig
, silent
=False):
377 if not self
.running():
378 raise RuntimeError("Can't send signal to non-running daemon")
380 os
.kill(self
._get
_pid
(), sig
)
382 log
.info("Sent signal {0} to {1}.{2}".format(sig
, self
.daemon_type
, self
.daemon_id
))
387 os.kill annoyingly raises exception if process already dead. Ignore it.
390 return os
.kill(pid
, signal
.SIGKILL
)
392 if e
.errno
== errno
.ESRCH
:
393 # Raced with process termination
399 class LocalFuseMount(FuseMount
):
400 def __init__(self
, test_dir
, client_id
):
401 super(LocalFuseMount
, self
).__init
__(None, test_dir
, client_id
, LocalRemote())
404 def config_path(self
):
407 def get_keyring_path(self
):
408 # This is going to end up in a config file, so use an absolute path
409 # to avoid assumptions about daemons' pwd
410 return os
.path
.abspath("./client.{0}.keyring".format(self
.client_id
))
412 def run_shell(self
, args
, wait
=True):
413 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
414 # the "cd foo && bar" shenanigans isn't needed to begin with and
415 # then we wouldn't have to special case this
416 return self
.client_remote
.run(
417 args
, wait
=wait
, cwd
=self
.mountpoint
424 def _asok_path(self
):
425 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
426 # run foreground. When running it daemonized however, the asok is named after
427 # the PID of the launching process, not the long running ceph-fuse process. Therefore
428 # we need to give an exact path here as the logic for checking /proc/ for which
429 # asok is alive does not work.
430 path
= "./out/client.{0}.{1}.asok".format(self
.client_id
, self
.fuse_daemon
.subproc
.pid
)
431 log
.info("I think my launching pid was {0}".format(self
.fuse_daemon
.subproc
.pid
))
435 if self
.is_mounted():
436 super(LocalFuseMount
, self
).umount()
438 def mount(self
, mount_path
=None, mount_fs_name
=None):
439 self
.client_remote
.run(
447 def list_connections():
448 self
.client_remote
.run(
449 args
=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
452 p
= self
.client_remote
.run(
453 args
=["ls", "/sys/fs/fuse/connections"],
456 if p
.exitstatus
!= 0:
457 log
.warn("ls conns failed with {0}, assuming none".format(p
.exitstatus
))
460 ls_str
= p
.stdout
.getvalue().strip()
462 return [int(n
) for n
in ls_str
.split("\n")]
466 # Before starting ceph-fuse process, note the contents of
467 # /sys/fs/fuse/connections
468 pre_mount_conns
= list_connections()
469 log
.info("Pre-mount connections: {0}".format(pre_mount_conns
))
471 prefix
= [os
.path
.join(BIN_PREFIX
, "ceph-fuse")]
473 prefix
+= ["--client_die_on_failed_dentry_invalidate=false"]
475 if mount_path
is not None:
476 prefix
+= ["--client_mountpoint={0}".format(mount_path
)]
478 if mount_fs_name
is not None:
479 prefix
+= ["--client_mds_namespace={0}".format(mount_fs_name
)]
481 self
.fuse_daemon
= self
.client_remote
.run(args
=
485 "client.{0}".format(self
.client_id
),
489 log
.info("Mounting client.{0} with pid {1}".format(self
.client_id
, self
.fuse_daemon
.subproc
.pid
))
491 # Wait for the connection reference to appear in /sys
493 post_mount_conns
= list_connections()
494 while len(post_mount_conns
) <= len(pre_mount_conns
):
495 if self
.fuse_daemon
.finished
:
496 # Did mount fail? Raise the CommandFailedError instead of
497 # hitting the "failed to populate /sys/" timeout
498 self
.fuse_daemon
.wait()
502 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
505 post_mount_conns
= list_connections()
507 log
.info("Post-mount connections: {0}".format(post_mount_conns
))
509 # Record our fuse connection number so that we can use it when
511 new_conns
= list(set(post_mount_conns
) - set(pre_mount_conns
))
512 if len(new_conns
) == 0:
513 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns
))
514 elif len(new_conns
) > 1:
515 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns
))
517 self
._fuse
_conn
= new_conns
[0]
519 def _run_python(self
, pyscript
, py_version
='python'):
521 Override this to remove the daemon-helper prefix that is used otherwise
522 to make the process killable.
524 return self
.client_remote
.run(args
=[py_version
, '-c', pyscript
],
527 class LocalCephManager(CephManager
):
529 # Deliberately skip parent init, only inheriting from it to get
530 # util methods like osd_dump that sit on top of raw_cluster_cmd
531 self
.controller
= LocalRemote()
533 # A minority of CephManager fns actually bother locking for when
534 # certain teuthology tests want to run tasks in parallel
535 self
.lock
= threading
.RLock()
537 self
.log
= lambda x
: log
.info(x
)
539 def find_remote(self
, daemon_type
, daemon_id
):
541 daemon_type like 'mds', 'osd'
542 daemon_id like 'a', '0'
546 def run_ceph_w(self
):
547 proc
= self
.controller
.run([os
.path
.join(BIN_PREFIX
, "ceph"), "-w"], wait
=False, stdout
=StringIO())
550 def raw_cluster_cmd(self
, *args
):
552 args like ["osd", "dump"}
555 proc
= self
.controller
.run([os
.path
.join(BIN_PREFIX
, "ceph")] + list(args
))
556 return proc
.stdout
.getvalue()
558 def raw_cluster_cmd_result(self
, *args
):
560 like raw_cluster_cmd but don't check status, just return rc
562 proc
= self
.controller
.run([os
.path
.join(BIN_PREFIX
, "ceph")] + list(args
), check_status
=False)
563 return proc
.exitstatus
565 def admin_socket(self
, daemon_type
, daemon_id
, command
, check_status
=True):
566 return self
.controller
.run(
567 args
=[os
.path
.join(BIN_PREFIX
, "ceph"), "daemon", "{0}.{1}".format(daemon_type
, daemon_id
)] + command
, check_status
=check_status
571 def get_mds_status(self
, mds
):
573 Run cluster commands for the mds in order to get mds information
575 out
= self
.raw_cluster_cmd('mds', 'dump', '--format=json')
576 j
= json
.loads(' '.join(out
.splitlines()[1:]))
577 # collate; for dup ids, larger gid wins.
578 for info
in j
['info'].itervalues():
579 if info
['name'] == mds
:
584 def get_mds_status_by_rank(self
, rank
):
586 Run cluster commands for the mds in order to get mds information
589 j
= self
.get_mds_status_all()
590 # collate; for dup ids, larger gid wins.
591 for info
in j
['info'].itervalues():
592 if info
['rank'] == rank
:
596 def get_mds_status_all(self
):
598 Run cluster command to extract all the mds status.
600 out
= self
.raw_cluster_cmd('mds', 'dump', '--format=json')
601 j
= json
.loads(' '.join(out
.splitlines()[1:]))
605 class LocalCephCluster(CephCluster
):
606 def __init__(self
, ctx
):
607 # Deliberately skip calling parent constructor
609 self
.mon_manager
= LocalCephManager()
610 self
._conf
= defaultdict(dict)
613 def admin_remote(self
):
616 def get_config(self
, key
, service_type
=None):
617 if service_type
is None:
620 # FIXME hardcoded vstart service IDs
627 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
629 def _write_conf(self
):
630 # In teuthology, we have the honour of writing the entire ceph.conf, but
631 # in vstart land it has mostly already been written and we need to carefully
633 conf_path
= "./ceph.conf"
634 banner
= "\n#LOCAL_TEST\n"
635 existing_str
= open(conf_path
).read()
637 if banner
in existing_str
:
638 existing_str
= existing_str
[0:existing_str
.find(banner
)]
640 existing_str
+= banner
642 for subsys
, kvs
in self
._conf
.items():
643 existing_str
+= "\n[{0}]\n".format(subsys
)
644 for key
, val
in kvs
.items():
645 # Comment out existing instance if it exists
646 log
.info("Searching for existing instance {0}/{1}".format(
649 existing_section
= re
.search("^\[{0}\]$([\n]|[^\[])+".format(
651 ), existing_str
, re
.MULTILINE
)
654 section_str
= existing_str
[existing_section
.start():existing_section
.end()]
655 existing_val
= re
.search("^\s*[^#]({0}) =".format(key
), section_str
, re
.MULTILINE
)
657 start
= existing_section
.start() + existing_val
.start(1)
658 log
.info("Found string to replace at {0}".format(
661 existing_str
= existing_str
[0:start
] + "#" + existing_str
[start
:]
663 existing_str
+= "{0} = {1}\n".format(key
, val
)
665 open(conf_path
, "w").write(existing_str
)
667 def set_ceph_conf(self
, subsys
, key
, value
):
668 self
._conf
[subsys
][key
] = value
671 def clear_ceph_conf(self
, subsys
, key
):
672 del self
._conf
[subsys
][key
]
676 class LocalMDSCluster(LocalCephCluster
, MDSCluster
):
677 def __init__(self
, ctx
):
678 super(LocalMDSCluster
, self
).__init
__(ctx
)
680 self
.mds_ids
= ctx
.daemons
.daemons
['mds'].keys()
681 self
.mds_daemons
= dict([(id_
, LocalDaemon("mds", id_
)) for id_
in self
.mds_ids
])
683 def clear_firewall(self
):
684 # FIXME: unimplemented
687 def newfs(self
, name
='cephfs', create
=True):
688 return LocalFilesystem(self
._ctx
, name
=name
, create
=create
)
691 class LocalMgrCluster(LocalCephCluster
, MgrCluster
):
692 def __init__(self
, ctx
):
693 super(LocalMgrCluster
, self
).__init
__(ctx
)
695 self
.mgr_ids
= ctx
.daemons
.daemons
['mgr'].keys()
696 self
.mgr_daemons
= dict([(id_
, LocalDaemon("mgr", id_
)) for id_
in self
.mgr_ids
])
699 class LocalFilesystem(Filesystem
, LocalMDSCluster
):
700 def __init__(self
, ctx
, fscid
=None, name
='cephfs', create
=False):
701 # Deliberately skip calling parent constructor
706 self
.ec_profile
= None
707 self
.metadata_pool_name
= None
708 self
.metadata_overlay
= False
709 self
.data_pool_name
= None
710 self
.data_pools
= None
712 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
714 for line
in open("ceph.conf").readlines():
715 match
= re
.match("^\[mds\.(.+)\]$", line
)
717 self
.mds_ids
.add(match
.group(1))
720 raise RuntimeError("No MDSs found in ceph.conf!")
722 self
.mds_ids
= list(self
.mds_ids
)
724 log
.info("Discovered MDS IDs: {0}".format(self
.mds_ids
))
726 self
.mon_manager
= LocalCephManager()
728 self
.mds_daemons
= dict([(id_
, LocalDaemon("mds", id_
)) for id_
in self
.mds_ids
])
730 self
.client_remote
= LocalRemote()
732 self
._conf
= defaultdict(dict)
735 if fscid
is not None:
736 raise RuntimeError("cannot specify fscid when creating fs")
737 if create
and not self
.legacy_configured():
740 if fscid
is not None:
742 self
.getinfo(refresh
=True)
744 # Stash a reference to the first created filesystem on ctx, so
745 # that if someone drops to the interactive shell they can easily
747 if not hasattr(self
._ctx
, "filesystem"):
748 self
._ctx
.filesystem
= self
754 def set_clients_block(self
, blocked
, mds_id
=None):
755 raise NotImplementedError()
757 def get_pgs_per_fs_pool(self
):
758 # FIXME: assuming there are 3 OSDs
759 return 3 * int(self
.get_config('mon_pg_warn_min_per_osd'))
762 class InteractiveFailureResult(unittest
.TextTestResult
):
764 Specialization that implements interactive-on-error style
767 def addFailure(self
, test
, err
):
768 super(InteractiveFailureResult
, self
).addFailure(test
, err
)
769 log
.error(self
._exc
_info
_to
_string
(err
, test
))
770 log
.error("Failure in test '{0}', going interactive".format(
771 self
.getDescription(test
)
773 interactive
.task(ctx
=None, config
=None)
775 def addError(self
, test
, err
):
776 super(InteractiveFailureResult
, self
).addError(test
, err
)
777 log
.error(self
._exc
_info
_to
_string
(err
, test
))
778 log
.error("Error in test '{0}', going interactive".format(
779 self
.getDescription(test
)
781 interactive
.task(ctx
=None, config
=None)
784 def enumerate_methods(s
):
785 log
.info("e: {0}".format(s
))
787 if isinstance(t
, suite
.BaseTestSuite
):
788 for sub
in enumerate_methods(t
):
794 def load_tests(modules
, loader
):
796 log
.info("Executing modules: {0}".format(modules
))
798 for mod_name
in modules
:
799 # Test names like cephfs.test_auto_repair
800 module_suites
.append(loader
.loadTestsFromName(mod_name
))
801 log
.info("Loaded: {0}".format(list(module_suites
)))
802 return suite
.TestSuite(module_suites
)
804 log
.info("Executing all cephfs tests")
805 return loader
.discover(
806 os
.path
.join(os
.path
.dirname(os
.path
.abspath(__file__
)), "cephfs")
810 def scan_tests(modules
):
811 overall_suite
= load_tests(modules
, loader
.TestLoader())
814 max_required_clients
= 0
817 for suite
, case
in enumerate_methods(overall_suite
):
818 max_required_mds
= max(max_required_mds
,
819 getattr(case
, "MDSS_REQUIRED", 0))
820 max_required_clients
= max(max_required_clients
,
821 getattr(case
, "CLIENTS_REQUIRED", 0))
822 max_required_mgr
= max(max_required_mgr
,
823 getattr(case
, "MGRS_REQUIRED", 0))
825 return max_required_mds
, max_required_clients
, max_required_mgr
828 class LocalCluster(object):
829 def __init__(self
, rolename
="placeholder"):
831 LocalRemote(): [rolename
]
834 def only(self
, requested
):
835 return self
.__class
__(rolename
=requested
)
838 class LocalContext(object):
841 self
.teuthology_config
= teuth_config
842 self
.cluster
= LocalCluster()
843 self
.daemons
= DaemonGroup()
845 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
846 # tests that want to look these up via ctx can do so.
847 # Inspect ceph.conf to see what roles exist
848 for conf_line
in open("ceph.conf").readlines():
849 for svc_type
in ["mon", "osd", "mds", "mgr"]:
850 if svc_type
not in self
.daemons
.daemons
:
851 self
.daemons
.daemons
[svc_type
] = {}
852 match
= re
.match("^\[{0}\.(.+)\]$".format(svc_type
), conf_line
)
854 svc_id
= match
.group(1)
855 self
.daemons
.daemons
[svc_type
][svc_id
] = LocalDaemon(svc_type
, svc_id
)
858 shutil
.rmtree(self
.teuthology_config
['test_path'])
863 interactive_on_error
= False
864 create_cluster
= False
867 flags
= [a
for a
in args
if a
.startswith("-")]
868 modules
= [a
for a
in args
if not a
.startswith("-")]
870 if f
== "--interactive":
871 interactive_on_error
= True
872 elif f
== "--create":
873 create_cluster
= True
875 log
.error("Unknown option '{0}'".format(f
))
878 # Help developers by stopping up-front if their tree isn't built enough for all the
879 # tools that the tests might want to use (add more here if needed)
880 require_binaries
= ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
881 "cephfs-table-tool", "ceph-fuse", "rados"]
882 missing_binaries
= [b
for b
in require_binaries
if not os
.path
.exists(os
.path
.join(BIN_PREFIX
, b
))]
884 log
.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries
)))
887 max_required_mds
, max_required_clients
, max_required_mgr
= scan_tests(modules
)
889 remote
= LocalRemote()
891 # Tolerate no MDSs or clients running at start
893 args
=["ps", "-u"+str(os
.getuid())]
894 ).stdout
.getvalue().strip()
895 lines
= ps_txt
.split("\n")[1:]
897 if 'ceph-fuse' in line
or 'ceph-mds' in line
:
898 pid
= int(line
.split()[0])
899 log
.warn("Killing stray process {0}".format(line
))
900 os
.kill(pid
, signal
.SIGKILL
)
902 # Fire up the Ceph cluster if the user requested it
904 log
.info("Creating cluster with {0} MDS daemons".format(
906 remote
.run([os
.path
.join(SRC_PREFIX
, "stop.sh")], check_status
=False)
907 remote
.run(["rm", "-rf", "./out"])
908 remote
.run(["rm", "-rf", "./dev"])
909 vstart_env
= os
.environ
.copy()
910 vstart_env
["FS"] = "0"
911 vstart_env
["MDS"] = max_required_mds
.__str
__()
912 vstart_env
["OSD"] = "1"
913 vstart_env
["MGR"] = max(max_required_mgr
, 1).__str
__()
915 remote
.run([os
.path
.join(SRC_PREFIX
, "vstart.sh"), "-n", "-d", "--nolockdep"],
918 # Wait for OSD to come up so that subsequent injectargs etc will
920 LocalCephCluster(LocalContext()).mon_manager
.wait_for_all_osds_up(timeout
=30)
922 # List of client mounts, sufficient to run the selected tests
923 clients
= [i
.__str
__() for i
in range(0, max_required_clients
)]
925 test_dir
= tempfile
.mkdtemp()
926 teuth_config
['test_path'] = test_dir
928 # Construct Mount classes
930 for client_id
in clients
:
931 # Populate client keyring (it sucks to use client.admin for test clients
932 # because it's awkward to find the logs later)
933 client_name
= "client.{0}".format(client_id
)
935 if client_name
not in open("./keyring").read():
936 p
= remote
.run(args
=[os
.path
.join(BIN_PREFIX
, "ceph"), "auth", "get-or-create", client_name
,
941 open("./keyring", "a").write(p
.stdout
.getvalue())
943 mount
= LocalFuseMount(test_dir
, client_id
)
945 if mount
.is_mounted():
946 log
.warn("unmounting {0}".format(mount
.mountpoint
))
949 if os
.path
.exists(mount
.mountpoint
):
950 os
.rmdir(mount
.mountpoint
)
953 ceph_cluster
= LocalCephCluster(ctx
)
954 mds_cluster
= LocalMDSCluster(ctx
)
955 mgr_cluster
= LocalMgrCluster(ctx
)
957 from tasks
.cephfs_test_runner
import DecoratingLoader
959 class LogStream(object):
963 def write(self
, data
):
965 if "\n" in self
.buffer:
966 lines
= self
.buffer.split("\n")
967 for line
in lines
[:-1]:
969 # sys.stderr.write(line + "\n")
971 self
.buffer = lines
[-1]
976 decorating_loader
= DecoratingLoader({
979 "ceph_cluster": ceph_cluster
,
980 "mds_cluster": mds_cluster
,
981 "mgr_cluster": mgr_cluster
,
984 # For the benefit of polling tests like test_full -- in teuthology land we set this
985 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
986 remote
.run(args
=[os
.path
.join(BIN_PREFIX
, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
987 ceph_cluster
.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
989 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
990 # from normal IO latency. Increase it for running teests.
991 ceph_cluster
.set_ceph_conf("mds", "mds log max segments", "10")
993 # Make sure the filesystem created in tests has uid/gid that will let us talk to
994 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
995 # so that cephfs-data-scan will pick it up too.
996 ceph_cluster
.set_ceph_conf("global", "mds root ino uid", "%s" % os
.getuid())
997 ceph_cluster
.set_ceph_conf("global", "mds root ino gid", "%s" % os
.getgid())
999 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1000 def _get_package_version(remote
, pkg_name
):
1001 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1004 import teuthology
.packaging
1005 teuthology
.packaging
.get_package_version
= _get_package_version
1007 overall_suite
= load_tests(modules
, decorating_loader
)
1009 # Filter out tests that don't lend themselves to interactive running,
1011 for case
, method
in enumerate_methods(overall_suite
):
1012 fn
= getattr(method
, method
._testMethodName
)
1016 if hasattr(fn
, 'is_for_teuthology') and getattr(fn
, 'is_for_teuthology') is True:
1018 log
.warn("Dropping test because long running: ".format(method
.id()))
1020 if getattr(fn
, "needs_trimming", False) is True:
1021 drop_test
= (os
.getuid() != 0)
1022 log
.warn("Dropping test because client trim unavailable: ".format(method
.id()))
1025 # Don't drop the test if it was explicitly requested in arguments
1027 for named
in modules
:
1028 if named
.endswith(method
.id()):
1033 victims
.append((case
, method
))
1035 log
.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims
)))
1036 for s
, method
in victims
:
1037 s
._tests
.remove(method
)
1039 if interactive_on_error
:
1040 result_class
= InteractiveFailureResult
1042 result_class
= unittest
.TextTestResult
1043 fail_on_skip
= False
1045 class LoggingResult(result_class
):
1046 def startTest(self
, test
):
1047 log
.info("Starting test: {0}".format(self
.getDescription(test
)))
1048 test
.started_at
= datetime
.datetime
.utcnow()
1049 return super(LoggingResult
, self
).startTest(test
)
1051 def stopTest(self
, test
):
1052 log
.info("Stopped test: {0} in {1}s".format(
1053 self
.getDescription(test
),
1054 (datetime
.datetime
.utcnow() - test
.started_at
).total_seconds()
1057 def addSkip(self
, test
, reason
):
1059 # Don't just call addFailure because that requires a traceback
1060 self
.failures
.append((test
, reason
))
1062 super(LoggingResult
, self
).addSkip(test
, reason
)
1065 result
= unittest
.TextTestRunner(
1067 resultclass
=LoggingResult
,
1069 failfast
=True).run(overall_suite
)
1071 if not result
.wasSuccessful():
1072 result
.printErrors() # duplicate output at end for convenience
1075 for test
, error
in result
.errors
:
1076 bad_tests
.append(str(test
))
1077 for test
, failure
in result
.failures
:
1078 bad_tests
.append(str(test
))
1085 if __name__
== "__main__":