# ceph/qa/tasks/vstart_runner.py
"""
vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
ceph instance instead of a packaged/installed cluster.  Use this to turn around test cases
quickly during development.

Simple usage (assuming teuthology and ceph checked out in ~/git):

    # Activate the teuthology virtualenv
    source ~/git/teuthology/virtualenv/bin/activate
    # Go into your ceph build directory
    cd ~/git/ceph/build
    # Invoke a test using this script
    python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan

    # Alternatively, if you use different paths, specify them as follows:
    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py

    # If you wish to drop to a python shell on failures, use --interactive:
    python ~/git/ceph/qa/tasks/vstart_runner.py --interactive

    # If you wish to run a named test case, pass it as an argument:
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan

    # Also, you can create the cluster once and then run named test cases against it:
    python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
"""
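
# Note: the unittest loader used below resolves dotted names down to individual test
# methods, so a single test can usually be selected directly; the method name in this
# example is illustrative only, substitute a real one:
#   python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan.TestDataScan.test_rebuild_simple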

from StringIO import StringIO
from collections import defaultdict
import getpass
import signal
import tempfile
import threading
import datetime
import shutil
import re
import os
import time
import sys
import errno
import platform
import unittest
import logging
from unittest import suite, loader

from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.config import config as teuth_config

log = logging.getLogger(__name__)
handler = logging.FileHandler("./vstart_runner.log")
formatter = logging.Formatter(
    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

def respawn_in_path(lib_path, python_paths):
    execv_cmd = ['python']
    if platform.system() == "Darwin":
        lib_path_var = "DYLD_LIBRARY_PATH"
    else:
        lib_path_var = "LD_LIBRARY_PATH"

    py_binary = os.environ.get("PYTHON", "python")

    if lib_path_var in os.environ:
        if lib_path not in os.environ[lib_path_var]:
            os.environ[lib_path_var] += ':' + lib_path
            os.execvp(py_binary, execv_cmd + sys.argv)
    else:
        os.environ[lib_path_var] = lib_path
        os.execvp(py_binary, execv_cmd + sys.argv)

    for p in python_paths:
        sys.path.insert(0, p)

# Let's use some sensible defaults
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
    # A list of candidate paths for each package we need
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.2"],
    ]

    python_paths = []

    # Up one level so that "tasks.foo.bar" imports work
    python_paths.append(os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
    ))

    for package_guesses in guesses:
        for g in package_guesses:
            g_exp = os.path.abspath(os.path.expanduser(g))
            if os.path.exists(g_exp):
                python_paths.append(g_exp)

    ld_path = os.path.join(os.getcwd(), "lib/")
    print("Using guessed paths {0} {1}".format(ld_path, python_paths))
    respawn_in_path(ld_path, python_paths)

try:
    from teuthology.exceptions import CommandFailedError
    from tasks.ceph_manager import CephManager
    from tasks.cephfs.fuse_mount import FuseMount
    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
    from mgr.mgr_test_case import MgrCluster
    from teuthology.contextutil import MaxWhileTries
    from teuthology.task import interactive
except ImportError:
    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
    raise

# Must import after teuthology because of gevent monkey patching
import subprocess

if os.path.exists("./CMakeCache.txt"):
    # Running in build dir of a cmake build
    BIN_PREFIX = "./bin/"
    SRC_PREFIX = "../src"
else:
    # Running in src/ of an autotools build
    BIN_PREFIX = "./"
    SRC_PREFIX = "./"

class LocalRemoteProcess(object):
    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        if stdout is None:
            self.stdout = StringIO()
        else:
            self.stdout = stdout

        if stderr is None:
            self.stderr = StringIO()
        else:
            self.stderr = stderr

        self.check_status = check_status
        self.exitstatus = self.returncode = None

    def wait(self):
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.check_status and self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        self.stdout.write(out)
        self.stderr.write(err)

        self.exitstatus = self.returncode = self.subproc.returncode

        if self.exitstatus != 0:
            sys.stderr.write(out)
            sys.stderr.write(err)

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            out, err = self.subproc.communicate()
            self.stdout.write(out)
            self.stderr.write(err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True

        return False

    def kill(self):
        if self.subproc.pid and not self.finished:
            log.info("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            safe_kill(self.subproc.pid)
        else:
            log.info("kill: already terminated ({0})".format(self.args))

    @property
    def stdin(self):
        # Present a stdin-like object whose close() kills the daemonized process,
        # matching how teuthology stops foreground daemons.
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)

class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
        shutil.copy(path, tmpfile)
        return tmpfile

    def put_file(self, src, dst, sudo=False):
        shutil.copy(src, dst)

    def run(self, args, check_status=True, wait=True,
            stdout=None, stderr=None, cwd=None, stdin=None,
            logger=None, label=None, env=None, timeout=None):

        # We don't need no stinkin' sudo
        args = [a for a in args if a != "sudo"]

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any([a for a in args if isinstance(a, Raw)])

        # Filter out helper tools that don't exist in a vstart environment
        args = [a for a in args if a not in {
            'adjust-ulimits', 'ceph-coverage', 'timeout'}]

        # Adjust binary path prefix if given a bare program name
        if "/" not in args[0]:
            # If they asked for a bare binary name, and it exists
            # in our built tree, use the one there.
            local_bin = os.path.join(BIN_PREFIX, args[0])
            if os.path.exists(local_bin):
                args = [local_bin] + args[1:]
            else:
                log.debug("'{0}' is not a binary in the Ceph build dir".format(
                    args[0]
                ))

        log.info("Running {0}".format(args))

        if shell:
            subproc = subprocess.Popen(quote(args),
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       shell=True)
        else:
            # Sanity check that we've got a list of strings
            for arg in args:
                if not isinstance(arg, basestring):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__
                    ))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            if not isinstance(stdin, basestring):
                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")

            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc

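# Illustrative usage sketch (assumes the vstart build dir is the working directory; the
# command and its arguments are examples only):
#
#   remote = LocalRemote()
#   proc = remote.run(args=["ceph", "status"], stdout=StringIO())
#   print(proc.stdout.getvalue())
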
class LocalDaemon(object):
    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        self.proc = None

    def running(self):
        return self._get_pid() is not None

    def check_status(self):
        if self.proc:
            return self.proc.poll()

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = self.controller.run(
            args=["ps", "ww", "-u"+str(os.getuid())]
        ).stdout.getvalue().strip()
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                log.info("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])

        log.info("No match for {0} {1}: {2}".format(
            self.daemon_type, self.daemon_id, ps_txt
        ))
        return None

    def wait(self, timeout):
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

    def stop(self, timeout=300):
        if not self.running():
            log.error('tried to stop a non-running daemon')
            return

        pid = self._get_pid()
        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGTERM)

        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                log.info("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGTERM)

            if new_pid is None:
                break

            waited += 1
            if waited > timeout:
                raise MaxWhileTries(
                    "Timed out waiting for daemon {0}.{1}".format(
                        self.daemon_type, self.daemon_id))
            time.sleep(1)

        self.wait(timeout=timeout)

    def restart(self):
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])

    def signal(self, sig, silent=False):
        if not self.running():
            raise RuntimeError("Can't send signal to non-running daemon")

        os.kill(self._get_pid(), sig)
        if not silent:
            log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))


def safe_kill(pid):
    """
    os.kill annoyingly raises exception if process already dead.  Ignore it.
    """
    try:
        return os.kill(pid, signal.SIGKILL)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # Raced with process termination
            pass
        else:
            raise

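# Illustrative usage sketch (the daemon type and id are assumptions for the example;
# a vstart cluster typically has an mds.a when MDS daemons were requested):
#
#   mds_a = LocalDaemon("mds", "a")
#   if mds_a.running():
#       mds_a.restart()
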
class LocalFuseMount(FuseMount):
    def __init__(self, ctx, test_dir, client_id):
        super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        return os.path.abspath("./client.{0}.keyring".format(self.client_id))

    def run_shell(self, args, wait=True):
        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
        # the "cd foo && bar" shenanigans isn't needed to begin with and
        # then we wouldn't have to special case this
        return self.client_remote.run(
            args, wait=wait, cwd=self.mountpoint
        )

    def setupfs(self, name=None):
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = LocalFilesystem(self.ctx, name=name)
        log.info('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.info('Ready to start {}...'.format(type(self).__name__))

    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run foreground.  When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.

        # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
        # in a tmpdir. All of the paths are the same, so no need to select
        # based off of the service type.
        d = None
        with open(self.config_path) as f:
            for line in f:
                asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
                if asok_conf:
                    d = asok_conf.groups(1)[0]
                    break

        path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
        return path

    def umount(self):
        if self.is_mounted():
            super(LocalFuseMount, self).umount()

    def mount(self, mount_path=None, mount_fs_name=None):
        self.setupfs(name=mount_fs_name)

        # Make sure the mountpoint directory exists
        self.client_remote.run(
            args=['mkdir', '-p', self.mountpoint],
        )

        def list_connections():
            self.client_remote.run(
                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                check_status=False
            )
            if p.exitstatus != 0:
                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
        prefix += ["--client_die_on_failed_dentry_invalidate=false"]

        if mount_path is not None:
            prefix += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]

        self.fuse_daemon = self.client_remote.run(args=
            prefix + ["--name",
                      "client.{0}".format(self.client_id),
                      self.mountpoint],
            wait=False)

        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))

        # Wait for the connection reference to appear in /sys
        waited = 0
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail? Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()

            time.sleep(1)
            waited += 1
            if waited > 30:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))

            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()

    def _run_python(self, pyscript, py_version='python'):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[py_version, '-c', pyscript],
                                      wait=False)

class LocalCephManager(CephManager):
    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.info(x)

        # Don't bother constructing a map of pools: it should be empty
        # at test cluster start, and in any case it would be out of date
        # in no time.  The attribute needs to exist for some of the CephManager
        # methods to work though.
        self.pools = {}

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'
        """
        return LocalRemote()

    def run_ceph_w(self, watch_channel=None):
        """
        :param watch_channel: Specifies the channel to be watched.
                              This can be 'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        proc = self.controller.run(args, wait=False, stdout=StringIO())
        return proc

    def raw_cluster_cmd(self, *args, **kwargs):
        """
        args like ["osd", "dump"]
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
        return proc.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        kwargs['check_status'] = False
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
        return proc.exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
            check_status=check_status,
            timeout=timeout
        )

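# Illustrative usage sketch (assumes a running vstart cluster in the current build
# directory; the command shown is only an example):
#
#   manager = LocalCephManager()
#   print(manager.raw_cluster_cmd("osd", "dump"))
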
class LocalCephCluster(CephCluster):
    def __init__(self, ctx):
        # Deliberately skip calling parent constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        return LocalRemote()

    def get_config(self, key, service_type=None):
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        if service_type == "mon":
            service_id = "a"
        elif service_type == "osd":
            service_id = "0"
        elif service_type == "mds":
            service_id = "a"
        else:
            raise NotImplementedError(service_type)

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
        existing_str = open(conf_path).read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.info("Searching for existing instance {0}/{1}".format(
                    key, subsys
                ))
                existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
                    subsys
                ), existing_str, re.MULTILINE)
                if existing_section:
                    section_str = existing_str[existing_section.start():existing_section.end()]
                    existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.info("Found string to replace at {0}".format(
                            start
                        ))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        open(conf_path, "w").write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        del self._conf[subsys][key]
        self._write_conf()

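# For illustration, a call like set_ceph_conf("mds", "mds log max segments", "10") (as
# done during test setup below) would leave ./ceph.conf ending with roughly:
#
#   #LOCAL_TEST
#
#   [mds]
#   mds log max segments = 10
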
class LocalMDSCluster(LocalCephCluster, MDSCluster):
    def __init__(self, ctx):
        super(LocalMDSCluster, self).__init__(ctx)

        self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name='cephfs', create=True):
        return LocalFilesystem(self._ctx, name=name, create=create)

class LocalMgrCluster(LocalCephCluster, MgrCluster):
    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])

class LocalFilesystem(Filesystem, LocalMDSCluster):
    def __init__(self, ctx, fscid=None, name='cephfs', create=False):
        # Deliberately skip calling parent constructor
        self._ctx = ctx

        self.id = None
        self.name = name
        self.ec_profile = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
        self.mds_ids = set()
        for line in open("ceph.conf").readlines():
            match = re.match("^\[mds\.(.+)\]$", line)
            if match:
                self.mds_ids.add(match.group(1))

        if not self.mds_ids:
            raise RuntimeError("No MDSs found in ceph.conf!")

        self.mds_ids = list(self.mds_ids)

        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))

        self.mon_manager = LocalCephManager()

        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # debug it
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def set_clients_block(self, blocked, mds_id=None):
        raise NotImplementedError()

    def get_pgs_per_fs_pool(self):
        # FIXME: assuming there are 3 OSDs
        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))

class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behaviors.
    """
    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Failure in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Error in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

def enumerate_methods(s):
    log.info("e: {0}".format(s))
    for t in s._tests:
        if isinstance(t, suite.BaseTestSuite):
            for sub in enumerate_methods(t):
                yield sub
        else:
            yield s, t


def load_tests(modules, loader):
    if modules:
        log.info("Executing modules: {0}".format(modules))
        module_suites = []
        for mod_name in modules:
            # Test names like cephfs.test_auto_repair
            module_suites.append(loader.loadTestsFromName(mod_name))
        log.info("Loaded: {0}".format(list(module_suites)))
        return suite.TestSuite(module_suites)
    else:
        log.info("Executing all cephfs tests")
        return loader.discover(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
        )

def scan_tests(modules):
    overall_suite = load_tests(modules, loader.TestLoader())

    max_required_mds = 0
    max_required_clients = 0
    max_required_mgr = 0
    require_memstore = False

    for suite, case in enumerate_methods(overall_suite):
        max_required_mds = max(max_required_mds,
                               getattr(case, "MDSS_REQUIRED", 0))
        max_required_clients = max(max_required_clients,
                                   getattr(case, "CLIENTS_REQUIRED", 0))
        max_required_mgr = max(max_required_mgr,
                               getattr(case, "MGRS_REQUIRED", 0))
        require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
            or require_memstore

    return max_required_mds, max_required_clients, \
        max_required_mgr, require_memstore

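# For illustration, these are the class attributes scan_tests() looks for; the attribute
# names are the real ones used above, while the test class itself is hypothetical:
#
#   class TestExample(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 1
#       REQUIRE_MEMSTORE = False
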
class LocalCluster(object):
    def __init__(self, rolename="placeholder"):
        self.remotes = {
            LocalRemote(): [rolename]
        }

    def only(self, requested):
        return self.__class__(rolename=requested)

class LocalContext(object):
    def __init__(self):
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()

        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        for conf_line in open("ceph.conf").readlines():
            for svc_type in ["mon", "osd", "mds", "mgr"]:
                prefixed_type = "ceph." + svc_type
                if prefixed_type not in self.daemons.daemons:
                    self.daemons.daemons[prefixed_type] = {}
                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
                if match:
                    svc_id = match.group(1)
                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)

    def __del__(self):
        shutil.rmtree(self.teuthology_config['test_path'])

def exec_test():
    interactive_on_error = False
    create_cluster = False
    create_cluster_only = False
    ignore_missing_binaries = False

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            interactive_on_error = True
        elif f == "--create":
            create_cluster = True
        elif f == "--create-cluster-only":
            create_cluster_only = True
        elif f == "--ignore-missing-binaries":
            ignore_missing_binaries = True
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)

906 # tools that the tests might want to use (add more here if needed)
907 require_binaries
= ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
908 "cephfs-table-tool", "ceph-fuse", "rados"]
909 missing_binaries
= [b
for b
in require_binaries
if not os
.path
.exists(os
.path
.join(BIN_PREFIX
, b
))]
910 if missing_binaries
and not ignore_missing_binaries
:
911 log
.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries
)))
914 max_required_mds
, max_required_clients
, \
915 max_required_mgr
, require_memstore
= scan_tests(modules
)
917 remote
= LocalRemote()
919 # Tolerate no MDSs or clients running at start
921 args
=["ps", "-u"+str(os
.getuid())]
922 ).stdout
.getvalue().strip()
923 lines
= ps_txt
.split("\n")[1:]
925 if 'ceph-fuse' in line
or 'ceph-mds' in line
:
926 pid
= int(line
.split()[0])
927 log
.warn("Killing stray process {0}".format(line
))
928 os
.kill(pid
, signal
.SIGKILL
)
930 # Fire up the Ceph cluster if the user requested it
931 if create_cluster
or create_cluster_only
:
932 log
.info("Creating cluster with {0} MDS daemons".format(
934 remote
.run([os
.path
.join(SRC_PREFIX
, "stop.sh")], check_status
=False)
935 remote
.run(["rm", "-rf", "./out"])
936 remote
.run(["rm", "-rf", "./dev"])
937 vstart_env
= os
.environ
.copy()
938 vstart_env
["FS"] = "0"
939 vstart_env
["MDS"] = max_required_mds
.__str
__()
940 vstart_env
["OSD"] = "4"
941 vstart_env
["MGR"] = max(max_required_mgr
, 1).__str
__()
943 args
= [os
.path
.join(SRC_PREFIX
, "vstart.sh"), "-n", "-d",
946 args
.append("--memstore")
948 remote
.run(args
, env
=vstart_env
)
950 # Wait for OSD to come up so that subsequent injectargs etc will
952 LocalCephCluster(LocalContext()).mon_manager
.wait_for_all_osds_up(timeout
=30)
    if create_cluster_only:
        sys.exit(0)

    # List of client mounts, sufficient to run the selected tests
    clients = [i.__str__() for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)

970 for client_id
in clients
:
971 # Populate client keyring (it sucks to use client.admin for test clients
972 # because it's awkward to find the logs later)
973 client_name
= "client.{0}".format(client_id
)
975 if client_name
not in open("./keyring").read():
976 p
= remote
.run(args
=[os
.path
.join(BIN_PREFIX
, "ceph"), "auth", "get-or-create", client_name
,
981 open("./keyring", "a").write(p
.stdout
.getvalue())
983 mount
= LocalFuseMount(ctx
, test_dir
, client_id
)
985 if mount
.is_mounted():
986 log
.warn("unmounting {0}".format(mount
.mountpoint
))
989 if os
.path
.exists(mount
.mountpoint
):
990 os
.rmdir(mount
.mountpoint
)
992 from tasks
.cephfs_test_runner
import DecoratingLoader
    class LogStream(object):
        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    # sys.stderr.write(line + "\n")
                    log.info(line)
                self.buffer = lines[-1]

        def flush(self):
            pass

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "ceph_cluster": ceph_cluster,
        "mds_cluster": mds_cluster,
        "mgr_cluster": mgr_cluster,
    })

    # For the benefit of polling tests like test_full -- in teuthology land we set this
    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
    ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")

    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
    # from normal IO latency.  Increase it for running tests.
    ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")

    # Make sure the filesystem created in tests has uid/gid that will let us talk to
    # it after mounting it (without having to go root).  Set in 'global' not just 'mds'
    # so that cephfs-data-scan will pick it up too.
    ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
    ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())

    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation
        # *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version

    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running,
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
            drop_test = True
            log.warn("Dropping test because long running: {0}".format(method.id()))

        if getattr(fn, "needs_trimming", False) is True:
            drop_test = (os.getuid() != 0)
            log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = False
            for named in modules:
                if named.endswith(method.id()):
                    is_named = True
                    break

            if not is_named:
                victims.append((case, method))

    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    if interactive_on_error:
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult

    fail_on_skip = False

    class LoggingResult(result_class):
        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            test.started_at = datetime.datetime.utcnow()
            return super(LoggingResult, self).startTest(test)

        def stopTest(self, test):
            log.info("Stopped test: {0} in {1}s".format(
                self.getDescription(test),
                (datetime.datetime.utcnow() - test.started_at).total_seconds()
            ))

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    # Execute!
    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience

        bad_tests = []
        for test, error in result.errors:
            bad_tests.append(str(test))
        for test, failure in result.failures:
            bad_tests.append(str(test))

        sys.exit(-1)
    else:
        sys.exit(0)

if __name__ == "__main__":
    exec_test()