vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
quickly during development.

Simple usage (assuming teuthology and ceph checked out in ~/git):

    # Activate the teuthology virtualenv
    source ~/git/teuthology/virtualenv/bin/activate
    # Go into your ceph build directory
    # Invoke a test using this script
    python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan

    # Alternatively, if you use different paths, specify them as follows:
    LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py

    # If you wish to drop to a python shell on failures, use --interactive:
    python ~/git/ceph/qa/tasks/vstart_runner.py --interactive

    # If you wish to run a named test case, pass it as an argument:
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan

    # Also, you can create the cluster once and then run named test cases against it:
    python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
    python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
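
    # A couple of other flags are parsed further down in exec_test() (not an
    # exhaustive list; see the option handling there), for example:
    python ~/git/ceph/qa/tasks/vstart_runner.py --kclient tasks.cephfs.test_data_scan    # use the kernel client
    python ~/git/ceph/qa/tasks/vstart_runner.py --teardown tasks.cephfs.test_data_scan   # tear the cluster down afterwards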
from io import StringIO
from collections import defaultdict
import datetime
import errno
import getpass
import logging
import os
import platform
import re
import shutil
import signal
import sys
import tempfile
import threading
import time
import unittest

from IPy import IP
import urllib3

from unittest import suite, loader

from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.orchestra.remote import Remote
from teuthology.config import config as teuth_config
from teuthology.contextutil import safe_while
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra.run import CommandFailedError

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = logging.getLogger(__name__)

logpath = './vstart_runner.log'

handler = logging.FileHandler(logpath)
formatter = logging.Formatter(
    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)
def respawn_in_path(lib_path, python_paths):
    execv_cmd = ['python']
    if platform.system() == "Darwin":
        lib_path_var = "DYLD_LIBRARY_PATH"
    else:
        lib_path_var = "LD_LIBRARY_PATH"

    py_binary = os.environ.get("PYTHON", sys.executable)

    if lib_path_var in os.environ:
        if lib_path not in os.environ[lib_path_var]:
            os.environ[lib_path_var] += ':' + lib_path
            os.execvp(py_binary, execv_cmd + sys.argv)
    else:
        os.environ[lib_path_var] = lib_path
        os.execvp(py_binary, execv_cmd + sys.argv)

    for p in python_paths:
        sys.path.insert(0, p)
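
# A rough sketch of what respawn_in_path() amounts to on Linux (assumed example
# values, not taken from a real run):
#
#   LD_LIBRARY_PATH=$PWD/lib PYTHONPATH=~/git/teuthology:... python vstart_runner.py <original argv>
#
# i.e. it re-execs the same interpreter and argv once the library path is in
# place, then prepends the guessed python_paths to sys.path.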
# Let's use some sensible defaults
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
    # A list of candidate paths for each package we need
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.3"],
    ]

    python_paths = []

    # Up one level so that "tasks.foo.bar" imports work
    python_paths.append(os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
    ))

    for package_guesses in guesses:
        for g in package_guesses:
            g_exp = os.path.abspath(os.path.expanduser(g))
            if os.path.exists(g_exp):
                python_paths.append(g_exp)

    ld_path = os.path.join(os.getcwd(), "lib/")
    print("Using guessed paths {0} {1}".format(ld_path, python_paths))
    respawn_in_path(ld_path, python_paths)
try:
    from tasks.ceph_manager import CephManager
    from tasks.cephfs.fuse_mount import FuseMount
    from tasks.cephfs.kernel_mount import KernelMount
    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
    from tasks.cephfs.mount import CephFSMount
    from tasks.mgr.mgr_test_case import MgrCluster
    from teuthology.task import interactive
except ImportError:
    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
    raise
# Must import after teuthology because of gevent monkey patching
import subprocess

if os.path.exists("./CMakeCache.txt"):
    # Running in build dir of a cmake build
    BIN_PREFIX = "./bin/"
    SRC_PREFIX = "../src"
else:
    # Running in src/ of an autotools build
    BIN_PREFIX = "./"
    SRC_PREFIX = "./"
def rm_nonascii_chars(var):
    var = var.replace(b'\xe2\x80\x98', b'\'')
    var = var.replace(b'\xe2\x80\x99', b'\'')
    return var
class LocalRemoteProcess(object):
    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        self.stdout = stdout
        self.stderr = stderr
        # this variable is meant for the instance of this class named fuse_daemon.
        # child process of the command launched with sudo must be killed,
        # since killing parent process alone has no impact on the child
        # process.
        self.fuse_pid = -1

        self.check_status = check_status
        self.exitstatus = self.returncode = None

    def wait(self):
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.check_status and self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
        if isinstance(self.stdout, StringIO):
            self.stdout.write(out.decode(errors='ignore'))
        elif self.stdout is None:
            pass
        else:
            self.stdout.write(out)
        if isinstance(self.stderr, StringIO):
            self.stderr.write(err.decode(errors='ignore'))
        elif self.stderr is None:
            pass
        else:
            self.stderr.write(err)

        self.exitstatus = self.returncode = self.subproc.returncode

        if self.exitstatus != 0:
            sys.stderr.write(out.decode())
            sys.stderr.write(err.decode())

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            out, err = self.subproc.communicate()
            if isinstance(self.stdout, StringIO):
                self.stdout.write(out.decode(errors='ignore'))
            elif self.stdout is None:
                pass
            else:
                self.stdout.write(out)
            if isinstance(self.stderr, StringIO):
                self.stderr.write(err.decode(errors='ignore'))
            elif self.stderr is None:
                pass
            else:
                self.stderr.write(err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True

        return False

    def kill(self):
        if self.subproc.pid and not self.finished:
            log.debug("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            if self.fuse_pid != -1:
                safe_kill(self.fuse_pid)
            else:
                safe_kill(self.subproc.pid)
        else:
            log.debug("kill: already terminated ({0})".format(self.args))
    @property
    def stdin(self):
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)
class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart.

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.name = "local"
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
        shutil.copy(path, tmpfile)
        return tmpfile
    # XXX: This method ignores the error raised when src and dst are
    # holding the same path. For teuthology, the same path still represents
    # different locations as they lie on different machines.
    def put_file(self, src, dst, sudo=False):
        try:
            shutil.copy(src, dst)
        except shutil.SameFileError:
            pass

    # XXX: accepts only two arguments to maintain compatibility with
    # teuthology's mkdtemp.
    def mkdtemp(self, suffix='', parentdir=None):
        from tempfile import mkdtemp

        # XXX: prefix had to be set explicitly; without that this method
        # failed against python2.7's tempfile --
        # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
        # -> file = _os.path.join(dir, prefix + name + suffix)
        return mkdtemp(suffix=suffix, prefix='', dir=parentdir)

    def mktemp(self, suffix=None, parentdir=None):
        """
        Make a remote temporary file.

        Returns: the path of the temp file created.
        """
        from tempfile import mktemp
        return mktemp(suffix=suffix, dir=parentdir)

    def write_file(self, path, data, sudo=False, mode=None, owner=None,
                   mkdir=False, append=False):
        """
        Write data to a file.

        :param path: file path on host
        :param data: str, binary or fileobj to be written
        :param sudo: use sudo to write file, defaults False
        :param mode: set file mode bits if provided
        :param owner: set file owner if provided
        :param mkdir: preliminarily create the file's directory, defaults False
        :param append: append data to the file, defaults False
        """
        dd = 'sudo dd' if sudo else 'dd'
        args = dd + ' of=' + path
        if append:
            args += ' conv=notrunc oflag=append'
        if mkdir:
            mkdirp = 'sudo mkdir -p' if sudo else 'mkdir -p'
            dirpath = os.path.dirname(path)
            if dirpath:
                args = mkdirp + ' ' + dirpath + '\n' + args
        if mode:
            chmod = 'sudo chmod' if sudo else 'chmod'
            args += '\n' + chmod + ' ' + mode + ' ' + path
        if owner:
            chown = 'sudo chown' if sudo else 'chown'
            args += '\n' + chown + ' ' + owner + ' ' + path
        omit_sudo = False if sudo else True
        self.run(args=args, stdin=data, omit_sudo=omit_sudo)

    def sudo_write_file(self, path, data, **kwargs):
        """
        Write data to a file with sudo; for more info see `write_file()`.
        """
        self.write_file(path, data, sudo=True, **kwargs)
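
    # A minimal usage sketch (hypothetical path and contents) just to show the
    # dd/chmod pipeline that write_file() builds above:
    #
    #   remote = LocalRemote()
    #   remote.write_file('./out/sample.conf', 'key = value\n', mode='0644')
    #   # runs roughly: dd of=./out/sample.conf   (data fed via stdin)
    #   #               chmod 0644 ./out/sample.conf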
    def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
        # Since Python's shell simulation can only work when commands are
        # provided as a list of arguments...
        if isinstance(args, str):
            args = args.split()

        # We'll let sudo be a part of the command even if the omit flag says
        # otherwise, for commands which can normally be run only by root.
        try:
            if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
                omit_sudo = False
        except ValueError:
            pass

        # Quotes wrapping a command argument don't work fine in Python's shell
        # simulation if the argument contains spaces too. E.g. '"ls"' is OK
        # but '"ls /"' is not.
        errmsg = "Don't surround command arguments with quotes if they " + \
                 "contain spaces.\nargs - %s" % (args)
        for arg in args:
            if isinstance(arg, Raw):
                continue

            if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
               (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
                raise RuntimeError(errmsg)

        # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
        # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
        # treated differently by Python's shell simulation. Only the latter has
        # the desired effect.
        errmsg = 'The entire command to be executed as another user should be a ' +\
                 'single argument.\nargs - %s' % (args)
        if 'sudo' in args and '-u' in args and '-c' in args and \
           args.count('-c') == 1:
            if args.index('-c') != len(args) - 2 and \
               args[args.index('-c') + 2].find('-') == -1:
                raise RuntimeError(errmsg)

        if omit_sudo:
            args = [a for a in args if a != "sudo"]

        return args

    # Wrapper to keep the interface exactly the same as that of
    # teuthology.remote.run.
    def run(self, **kwargs):
        return self._do_run(**kwargs)
    # XXX: omit_sudo is set to True since using sudo can change the ownership
    # of files which becomes problematic for following executions of
    # vstart_runner.py.
    def _do_run(self, args, check_status=True, wait=True, stdout=None,
                stderr=None, cwd=None, stdin=None, logger=None, label=None,
                env=None, timeout=None, omit_sudo=True):
        args = self._perform_checks_and_return_list_of_args(args, omit_sudo)

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any([a for a in args if isinstance(a, Raw)])

        # Filter out helper tools that don't exist in a vstart environment
        args = [a for a in args if a not in ('adjust-ulimits',
                                             'ceph-coverage')]

        # Adjust binary path prefix if given a bare program name
        if not isinstance(args[0], Raw) and "/" not in args[0]:
            # If they asked for a bare binary name, and it exists
            # in our built tree, use the one there.
            local_bin = os.path.join(BIN_PREFIX, args[0])
            if os.path.exists(local_bin):
                args = [local_bin] + args[1:]
            else:
                log.debug("'{0}' is not a binary in the Ceph build dir".format(
                    args[0]))

        log.debug(
            ' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))

        if shell:
            subproc = subprocess.Popen(quote(args),
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       shell=True)
        else:
            # Sanity check that we've got a list of strings
            for arg in args:
                if not isinstance(arg, str):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            if isinstance(stdin, str):
                subproc.stdin.write(stdin.encode())
            else:
                subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc
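
    # Illustrative only: because of the BIN_PREFIX rewriting above, a call like
    # run(args=['ceph', 'osd', 'dump']) ends up executing './bin/ceph osd dump'
    # in a cmake build tree (assuming ./bin/ceph exists), while anything whose
    # args[0] already contains a '/' is left untouched.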
    # XXX: for compatibility keep this method the same as teuthology.orchestra.remote.sh
    # BytesIO is being used just to keep things identical
    def sh(self, script, **kwargs):
        """
        Shortcut for the run method.

        Usage:
            my_name = remote.sh('whoami')
            remote_date = remote.sh('date')
        """
        from io import BytesIO

        if 'stdout' not in kwargs:
            kwargs['stdout'] = BytesIO()
        if 'args' not in kwargs:
            kwargs['args'] = script
        proc = self.run(**kwargs)
        out = proc.stdout.getvalue()
        if isinstance(out, bytes):
            return out.decode()
        else:
            return out
class LocalDaemon(object):
    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        self.proc = None

    def running(self):
        return self._get_pid() is not None

    def check_status(self):
        if self.proc:
            return self.proc.poll()

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = self.controller.run(args=["ps", "ww", "-u" + str(os.getuid())],
                                     stdout=StringIO()).\
            stdout.getvalue().strip()
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                log.debug("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])

        if opt_log_ps_output:
            log.debug("No match for {0} {1}: {2}".format(
                self.daemon_type, self.daemon_id, ps_txt))
        else:
            log.debug("No match for {0} {1}".format(self.daemon_type,
                                                    self.daemon_id))
        return None

    def wait(self, timeout):
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1
    def stop(self, timeout=300):
        if not self.running():
            log.error('tried to stop a non-running daemon')
            return

        pid = self._get_pid()
        log.debug("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGTERM)

        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                log.debug("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGTERM)

            if new_pid is None:
                break

            if waited > timeout:
                raise MaxWhileTries(
                    "Timed out waiting for daemon {0}.{1}".format(
                        self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

        self.wait(timeout=timeout)

    def restart(self):
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run(args=[
            os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
            "-i", self.daemon_id])
    def signal(self, sig, silent=False):
        if not self.running():
            raise RuntimeError("Can't send signal to non-running daemon")

        os.kill(self._get_pid(), sig)
        if not silent:
            log.debug("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))


def safe_kill(pid):
    """
    os.kill annoyingly raises an exception if the process is already dead.
    Ignore it.
    """
    try:
        return os.kill(pid, signal.SIGKILL)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # Raced with process termination
            pass
        else:
            raise
def mon_in_localhost(config_path="./ceph.conf"):
    """
    If the ceph cluster is using the localhost IP as mon host, we must disable
    ns unsharing.
    """
    with open(config_path) as f:
        for line in f:
            local = re.match(r'^\s*mon host\s*=\s*\[((v1|v2):127\.0\.0\.1:\d+,?)+\]', line)
            if local:
                return True
    return False
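
# For reference, the regex above is meant to match vstart-generated ceph.conf
# lines of this shape (ports are made up for illustration):
#
#   mon host = [v2:127.0.0.1:40674,v1:127.0.0.1:40675]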
class LocalKernelMount(KernelMount):
    def __init__(self, ctx, test_dir, client_id=None,
                 client_keyring_path=None, client_remote=None,
                 hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
                 brxnet=None):
        super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
            client_id=client_id, client_keyring_path=client_keyring_path,
            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        keyring_path = "./client.{0}.keyring".format(self.client_id)
        try:
            os.stat(keyring_path)
        except OSError:
            return os.path.join(os.getcwd(), 'keyring')
        else:
            return keyring_path

    def setupfs(self, name=None):
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = LocalFilesystem(self.ctx, name=name)
        log.debug('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.debug('Ready to start {}...'.format(type(self).__name__))

    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run in the foreground. When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process. Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.

        # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
        # in a tmpdir. All of the paths are the same, so no need to select
        # based off of the service type.
        d = "./out"
        with open(self.config_path) as f:
            for line in f:
                asok_conf = re.search(r"^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
                if asok_conf:
                    d = asok_conf.groups(1)[0]
                    break

        path = "{0}/client.{1}.*.asok".format(d, self.client_id)
        return path
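
    # For example, with "admin socket = /tmp/ceph-asok.abc123/$name.asok" in
    # ceph.conf (a made-up tmpdir), the captured directory is
    # "/tmp/ceph-asok.abc123/" and the glob built above becomes roughly
    # "/tmp/ceph-asok.abc123/client.0.*.asok".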
    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
        self.update_attrs(**kwargs)
        self.assert_and_log_minimum_mount_details()

        if opt_use_ns:
            self.using_namespace = True
            self.setup_netns()
        else:
            self.using_namespace = False

        if not self.cephfs_mntpt:
            self.cephfs_mntpt = "/"
        # TODO: don't call setupfs() from within mount()
        if createfs:
            self.setupfs(name=self.cephfs_name)

        opts = 'norequire_active_mds'
        if self.client_id:
            opts += ',name=' + self.client_id
        if self.client_keyring_path and self.client_id:
            opts += ",secret=" + self.get_key_from_keyfile()
        if self.config_path:
            opts += ',conf=' + self.config_path
        if self.cephfs_name:
            opts += ",mds_namespace={0}".format(self.cephfs_name)
        if mntopts:
            opts += ',' + ','.join(mntopts)

        stderr = StringIO()
        try:
            self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
                                   timeout=(5*60), stderr=stderr)
        except CommandFailedError:
            if 'file exists' not in stderr.getvalue().lower():
                raise

        if self.cephfs_mntpt is None:
            self.cephfs_mntpt = "/"
        cmdargs = ['sudo']
        if self.using_namespace:
            cmdargs += ['nsenter',
                        '--net=/var/run/netns/{0}'.format(self.netns_name)]
        cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
                    self.hostfs_mntpt, '-v', '-o', opts]

        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
        try:
            self.client_remote.run(args=cmdargs, timeout=(30*60),
                                   omit_sudo=False, stdout=mountcmd_stdout,
                                   stderr=mountcmd_stderr)
        except CommandFailedError as e:
            if check_status:
                raise
            else:
                return (e, mountcmd_stdout.getvalue(),
                        mountcmd_stderr.getvalue())

        stderr = StringIO()
        try:
            self.client_remote.run(args=['sudo', 'chmod', '1777',
                                         self.hostfs_mntpt], stderr=stderr,
                                   timeout=(5*60))
        except CommandFailedError:
            # the client does not have write permissions in the cap it holds for
            # the Ceph FS that was just mounted.
            if 'permission denied' in stderr.getvalue().lower():
                pass
            else:
                raise

        self.mounted = True

    def cleanup_netns(self):
        if self.using_namespace:
            super(type(self), self).cleanup_netns()

    def _run_python(self, pyscript, py_version='python'):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[py_version, '-c', pyscript],
                                      wait=False, stdout=StringIO())
class LocalFuseMount(FuseMount):
    def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
                 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
                 cephfs_mntpt=None, brxnet=None):
        super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
            test_dir=test_dir, client_id=client_id,
            client_keyring_path=client_keyring_path,
            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        return os.path.abspath("./client.{0}.keyring".format(self.client_id))

    def setupfs(self, name=None):
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = LocalFilesystem(self.ctx, name=name)
        log.debug('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.debug('Ready to start {}...'.format(type(self).__name__))
    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run in the foreground. When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process. Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.

        # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
        # in a tmpdir. All of the paths are the same, so no need to select
        # based off of the service type.
        d = "./out"
        with open(self.config_path) as f:
            for line in f:
                asok_conf = re.search(r"^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
                if asok_conf:
                    d = asok_conf.groups(1)[0]
                    break

        path = "{0}/client.{1}.*.asok".format(d, self.client_id)
        return path
    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
        self.update_attrs(**kwargs)
        self.assert_and_log_minimum_mount_details()

        if opt_use_ns:
            self.using_namespace = True
            self.setup_netns()
        else:
            self.using_namespace = False

        # TODO: don't call setupfs() from within mount()
        if createfs:
            self.setupfs(name=self.cephfs_name)

        stderr = StringIO()
        try:
            self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
                                   stderr=stderr)
        except CommandFailedError:
            if 'file exists' not in stderr.getvalue().lower():
                raise

        def list_connections():
            self.client_remote.run(
                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False)

            p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
                                       check_status=False, stdout=StringIO())
            if p.exitstatus != 0:
                log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.debug("Pre-mount connections: {0}".format(pre_mount_conns))

        cmdargs = []
        if self.using_namespace:
            cmdargs = ['sudo', 'nsenter',
                       '--net=/var/run/netns/{0}'.format(self.netns_name),
                       '--setuid', str(os.getuid())]
        cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
                    '-f']
        if self.client_id is not None:
            cmdargs += ["--id", self.client_id]
        if self.client_keyring_path and self.client_id is not None:
            cmdargs.extend(['-k', self.client_keyring_path])
        if self.cephfs_name:
            cmdargs += ["--client_fs=" + self.cephfs_name]
        if self.cephfs_mntpt:
            cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
        if os.getuid() != 0:
            cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
        if mntopts:
            cmdargs += mntopts

        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
        self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
            omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
        self._set_fuse_daemon_pid(check_status)
        log.debug("Mounting client.{0} with pid "
                  "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))

        # Wait for the connection reference to appear in /sys
        waited = 0
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                try:
                    self.fuse_daemon.wait()
                except CommandFailedError as e:
                    if check_status:
                        raise
                    else:
                        return (e, mountcmd_stdout.getvalue(),
                                mountcmd_stderr.getvalue())

            time.sleep(1)
            waited += 1
            if waited > 30:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited))
            post_mount_conns = list_connections()

        log.debug("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()

        self.mounted = True

    def _set_fuse_daemon_pid(self, check_status):
        # NOTE: When a command <args> is launched with sudo, two processes are
        # launched, one with sudo in <args> and the other without. Make sure we
        # get the PID of the latter one.
        try:
            with safe_while(sleep=1, tries=15) as proceed:
                while proceed():
                    try:
                        sock = self.find_admin_socket()
                    except (RuntimeError, CommandFailedError):
                        continue

                    self.fuse_daemon.fuse_pid = int(re.match(r".*\.(\d+)\.asok$",
                                                             sock).group(1))
                    break
        except MaxWhileTries:
            if check_status:
                raise

    def cleanup_netns(self):
        if self.using_namespace:
            super(type(self), self).cleanup_netns()

    def _run_python(self, pyscript, py_version='python'):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[py_version, '-c', pyscript],
                                      wait=False, stdout=StringIO())
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the Ceph cluster.
class LocalCephManager(CephManager):
    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.debug(x)

        # Don't bother constructing a map of pools: it should be empty
        # at test cluster start, and in any case it would be out of date
        # in no time. The attribute needs to exist for some of the CephManager
        # methods to work though.
        self.pools = {}

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'
        """
        return LocalRemote()

    def run_ceph_w(self, watch_channel=None):
        """
        :param watch_channel: Specifies the channel to be watched.
                              This can be 'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        proc = self.controller.run(args=args, wait=False, stdout=StringIO())
        return proc

    def run_cluster_cmd(self, **kwargs):
        """
        Run a Ceph command and return the object representing the process for
        the command.

        Accepts the same arguments as teuthology.orchestra.remote.run().
        """
        kwargs['args'] = [os.path.join(BIN_PREFIX, 'ceph')] + list(kwargs['args'])
        return self.controller.run(**kwargs)

    def raw_cluster_cmd(self, *args, **kwargs) -> str:
        """
        args like ["osd", "dump"]
        return stdout string
        """
        kwargs['args'] = args
        if kwargs.get('stdout') is None:
            kwargs['stdout'] = StringIO()
        return self.run_cluster_cmd(**kwargs).stdout.getvalue()

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        kwargs['args'], kwargs['check_status'] = args, False
        return self.run_cluster_cmd(**kwargs).exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
                     timeout=None, stdout=None):
        if stdout is None:
            stdout = StringIO()

        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
                  "{0}.{1}".format(daemon_type, daemon_id)] + command,
            check_status=check_status, timeout=timeout, stdout=stdout)

    def get_mon_socks(self):
        """
        Get monitor sockets.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv1_mon_socks(self):
        """
        Get monitor sockets that use msgrv1 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v1':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv2_mon_socks(self):
        """
        Get monitor sockets that use msgrv2 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v2':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)
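

# A short usage sketch (assuming a vstart cluster is running in the cwd):
#
#   mgr = LocalCephManager()
#   print(mgr.raw_cluster_cmd('osd', 'dump'))        # stdout of './bin/ceph osd dump'
#   rc = mgr.raw_cluster_cmd_result('osd', 'dump')   # exit status only
#   print(mgr.get_mon_socks())                       # e.g. ('127.0.0.1:40674', ...)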
class LocalCephCluster(CephCluster):
    def __init__(self, ctx):
        # Deliberately skip calling CephCluster constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        return LocalRemote()

    def get_config(self, key, service_type=None):
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        service_id = {
            'mon': 'a',
            'mds': 'a',
            'osd': '0',
        }[service_type]

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
        existing_str = open(conf_path).read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.debug("Searching for existing instance {0}/{1}".format(
                    key, subsys))
                existing_section = re.search(r"^\[{0}\]$([\n]|[^\[])+".format(
                    subsys), existing_str, re.MULTILINE)

                if existing_section:
                    section_str = existing_str[existing_section.start():existing_section.end()]
                    existing_val = re.search(r"^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.debug("Found string to replace at {0}".format(
                            start))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        open(conf_path, "w").write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        del self._conf[subsys][key]
        self._write_conf()
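
    # Illustrative only: after set_ceph_conf("mds", "mds log max segments", "10")
    # (a setting this runner really applies further down), _write_conf() appends
    # something like the following to ./ceph.conf, commenting out any earlier
    # value of the same key rather than removing it:
    #
    #   #LOCAL_TEST
    #   [mds]
    #   mds log max segments = 10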
class LocalMDSCluster(LocalCephCluster, MDSCluster):
    def __init__(self, ctx):
        LocalCephCluster.__init__(self, ctx)
        # Deliberately skip calling MDSCluster constructor
        self._mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
        log.debug("Discovered MDS IDs: {0}".format(self._mds_ids))
        self._mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

    @property
    def mds_ids(self):
        return self._mds_ids

    @property
    def mds_daemons(self):
        return self._mds_daemons

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name='cephfs', create=True):
        return LocalFilesystem(self._ctx, name=name, create=create)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            LocalFilesystem(ctx=self._ctx, fscid=fs['id']).destroy()
class LocalMgrCluster(LocalCephCluster, MgrCluster):
    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
class LocalFilesystem(LocalMDSCluster, Filesystem):
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        # Deliberately skip calling Filesystem constructor
        LocalMDSCluster.__init__(self, ctx)

        self.id = None
        self.name = name
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        self.mon_manager = LocalCephManager()

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # debug it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def set_clients_block(self, blocked, mds_id=None):
        raise NotImplementedError()
class LocalCluster(object):
    def __init__(self, rolename="placeholder"):
        self.remotes = {
            LocalRemote(): [rolename]
        }

    def only(self, requested):
        return self.__class__(rolename=requested)


class LocalContext(object):
    def __init__(self):
        self.config = {}
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()
        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        for conf_line in open("ceph.conf").readlines():
            for svc_type in ["mon", "osd", "mds", "mgr"]:
                prefixed_type = "ceph." + svc_type
                if prefixed_type not in self.daemons.daemons:
                    self.daemons.daemons[prefixed_type] = {}
                match = re.match(r"^\[{0}\.(.+)\]$".format(svc_type), conf_line)
                if match:
                    svc_id = match.group(1)
                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)

    def __del__(self):
        test_path = self.teuthology_config['test_path']
        # opt_create_cluster_only does not create the test path
        if test_path:
            shutil.rmtree(test_path)
#########################################
#
# stuff necessary for launching tests...
#
#########################################
def enumerate_methods(s):
    log.debug("e: {0}".format(s))
    for t in s._tests:
        if isinstance(t, suite.BaseTestSuite):
            for sub in enumerate_methods(t):
                yield sub
        else:
            yield s, t


def load_tests(modules, loader):
    if modules:
        log.debug("Executing modules: {0}".format(modules))
        module_suites = []
        for mod_name in modules:
            # Test names like cephfs.test_auto_repair
            module_suites.append(loader.loadTestsFromName(mod_name))
        log.debug("Loaded: {0}".format(list(module_suites)))
        return suite.TestSuite(module_suites)
    else:
        log.debug("Executing all cephfs tests")
        return loader.discover(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
        )
def scan_tests(modules):
    overall_suite = load_tests(modules, loader.TestLoader())

    max_required_mds = 0
    max_required_clients = 0
    max_required_mgr = 0
    require_memstore = False

    for suite_, case in enumerate_methods(overall_suite):
        max_required_mds = max(max_required_mds,
                               getattr(case, "MDSS_REQUIRED", 0))
        max_required_clients = max(max_required_clients,
                                   getattr(case, "CLIENTS_REQUIRED", 0))
        max_required_mgr = max(max_required_mgr,
                               getattr(case, "MGRS_REQUIRED", 0))
        require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
            or require_memstore

    return max_required_mds, max_required_clients, \
        max_required_mgr, require_memstore
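

# scan_tests() relies on class attributes that CephFSTestCase subclasses already
# declare; a purely hypothetical test case would advertise its needs like this:
#
#   class TestExample(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 1
#       REQUIRE_MEMSTORE = False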
class LogRotate(object):
    def __init__(self):
        self.conf_file_path = os.path.join(os.getcwd(), 'logrotate.conf')
        self.state_file_path = os.path.join(os.getcwd(), 'logrotate.state')

    def run_logrotate(self):
        remote.run(args=['logrotate', '-f', self.conf_file_path, '-s',
                         self.state_file_path, '--verbose'])
def teardown_cluster():
    log.info('\ntearing down the cluster...')
    remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
    remote.run(args=['rm', '-rf', './dev', './out'])
def clear_old_log():
    try:
        os.stat(logpath)
    # would need an update when making this py3 compatible: use FileNotFoundError
    # instead.
    except OSError:
        return
    else:
        os.remove(logpath)
        with open(logpath, 'w') as logfile:
            pass
        log.debug('logging in a fresh file now...')
class LogStream(object):
    def __init__(self):
        self.buffer = ""
        self.omit_result_lines = False

    def _del_result_lines(self):
        """
        Don't let unittest.TextTestRunner print "Ran X tests in Ys",
        vstart_runner.py will do it for itself since it runs tests in a
        testsuite one by one.
        """
        if self.omit_result_lines:
            self.buffer = re.sub('-'*70+'\nran [0-9]* test in [0-9.]*s\n*',
                                 '', self.buffer, flags=re.I)
        self.buffer = re.sub(r'failed \(failures=[0-9]*\)\n', '', self.buffer,
                             flags=re.I)
        self.buffer = self.buffer.replace('OK\n', '')

    def write(self, data):
        self.buffer += data
        if self.buffer.count("\n") > 5:
            self.flush()

    def flush(self):
        self._del_result_lines()
        if self.buffer == '':
            return

        lines = self.buffer.split("\n")
        for line in lines:
            # sys.stderr.write(line + "\n")
            log.info(line)
        self.buffer = ''
class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behavior.
    """
    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Failure in test '{0}', going interactive".format(
            self.getDescription(test)))
        interactive.task(ctx=None, config=None)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Error in test '{0}', going interactive".format(
            self.getDescription(test)))
        interactive.task(ctx=None, config=None)
# XXX: the class we require would be inherited from this one and one of
# InteractiveFailureResult and unittest.TextTestResult.
class LoggingResultTemplate(object):
    fail_on_skip = False

    def startTest(self, test):
        log.info("Starting test: {0}".format(self.getDescription(test)))
        test.started_at = datetime.datetime.utcnow()
        return super(LoggingResultTemplate, self).startTest(test)

    def stopTest(self, test):
        log.info("Stopped test: {0} in {1}s".format(
            self.getDescription(test),
            (datetime.datetime.utcnow() - test.started_at).total_seconds()
        ))

    def addSkip(self, test, reason):
        if LoggingResultTemplate.fail_on_skip:
            # Don't just call addFailure because that requires a traceback
            self.failures.append((test, reason))
        else:
            super(LoggingResultTemplate, self).addSkip(test, reason)
def launch_tests(overall_suite):
    if opt_rotate_logs or not opt_exit_on_test_failure:
        return launch_individually(overall_suite)
    else:
        return launch_entire_suite(overall_suite)


def get_logging_result_class():
    result_class = InteractiveFailureResult if opt_interactive_on_error else \
        unittest.TextTestResult
    return type('', (LoggingResultTemplate, result_class), {})
def launch_individually(overall_suite):
    no_of_tests_failed, no_of_tests_execed = 0, 0
    LoggingResult = get_logging_result_class()
    stream = LogStream()
    stream.omit_result_lines = True
    if opt_rotate_logs:
        logrotate = LogRotate()

    started_at = datetime.datetime.utcnow()
    for suite_, case in enumerate_methods(overall_suite):
        # don't run logrotate beforehand since some ceph daemons might be
        # down and pre/post-rotate scripts in logrotate.conf might fail.
        if opt_rotate_logs:
            logrotate.run_logrotate()

        result = unittest.TextTestRunner(stream=stream,
                                         resultclass=LoggingResult,
                                         verbosity=2, failfast=True).run(case)

        if not result.wasSuccessful():
            if opt_exit_on_test_failure:
                break
            else:
                no_of_tests_failed += 1

        no_of_tests_execed += 1
    time_elapsed = (datetime.datetime.utcnow() - started_at).total_seconds()

    if result.wasSuccessful():
        log.info('')
        log.info('-' * 70)
    log.info(f'Ran {no_of_tests_execed} tests in {time_elapsed}s')
    if no_of_tests_failed > 0:
        log.info(f'{no_of_tests_failed} tests failed')

    return result
def launch_entire_suite(overall_suite):
    LoggingResult = get_logging_result_class()

    testrunner = unittest.TextTestRunner(stream=LogStream(),
                                         resultclass=LoggingResult,
                                         verbosity=2, failfast=True)
    return testrunner.run(overall_suite)
def exec_test():
    # Parse arguments
    global opt_interactive_on_error
    opt_interactive_on_error = False
    opt_create_cluster = False
    opt_create_cluster_only = False
    opt_ignore_missing_binaries = False
    opt_teardown_cluster = False
    global opt_log_ps_output
    opt_log_ps_output = False
    use_kernel_client = False
    global opt_use_ns
    opt_use_ns = False
    opt_brxnet = None
    opt_verbose = True
    global opt_rotate_logs
    opt_rotate_logs = False
    global opt_exit_on_test_failure
    opt_exit_on_test_failure = True

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            opt_interactive_on_error = True
        elif f == "--create":
            opt_create_cluster = True
        elif f == "--create-cluster-only":
            opt_create_cluster_only = True
        elif f == "--ignore-missing-binaries":
            opt_ignore_missing_binaries = True
        elif f == '--teardown':
            opt_teardown_cluster = True
        elif f == '--log-ps-output':
            opt_log_ps_output = True
        elif f == '--clear-old-log':
            clear_old_log()
        elif f == "--kclient":
            use_kernel_client = True
        elif f == '--usens':
            opt_use_ns = True
        elif '--brxnet' in f:
            if re.search(r'=[0-9./]+', f) is None:
                log.error("--brxnet=<ip/mask> option needs one argument: '{0}'".format(f))
                sys.exit(-1)
            opt_brxnet = f.split('=')[1]
            try:
                if IP(opt_brxnet).iptype() == 'PUBLIC':
                    raise RuntimeError('is public')
            except Exception as e:
                log.error("Invalid ip '{0}' {1}".format(opt_brxnet, e))
                sys.exit(-1)
        elif '--no-verbose' == f:
            opt_verbose = False
        elif f == '--rotate-logs':
            opt_rotate_logs = True
        elif f == '--run-all-tests':
            opt_exit_on_test_failure = False
        elif f == '--debug':
            log.setLevel(logging.DEBUG)
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)
    # Help developers by stopping up-front if their tree isn't built enough for all the
    # tools that the tests might want to use (add more here if needed)
    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                        "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
    if missing_binaries and not opt_ignore_missing_binaries:
        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
        sys.exit(-1)
    max_required_mds, max_required_clients, \
            max_required_mgr, require_memstore = scan_tests(modules)

    global remote
    remote = LocalRemote()

    CephFSMount.cleanup_stale_netnses_and_bridge(remote)

    # Tolerate no MDSs or clients running at start
    ps_txt = remote.run(args=["ps", "-u" + str(os.getuid())],
                        stdout=StringIO()).stdout.getvalue().strip()
    lines = ps_txt.split("\n")[1:]
    for line in lines:
        if 'ceph-fuse' in line or 'ceph-mds' in line:
            pid = int(line.split()[0])
            log.warning("Killing stray process {0}".format(line))
            os.kill(pid, signal.SIGKILL)
    # Fire up the Ceph cluster if the user requested it
    if opt_create_cluster or opt_create_cluster_only:
        log.info("Creating cluster with {0} MDS daemons".format(
            max_required_mds))
        teardown_cluster()
        vstart_env = os.environ.copy()
        vstart_env["FS"] = "0"
        vstart_env["MDS"] = max_required_mds.__str__()
        vstart_env["OSD"] = "4"
        vstart_env["MGR"] = max(max_required_mgr, 1).__str__()

        args = [
            os.path.join(SRC_PREFIX, "vstart.sh"),
            "-n",
        ]
        if require_memstore:
            args.append("--memstore")
        if opt_verbose:
            args.append("-d")

        # usually, vstart.sh runs to completion in well under 100 seconds, so a
        # 3 minute timeout leaves plenty of headroom
        remote.run(args=args, env=vstart_env, timeout=(3 * 60))

        # Wait for OSD to come up so that subsequent injectargs etc will
        # definitely succeed
        LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)

    if opt_create_cluster_only:
        sys.exit(0)

    if opt_use_ns and mon_in_localhost() and not opt_create_cluster:
        raise RuntimeError("the cluster uses localhost as mon host, which is incompatible with '--usens'; pass '--create' as well to create a new cluster that avoids localhost")
    # List of client mounts, sufficient to run the selected tests
    clients = [i.__str__() for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)
    # Construct Mount classes
    mounts = []
    for client_id in clients:
        # Populate client keyring (it sucks to use client.admin for test clients
        # because it's awkward to find the logs later)
        client_name = "client.{0}".format(client_id)

        if client_name not in open("./keyring").read():
            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                 "osd", "allow rw",
                                 "mds", "allow",
                                 "mon", "allow r"], stdout=StringIO())

            open("./keyring", "at").write(p.stdout.getvalue())

        if use_kernel_client:
            mount = LocalKernelMount(ctx=ctx, test_dir=test_dir,
                                     client_id=client_id, brxnet=opt_brxnet)
        else:
            mount = LocalFuseMount(ctx=ctx, test_dir=test_dir,
                                   client_id=client_id, brxnet=opt_brxnet)

        mounts.append(mount)
        if os.path.exists(mount.hostfs_mntpt):
            if mount.is_mounted():
                log.warning("unmounting {0}".format(mount.hostfs_mntpt))
                mount.umount_wait()
            else:
                os.rmdir(mount.hostfs_mntpt)
.cephfs_test_runner
import DecoratingLoader
1675 decorating_loader
= DecoratingLoader({
1678 "ceph_cluster": ceph_cluster
,
1679 "mds_cluster": mds_cluster
,
1680 "mgr_cluster": mgr_cluster
,
1683 # For the benefit of polling tests like test_full -- in teuthology land we set this
1684 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1685 remote
.run(args
=[os
.path
.join(BIN_PREFIX
, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1686 ceph_cluster
.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1688 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1689 # from normal IO latency. Increase it for running teests.
1690 ceph_cluster
.set_ceph_conf("mds", "mds log max segments", "10")
1692 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1693 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1694 # so that cephfs-data-scan will pick it up too.
1695 ceph_cluster
.set_ceph_conf("global", "mds root ino uid", "%s" % os
.getuid())
1696 ceph_cluster
.set_ceph_conf("global", "mds root ino gid", "%s" % os
.getgid())
    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version
    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running,
    # e.g. because they are long-running or need to be run as root.
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
            drop_test = True
            log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))

        if getattr(fn, "needs_trimming", False) is True:
            drop_test = (os.getuid() != 0)
            log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = False
            for named in modules:
                if named.endswith(method.id()):
                    is_named = True
                    break

            if not is_named:
                victims.append((case, method))

    log.debug("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    overall_suite = load_tests(modules, loader.TestLoader())
    result = launch_tests(overall_suite)
    CephFSMount.cleanup_stale_netnses_and_bridge(remote)
    if opt_teardown_cluster:
        teardown_cluster()

    if not result.wasSuccessful():
        # no point in duplicating the errors if we can have multiple failures
        # in the same test run
        if opt_exit_on_test_failure:
            result.printErrors()  # duplicate output at end for convenience

        bad_tests = []
        for test, error in result.errors:
            bad_tests.append(str(test))
        for test, failure in result.failures:
            bad_tests.append(str(test))

        sys.exit(-1)
if __name__ == "__main__":
    exec_test()