1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 # Also, you can create the cluster once and then run named test cases against it:
27 python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
28 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
29 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
30
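# Additional flags handled by exec_test() below: --kclient uses kernel client
# mounts instead of ceph-fuse, and --teardown stops the vstart cluster when the
# run finishes. For example (an illustrative combination, not a required one):
python ~/git/ceph/qa/tasks/vstart_runner.py --create --kclient --teardown tasks.cephfs.test_data_scan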
31 """
32
33 from six import StringIO
34 from io import BytesIO
35 from collections import defaultdict
36 import getpass
37 import signal
38 import tempfile
39 import threading
40 import datetime
41 import shutil
42 import re
43 import os
44 import time
45 import sys
46 import errno
47 from unittest import suite, loader
48 import unittest
49 import platform
50 from teuthology import misc
51 from teuthology.orchestra.run import Raw, quote
52 from teuthology.orchestra.daemon import DaemonGroup
53 from teuthology.config import config as teuth_config
54 import six
55 import logging
56
57 def init_log():
58 global log
59 if log is not None:
60 del log
61 log = logging.getLogger(__name__)
62
63 global logpath
64 logpath = './vstart_runner.log'
65
66 handler = logging.FileHandler(logpath)
67 formatter = logging.Formatter(
68 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
69 datefmt='%Y-%m-%dT%H:%M:%S')
70 handler.setFormatter(formatter)
71 log.addHandler(handler)
72 log.setLevel(logging.INFO)
73
74 log = None
75 init_log()
76
77
78 def respawn_in_path(lib_path, python_paths):
79 execv_cmd = ['python']
80 if platform.system() == "Darwin":
81 lib_path_var = "DYLD_LIBRARY_PATH"
82 else:
83 lib_path_var = "LD_LIBRARY_PATH"
84
85 py_binary = os.environ.get("PYTHON", sys.executable)
86
87 if lib_path_var in os.environ:
88 if lib_path not in os.environ[lib_path_var]:
89 os.environ[lib_path_var] += ':' + lib_path
90 os.execvp(py_binary, execv_cmd + sys.argv)
91 else:
92 os.environ[lib_path_var] = lib_path
93 os.execvp(py_binary, execv_cmd + sys.argv)
94
95 for p in python_paths:
96 sys.path.insert(0, p)
97
98
99 # Let's use some sensible defaults
100 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
101
102 # A list of candidate paths for each package we need
103 guesses = [
104 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
105 ["lib/cython_modules/lib.3"],
106 ["../src/pybind"],
107 ]
108
109 python_paths = []
110
111 # Up one level so that "tasks.foo.bar" imports work
112 python_paths.append(os.path.abspath(
113 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
114 ))
115
116 for package_guesses in guesses:
117 for g in package_guesses:
118 g_exp = os.path.abspath(os.path.expanduser(g))
119 if os.path.exists(g_exp):
120 python_paths.append(g_exp)
121
122 ld_path = os.path.join(os.getcwd(), "lib/")
123 print("Using guessed paths {0} {1}".format(ld_path, python_paths))
124 respawn_in_path(ld_path, python_paths)
125
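# Rough sketch of the effect of the block above when invoked from a cmake build
# dir (the paths shown are assumptions, not captured output):
#
#   $ cd ~/git/ceph/build && python ../qa/tasks/vstart_runner.py --create <module>
#   Using guessed paths /home/user/git/ceph/build/lib ['/home/user/git/ceph/qa', ...]
#
# i.e. LD_LIBRARY_PATH is pointed at ./lib and the process re-execs itself via
# respawn_in_path() so the freshly built libraries and bindings are picked up.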
126
127 try:
128 from teuthology.exceptions import CommandFailedError
129 from tasks.ceph_manager import CephManager
130 from tasks.cephfs.fuse_mount import FuseMount
131 from tasks.cephfs.kernel_mount import KernelMount
132 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
133 from mgr.mgr_test_case import MgrCluster
134 from teuthology.contextutil import MaxWhileTries
135 from teuthology.task import interactive
136 except ImportError:
137 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
138 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
139 raise
140
141 # Must import after teuthology because of gevent monkey patching
142 import subprocess
143
144 if os.path.exists("./CMakeCache.txt"):
145 # Running in build dir of a cmake build
146 BIN_PREFIX = "./bin/"
147 SRC_PREFIX = "../src"
148 else:
149 # Running in src/ of an autotools build
150 BIN_PREFIX = "./"
151 SRC_PREFIX = "./"
152
153
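# The escape sequences below are the UTF-8 encodings of the left and right
# single quotation marks (U+2018/U+2019) that some tools emit in error text;
# they are folded back to plain ASCII apostrophes before output is logged.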
154 def rm_nonascii_chars(var):
155 var = var.replace('\xe2\x80\x98', '\'')
156 var = var.replace('\xe2\x80\x99', '\'')
157 return var
158
159 class LocalRemoteProcess(object):
160 def __init__(self, args, subproc, check_status, stdout, stderr):
161 self.args = args
162 self.subproc = subproc
163 self.stdout = stdout or BytesIO()
164 self.stderr = stderr or BytesIO()
165
166 self.check_status = check_status
167 self.exitstatus = self.returncode = None
168
169 def wait(self):
170 if self.finished:
171 # Avoid calling communicate() on a dead process because it'll
172 # give you stick about std* already being closed
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175 else:
176 return
177
178 out, err = self.subproc.communicate()
179 out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
180 self.stdout.write(out)
181 self.stderr.write(err)
182
183 self.exitstatus = self.returncode = self.subproc.returncode
184
185 if self.exitstatus != 0:
186 sys.stderr.write(six.ensure_str(out))
187 sys.stderr.write(six.ensure_str(err))
188
189 if self.check_status and self.exitstatus != 0:
190 raise CommandFailedError(self.args, self.exitstatus)
191
192 @property
193 def finished(self):
194 if self.exitstatus is not None:
195 return True
196
197 if self.subproc.poll() is not None:
198 out, err = self.subproc.communicate()
199 self.stdout.write(out)
200 self.stderr.write(err)
201 self.exitstatus = self.returncode = self.subproc.returncode
202 return True
203 else:
204 return False
205
206 def kill(self):
207 log.info("kill ")
208 if self.subproc.pid and not self.finished:
209 log.info("kill: killing pid {0} ({1})".format(
210 self.subproc.pid, self.args))
211 safe_kill(self.subproc.pid)
212 else:
213 log.info("kill: already terminated ({0})".format(self.args))
214
215 @property
216 def stdin(self):
217 class FakeStdIn(object):
218 def __init__(self, mount_daemon):
219 self.mount_daemon = mount_daemon
220
221 def close(self):
222 self.mount_daemon.kill()
223
224 return FakeStdIn(self)
225
226
227 class LocalRemote(object):
228 """
229 Amusingly named class to present the teuthology RemoteProcess interface when we are really
230 running things locally for vstart
231
232 Run this inside your src/ dir!
233 """
234
235 def __init__(self):
236 self.name = "local"
237 self.hostname = "localhost"
238 self.user = getpass.getuser()
239
240 def get_file(self, path, sudo, dest_dir):
241 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
242 shutil.copy(path, tmpfile)
243 return tmpfile
244
245 # XXX: This method ignores the error raised when src and dst are
246     # the same path. For teuthology, the same path still represents
247     # different locations, as they lie on different machines.
248 def put_file(self, src, dst, sudo=False):
249 if sys.version_info.major < 3:
250 exception = shutil.Error
251 elif sys.version_info.major >= 3:
252 exception = shutil.SameFileError
253
254 try:
255 shutil.copy(src, dst)
256 except exception as e:
257 if sys.version_info.major < 3 and e.message.find('are the same '
258 'file') != -1:
259 return
260 raise e
261
262     # XXX: accepts only two arguments to maintain compatibility with
263 # teuthology's mkdtemp.
264 def mkdtemp(self, suffix='', parentdir=None):
265 from tempfile import mkdtemp
266
267         # XXX: prefix must be set explicitly; without it this method failed against
268         # Python 2.7 -
269 # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
270 # -> file = _os.path.join(dir, prefix + name + suffix)
271 # (Pdb) p prefix
272 # None
273 return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
274
275 def mktemp(self, suffix=None, parentdir=None):
276 """
277 Make a remote temporary file
278
279 Returns: the path of the temp file created.
280 """
281 from tempfile import mktemp
282 return mktemp(suffix=suffix, dir=parentdir)
283
284 def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
285         # Since Python's shell simulation can only work when commands are
286         # provided as a list of arguments...
287 if isinstance(args, str) or isinstance(args, six.text_type):
288 args = args.split()
289
290         # We'll leave sudo in the command, even if the omit flag says otherwise,
291         # for commands that can normally be run only by root.
292 try:
293 if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
294 omit_sudo = False
295 except ValueError:
296 pass
297
298         # Quotes wrapping a command argument don't work in Python's shell
299         # simulation if the argument also contains spaces. E.g. '"ls"' is OK,
300         # but '"ls /"' isn't.
301         errmsg = "Don't surround a command argument with quotes if it " + \
302                  "contains spaces.\nargs - %s" % (args)
303 for arg in args:
304 if isinstance(arg, Raw):
305 continue
306
307 if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
308 (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
309 raise RuntimeError(errmsg)
310
311 # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
312 # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
313         # treated differently by Python's shell simulation. Only the latter has
314         # the desired effect.
315         errmsg = 'The entire command to be executed as another user should be ' +\
316                  'a single argument.\nargs - %s' % (args)
317 if 'sudo' in args and '-u' in args and '-c' in args and \
318 args.count('-c') == 1:
319 if args.index('-c') != len(args) - 2 and \
320 args[args.index('-c') + 2].find('-') == -1:
321 raise RuntimeError(errmsg)
322
323 if omit_sudo:
324 args = [a for a in args if a != "sudo"]
325
326 return args
327
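    # A minimal sketch of how the checks above treat different argument forms
    # (the commands themselves are illustrative assumptions):
    #
    #   remote.run(args=['ls', '-l', '/tmp'])                     # plain argv list: fine
    #   remote.run(args='ceph osd dump')                          # a string is split on spaces
    #   remote.run(args=['sudo', '-u', 'admin', '-s', '/bin/bash',
    #                    '-c', 'ls /tmp'])                        # fine: whole command is one '-c' arg
    #   remote.run(args=['"ls /tmp"'])                            # RuntimeError: quoted arg containing a space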
328 # Wrapper to keep the interface exactly same as that of
329 # teuthology.remote.run.
330 def run(self, **kwargs):
331 return self._do_run(**kwargs)
332
333 def _do_run(self, args, check_status=True, wait=True, stdout=None,
334 stderr=None, cwd=None, stdin=None, logger=None, label=None,
335 env=None, timeout=None, omit_sudo=False):
336 args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
337
338 # We have to use shell=True if any run.Raw was present, e.g. &&
339 shell = any([a for a in args if isinstance(a, Raw)])
340
341 # Filter out helper tools that don't exist in a vstart environment
342 args = [a for a in args if a not in {
343 'adjust-ulimits', 'ceph-coverage'}]
344
345 # Adjust binary path prefix if given a bare program name
346 if "/" not in args[0]:
347 # If they asked for a bare binary name, and it exists
348 # in our built tree, use the one there.
349 local_bin = os.path.join(BIN_PREFIX, args[0])
350 if os.path.exists(local_bin):
351 args = [local_bin] + args[1:]
352 else:
353 log.debug("'{0}' is not a binary in the Ceph build dir".format(
354 args[0]
355 ))
356
357 log.info("Running {0}".format(args))
358
359 if shell:
360 subproc = subprocess.Popen(quote(args),
361 stdout=subprocess.PIPE,
362 stderr=subprocess.PIPE,
363 stdin=subprocess.PIPE,
364 cwd=cwd,
365 shell=True)
366 else:
367 # Sanity check that we've got a list of strings
368 for arg in args:
369 if not isinstance(arg, six.string_types):
370 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
371 arg, arg.__class__
372 ))
373
374 subproc = subprocess.Popen(args,
375 stdout=subprocess.PIPE,
376 stderr=subprocess.PIPE,
377 stdin=subprocess.PIPE,
378 cwd=cwd,
379 env=env)
380
381 if stdin:
382 if not isinstance(stdin, six.string_types):
383 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
384
385 # Hack: writing to stdin is not deadlock-safe, but it "always" works
386 # as long as the input buffer is "small"
387 subproc.stdin.write(stdin)
388
389 proc = LocalRemoteProcess(
390 args, subproc, check_status,
391 stdout, stderr
392 )
393
394 if wait:
395 proc.wait()
396
397 return proc
398
399 def sh(self, command, log_limit=1024, cwd=None, env=None):
400
401 return misc.sh(command=command, log_limit=log_limit, cwd=cwd,
402 env=env)
403
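# Minimal usage sketch for LocalRemote (assumes it is run from the build dir
# with a vstart cluster up; the command is only an example):
#
#   remote = LocalRemote()
#   proc = remote.run(args=['ceph', '--version'])   # 'ceph' resolves to ./bin/ceph
#   print(proc.stdout.getvalue())                   # stdout is a BytesIO by default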
404 class LocalDaemon(object):
405 def __init__(self, daemon_type, daemon_id):
406 self.daemon_type = daemon_type
407 self.daemon_id = daemon_id
408 self.controller = LocalRemote()
409 self.proc = None
410
411 @property
412 def remote(self):
413 return LocalRemote()
414
415 def running(self):
416 return self._get_pid() is not None
417
418 def check_status(self):
419 if self.proc:
420 return self.proc.poll()
421
422 def _get_pid(self):
423 """
424 Return PID as an integer or None if not found
425 """
426 ps_txt = six.ensure_str(self.controller.run(
427 args=["ps", "ww", "-u"+str(os.getuid())]
428 ).stdout.getvalue()).strip()
429 lines = ps_txt.split("\n")[1:]
430
431 for line in lines:
432 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
433 log.info("Found ps line for daemon: {0}".format(line))
434 return int(line.split()[0])
435 if opt_log_ps_output:
436 log.info("No match for {0} {1}: {2}".format(
437 self.daemon_type, self.daemon_id, ps_txt))
438 else:
439 log.info("No match for {0} {1}".format(self.daemon_type,
440 self.daemon_id))
441 return None
442
443 def wait(self, timeout):
444 waited = 0
445 while self._get_pid() is not None:
446 if waited > timeout:
447 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
448 time.sleep(1)
449 waited += 1
450
451 def stop(self, timeout=300):
452 if not self.running():
453 log.error('tried to stop a non-running daemon')
454 return
455
456 pid = self._get_pid()
457 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
458 os.kill(pid, signal.SIGTERM)
459
460 waited = 0
461 while pid is not None:
462 new_pid = self._get_pid()
463 if new_pid is not None and new_pid != pid:
464 log.info("Killing new PID {0}".format(new_pid))
465 pid = new_pid
466 os.kill(pid, signal.SIGTERM)
467
468 if new_pid is None:
469 break
470 else:
471 if waited > timeout:
472 raise MaxWhileTries(
473 "Timed out waiting for daemon {0}.{1}".format(
474 self.daemon_type, self.daemon_id))
475 time.sleep(1)
476 waited += 1
477
478 self.wait(timeout=timeout)
479
480 def restart(self):
481 if self._get_pid() is not None:
482 self.stop()
483
484 self.proc = self.controller.run(args=[
485 os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
486 "-i", self.daemon_id])
487
488 def signal(self, sig, silent=False):
489 if not self.running():
490 raise RuntimeError("Can't send signal to non-running daemon")
491
492 os.kill(self._get_pid(), sig)
493 if not silent:
494 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
495
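# Brief usage sketch for LocalDaemon (the daemon id is an assumption; vstart
# typically creates mds.a, mon.a, osd.0, ...):
#
#   mds_a = LocalDaemon('mds', 'a')
#   if mds_a.running():
#       mds_a.restart()                 # SIGTERM the old PID, then start a new one
#       mds_a.signal(signal.SIGHUP)     # deliver an arbitrary signal to the new PID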
496
497 def safe_kill(pid):
498 """
499 os.kill annoyingly raises exception if process already dead. Ignore it.
500 """
501 try:
502 return os.kill(pid, signal.SIGKILL)
503 except OSError as e:
504 if e.errno == errno.ESRCH:
505 # Raced with process termination
506 pass
507 else:
508 raise
509
510
511 class LocalKernelMount(KernelMount):
512 def __init__(self, ctx, test_dir, client_id):
513 super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
514
515 @property
516 def config_path(self):
517 return "./ceph.conf"
518
519 def get_keyring_path(self):
520 # This is going to end up in a config file, so use an absolute path
521 # to avoid assumptions about daemons' pwd
522 keyring_path = "./client.{0}.keyring".format(self.client_id)
523 try:
524 os.stat(keyring_path)
525 except OSError:
526 return os.path.join(os.getcwd(), 'keyring')
527 else:
528 return keyring_path
529
530 def run_shell(self, args, wait=True, stdin=None, check_status=True,
531 omit_sudo=False):
532 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
533 # the "cd foo && bar" shenanigans isn't needed to begin with and
534 # then we wouldn't have to special case this
535 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
536 stdin=stdin, check_status=check_status,
537 omit_sudo=omit_sudo)
538
539 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
540 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
541 # the "cd foo && bar" shenanigans isn't needed to begin with and
542 # then we wouldn't have to special case this
543 if isinstance(args, str):
544 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
545 elif isinstance(args, list):
546 cmdlist = args
547 cmd = ''
548 for i in cmdlist:
549 cmd = cmd + i + ' '
550 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
551 args.append(cmd)
552
553 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
554 check_status=check_status, stdin=stdin,
555 omit_sudo=False)
556
557 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
558 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
559 # the "cd foo && bar" shenanigans isn't needed to begin with and
560 # then we wouldn't have to special case this
561 if isinstance(args, str):
562 args = 'sudo ' + args
563 if isinstance(args, list):
564 args.insert(0, 'sudo')
565
566 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
567 check_status=check_status,
568 omit_sudo=False)
569
570 def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
571 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
572 # the "cd foo && bar" shenanigans isn't needed to begin with and
573 # then we wouldn't have to special case this
574 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
575 omit_sudo=omit_sudo)
576
577 def testcmd_as_user(self, args, user, wait=True, stdin=None):
578 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
579 # the "cd foo && bar" shenanigans isn't needed to begin with and
580 # then we wouldn't have to special case this
581 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
582 check_status=False)
583
584 def testcmd_as_root(self, args, wait=True, stdin=None):
585 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
586 # the "cd foo && bar" shenanigans isn't needed to begin with and
587 # then we wouldn't have to special case this
588 return self.run_as_root(args, wait=wait, stdin=stdin,
589 check_status=False)
590
591 def setupfs(self, name=None):
592 if name is None and self.fs is not None:
593 # Previous mount existed, reuse the old name
594 name = self.fs.name
595 self.fs = LocalFilesystem(self.ctx, name=name)
596 log.info('Wait for MDS to reach steady state...')
597 self.fs.wait_for_daemons()
598 log.info('Ready to start {}...'.format(type(self).__name__))
599
600 @property
601 def _prefix(self):
602 return BIN_PREFIX
603
604 def _asok_path(self):
605         # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
606         # run in the foreground. When running it daemonized, however, the asok is named after
607         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
608 # we need to give an exact path here as the logic for checking /proc/ for which
609 # asok is alive does not work.
610
611 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
612 # in a tmpdir. All of the paths are the same, so no need to select
613 # based off of the service type.
614 d = "./out"
615 with open(self.config_path) as f:
616 for line in f:
617 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
618 if asok_conf:
619 d = asok_conf.groups(1)[0]
620 break
621 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
622 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
623 return path
624
625 def mount(self, mount_path=None, mount_fs_name=None, mount_options=[]):
626 self.setupfs(name=mount_fs_name)
627
628 log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
629 id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
630
631 self.client_remote.run(
632 args=[
633 'mkdir',
634 '--',
635 self.mountpoint,
636 ],
637 timeout=(5*60),
638 )
639
640 if mount_path is None:
641 mount_path = "/"
642
643 opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
644 conf=self.config_path)
645
646 if mount_fs_name is not None:
647 opts += ",mds_namespace={0}".format(mount_fs_name)
648
649 for mount_opt in mount_options:
650 opts += ",{0}".format(mount_opt)
651
652 self.client_remote.run(
653 args=[
654 'sudo',
655 './bin/mount.ceph',
656 ':{mount_path}'.format(mount_path=mount_path),
657 self.mountpoint,
658 '-v',
659 '-o',
660 opts
661 ],
662 timeout=(30*60),
663 omit_sudo=False,
664 )
665
666 self.client_remote.run(
667 args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
668
669 self.mounted = True
670
671 def _run_python(self, pyscript, py_version='python'):
672 """
673 Override this to remove the daemon-helper prefix that is used otherwise
674 to make the process killable.
675 """
676 return self.client_remote.run(args=[py_version, '-c', pyscript],
677 wait=False)
678
679 class LocalFuseMount(FuseMount):
680 def __init__(self, ctx, test_dir, client_id):
681 super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
682
683 @property
684 def config_path(self):
685 return "./ceph.conf"
686
687 def get_keyring_path(self):
688 # This is going to end up in a config file, so use an absolute path
689 # to avoid assumptions about daemons' pwd
690 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
691
692 def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
693 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
694 # the "cd foo && bar" shenanigans isn't needed to begin with and
695 # then we wouldn't have to special case this
696 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
697 stdin=stdin, check_status=check_status,
698 omit_sudo=omit_sudo)
699
700 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
701 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
702 # the "cd foo && bar" shenanigans isn't needed to begin with and
703 # then we wouldn't have to special case this
704 if isinstance(args, str):
705 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
706 elif isinstance(args, list):
707 cmdlist = args
708 cmd = ''
709 for i in cmdlist:
710 cmd = cmd + i + ' '
711 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
712 args.append(cmd)
713
714 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
715 check_status=check_status, stdin=stdin,
716 omit_sudo=False)
717
718 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
719 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
720 # the "cd foo && bar" shenanigans isn't needed to begin with and
721 # then we wouldn't have to special case this
722 if isinstance(args, str):
723 args = 'sudo ' + args
724 if isinstance(args, list):
725 args.insert(0, 'sudo')
726
727 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
728 check_status=check_status,
729 omit_sudo=False)
730
731 def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
732 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
733 # the "cd foo && bar" shenanigans isn't needed to begin with and
734 # then we wouldn't have to special case this
735 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
736 omit_sudo=omit_sudo)
737
738 def testcmd_as_user(self, args, user, wait=True, stdin=None):
739 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
740 # the "cd foo && bar" shenanigans isn't needed to begin with and
741 # then we wouldn't have to special case this
742 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
743 check_status=False)
744
745 def testcmd_as_root(self, args, wait=True, stdin=None):
746 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
747 # the "cd foo && bar" shenanigans isn't needed to begin with and
748 # then we wouldn't have to special case this
749 return self.run_as_root(args, wait=wait, stdin=stdin,
750 check_status=False)
751
752 def setupfs(self, name=None):
753 if name is None and self.fs is not None:
754 # Previous mount existed, reuse the old name
755 name = self.fs.name
756 self.fs = LocalFilesystem(self.ctx, name=name)
757 log.info('Wait for MDS to reach steady state...')
758 self.fs.wait_for_daemons()
759 log.info('Ready to start {}...'.format(type(self).__name__))
760
761 @property
762 def _prefix(self):
763 return BIN_PREFIX
764
765 def _asok_path(self):
766         # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
767         # run in the foreground. When running it daemonized, however, the asok is named after
768         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
769 # we need to give an exact path here as the logic for checking /proc/ for which
770 # asok is alive does not work.
771
772 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
773 # in a tmpdir. All of the paths are the same, so no need to select
774 # based off of the service type.
775 d = "./out"
776 with open(self.config_path) as f:
777 for line in f:
778 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
779 if asok_conf:
780 d = asok_conf.groups(1)[0]
781 break
782 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
783 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
784 return path
785
786 def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
787 if mountpoint is not None:
788 self.mountpoint = mountpoint
789 self.setupfs(name=mount_fs_name)
790
791 self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
792
793 def list_connections():
794 self.client_remote.run(
795 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
796 check_status=False
797 )
798 p = self.client_remote.run(
799 args=["ls", "/sys/fs/fuse/connections"],
800 check_status=False
801 )
802 if p.exitstatus != 0:
803 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
804 return []
805
806 ls_str = six.ensure_str(p.stdout.getvalue().strip())
807 if ls_str:
808 return [int(n) for n in ls_str.split("\n")]
809 else:
810 return []
811
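        # (Each entry under /sys/fs/fuse/connections is a directory named after
        # a fuse connection number, e.g. "38"; diffing the pre- and post-mount
        # listings below identifies the connection belonging to this mount.)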
812 # Before starting ceph-fuse process, note the contents of
813 # /sys/fs/fuse/connections
814 pre_mount_conns = list_connections()
815 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
816
817 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
818 if os.getuid() != 0:
819 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
820
821 if mount_path is not None:
822 prefix += ["--client_mountpoint={0}".format(mount_path)]
823
824 if mount_fs_name is not None:
825 prefix += ["--client_fs={0}".format(mount_fs_name)]
826
827         prefix += mount_options
828
829 self.fuse_daemon = self.client_remote.run(args=
830 prefix + [
831 "-f",
832 "--name",
833 "client.{0}".format(self.client_id),
834 self.mountpoint
835 ], wait=False)
836
837 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
838
839 # Wait for the connection reference to appear in /sys
840 waited = 0
841 post_mount_conns = list_connections()
842 while len(post_mount_conns) <= len(pre_mount_conns):
843 if self.fuse_daemon.finished:
844 # Did mount fail? Raise the CommandFailedError instead of
845 # hitting the "failed to populate /sys/" timeout
846 self.fuse_daemon.wait()
847 time.sleep(1)
848 waited += 1
849 if waited > 30:
850 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
851 waited
852 ))
853 post_mount_conns = list_connections()
854
855 log.info("Post-mount connections: {0}".format(post_mount_conns))
856
857 # Record our fuse connection number so that we can use it when
858 # forcing an unmount
859 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
860 if len(new_conns) == 0:
861 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
862 elif len(new_conns) > 1:
863 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
864 else:
865 self._fuse_conn = new_conns[0]
866
867 self.gather_mount_info()
868
869 def _run_python(self, pyscript, py_version='python'):
870 """
871 Override this to remove the daemon-helper prefix that is used otherwise
872 to make the process killable.
873 """
874 return self.client_remote.run(args=[py_version, '-c', pyscript],
875 wait=False)
876
877 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
878 # the same name.
879 class LocalCephManager(CephManager):
880 def __init__(self):
881 # Deliberately skip parent init, only inheriting from it to get
882 # util methods like osd_dump that sit on top of raw_cluster_cmd
883 self.controller = LocalRemote()
884
885 # A minority of CephManager fns actually bother locking for when
886 # certain teuthology tests want to run tasks in parallel
887 self.lock = threading.RLock()
888
889 self.log = lambda x: log.info(x)
890
891 # Don't bother constructing a map of pools: it should be empty
892 # at test cluster start, and in any case it would be out of date
893 # in no time. The attribute needs to exist for some of the CephManager
894 # methods to work though.
895 self.pools = {}
896
897 def find_remote(self, daemon_type, daemon_id):
898 """
899 daemon_type like 'mds', 'osd'
900 daemon_id like 'a', '0'
901 """
902 return LocalRemote()
903
904 def run_ceph_w(self, watch_channel=None):
905 """
906 :param watch_channel: Specifies the channel to be watched.
907 This can be 'cluster', 'audit', ...
908 :type watch_channel: str
909 """
910 args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
911 if watch_channel is not None:
912 args.append("--watch-channel")
913 args.append(watch_channel)
914 proc = self.controller.run(args=args, wait=False, stdout=StringIO())
915 return proc
916
917 def raw_cluster_cmd(self, *args, **kwargs):
918 """
919         args like ["osd", "dump"]
920 return stdout string
921 """
922 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
923 list(args), **kwargs)
924 return six.ensure_str(proc.stdout.getvalue())
925
926 def raw_cluster_cmd_result(self, *args, **kwargs):
927 """
928 like raw_cluster_cmd but don't check status, just return rc
929 """
930 kwargs['check_status'] = False
931 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
932 list(args), **kwargs)
933 return proc.exitstatus
934
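    # Illustrative use of the two wrappers above (assumes a vstart cluster is up):
    #
    #   mgr = LocalCephManager()
    #   dump = mgr.raw_cluster_cmd('osd', 'dump')        # returns stdout as a str
    #   rc = mgr.raw_cluster_cmd_result('osd', 'stat')   # returns only the exit code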
935 def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
936 return self.controller.run(
937 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
938 check_status=check_status,
939 timeout=timeout
940 )
941
942 def get_mon_socks(self):
943 """
944 Get monitor sockets.
945
946 :return socks: tuple of strings; strings are individual sockets.
947 """
948 from json import loads
949
950 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
951 socks = []
952 for mon in output['mons']:
953 for addrvec_mem in mon['public_addrs']['addrvec']:
954 socks.append(addrvec_mem['addr'])
955 return tuple(socks)
956
957 def get_msgrv1_mon_socks(self):
958 """
959         Get monitor sockets that use msgrv1 to operate.
960
961 :return socks: tuple of strings; strings are individual sockets.
962 """
963 from json import loads
964
965 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
966 socks = []
967 for mon in output['mons']:
968 for addrvec_mem in mon['public_addrs']['addrvec']:
969 if addrvec_mem['type'] == 'v1':
970 socks.append(addrvec_mem['addr'])
971 return tuple(socks)
972
973 def get_msgrv2_mon_socks(self):
974 """
975 Get monitor sockets that use msgrv2 to operate.
976
977 :return socks: tuple of strings; strings are individual sockets.
978 """
979 from json import loads
980
981 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
982 socks = []
983 for mon in output['mons']:
984 for addrvec_mem in mon['public_addrs']['addrvec']:
985 if addrvec_mem['type'] == 'v2':
986 socks.append(addrvec_mem['addr'])
987 return tuple(socks)
988
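    # For reference, each entry in mon['public_addrs']['addrvec'] parsed above
    # has roughly this shape (the values are made up):
    #
    #   {"type": "v2", "addr": "127.0.0.1:40917", "nonce": 0}
    #   {"type": "v1", "addr": "127.0.0.1:40918", "nonce": 0}
    #
    # which is why the helpers filter on addrvec_mem['type'] and collect
    # addrvec_mem['addr'].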
989
990 class LocalCephCluster(CephCluster):
991 def __init__(self, ctx):
992 # Deliberately skip calling parent constructor
993 self._ctx = ctx
994 self.mon_manager = LocalCephManager()
995 self._conf = defaultdict(dict)
996
997 @property
998 def admin_remote(self):
999 return LocalRemote()
1000
1001 def get_config(self, key, service_type=None):
1002 if service_type is None:
1003 service_type = 'mon'
1004
1005 # FIXME hardcoded vstart service IDs
1006 service_id = {
1007 'mon': 'a',
1008 'mds': 'a',
1009 'osd': '0'
1010 }[service_type]
1011
1012 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
1013
1014 def _write_conf(self):
1015 # In teuthology, we have the honour of writing the entire ceph.conf, but
1016 # in vstart land it has mostly already been written and we need to carefully
1017 # append to it.
1018 conf_path = "./ceph.conf"
1019 banner = "\n#LOCAL_TEST\n"
1020 existing_str = open(conf_path).read()
1021
1022 if banner in existing_str:
1023 existing_str = existing_str[0:existing_str.find(banner)]
1024
1025 existing_str += banner
1026
1027 for subsys, kvs in self._conf.items():
1028 existing_str += "\n[{0}]\n".format(subsys)
1029 for key, val in kvs.items():
1030 # Comment out existing instance if it exists
1031 log.info("Searching for existing instance {0}/{1}".format(
1032 key, subsys
1033 ))
1034 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
1035 subsys
1036 ), existing_str, re.MULTILINE)
1037
1038 if existing_section:
1039 section_str = existing_str[existing_section.start():existing_section.end()]
1040 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
1041 if existing_val:
1042 start = existing_section.start() + existing_val.start(1)
1043 log.info("Found string to replace at {0}".format(
1044 start
1045 ))
1046 existing_str = existing_str[0:start] + "#" + existing_str[start:]
1047
1048 existing_str += "{0} = {1}\n".format(key, val)
1049
1050 open(conf_path, "w").write(existing_str)
1051
1052 def set_ceph_conf(self, subsys, key, value):
1053 self._conf[subsys][key] = value
1054 self._write_conf()
1055
1056 def clear_ceph_conf(self, subsys, key):
1057 del self._conf[subsys][key]
1058 self._write_conf()
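    # Rough sketch of the result: after e.g.
    #
    #   cluster.set_ceph_conf('mds', 'mds log max segments', '10')
    #
    # ./ceph.conf ends with a block like
    #
    #   #LOCAL_TEST
    #   [mds]
    #   mds log max segments = 10
    #
    # and any pre-existing value for the same key is commented out by _write_conf().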
1059
1060
1061 class LocalMDSCluster(LocalCephCluster, MDSCluster):
1062 def __init__(self, ctx):
1063 super(LocalMDSCluster, self).__init__(ctx)
1064
1065 self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
1066 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1067
1068 def clear_firewall(self):
1069 # FIXME: unimplemented
1070 pass
1071
1072 def newfs(self, name='cephfs', create=True):
1073 return LocalFilesystem(self._ctx, name=name, create=create)
1074
1075
1076 class LocalMgrCluster(LocalCephCluster, MgrCluster):
1077 def __init__(self, ctx):
1078 super(LocalMgrCluster, self).__init__(ctx)
1079
1080 self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
1081 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
1082
1083
1084 class LocalFilesystem(Filesystem, LocalMDSCluster):
1085 def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
1086 # Deliberately skip calling parent constructor
1087 self._ctx = ctx
1088
1089 self.id = None
1090 self.name = name
1091 self.ec_profile = ec_profile
1092 self.metadata_pool_name = None
1093 self.metadata_overlay = False
1094 self.data_pool_name = None
1095 self.data_pools = None
1096
1097 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
1098 self.mds_ids = set()
1099 for line in open("ceph.conf").readlines():
1100 match = re.match("^\[mds\.(.+)\]$", line)
1101 if match:
1102 self.mds_ids.add(match.group(1))
1103
1104 if not self.mds_ids:
1105 raise RuntimeError("No MDSs found in ceph.conf!")
1106
1107 self.mds_ids = list(self.mds_ids)
1108
1109 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
1110
1111 self.mon_manager = LocalCephManager()
1112
1113 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1114
1115 self.client_remote = LocalRemote()
1116
1117 self._conf = defaultdict(dict)
1118
1119 if name is not None:
1120 if fscid is not None:
1121 raise RuntimeError("cannot specify fscid when creating fs")
1122 if create and not self.legacy_configured():
1123 self.create()
1124 else:
1125 if fscid is not None:
1126 self.id = fscid
1127 self.getinfo(refresh=True)
1128
1129 # Stash a reference to the first created filesystem on ctx, so
1130 # that if someone drops to the interactive shell they can easily
1131 # poke our methods.
1132 if not hasattr(self._ctx, "filesystem"):
1133 self._ctx.filesystem = self
1134
1135 @property
1136 def _prefix(self):
1137 return BIN_PREFIX
1138
1139 def set_clients_block(self, blocked, mds_id=None):
1140 raise NotImplementedError()
1141
1142
1143 class InteractiveFailureResult(unittest.TextTestResult):
1144 """
1145 Specialization that implements interactive-on-error style
1146 behavior.
1147 """
1148 def addFailure(self, test, err):
1149 super(InteractiveFailureResult, self).addFailure(test, err)
1150 log.error(self._exc_info_to_string(err, test))
1151 log.error("Failure in test '{0}', going interactive".format(
1152 self.getDescription(test)
1153 ))
1154 interactive.task(ctx=None, config=None)
1155
1156 def addError(self, test, err):
1157 super(InteractiveFailureResult, self).addError(test, err)
1158 log.error(self._exc_info_to_string(err, test))
1159 log.error("Error in test '{0}', going interactive".format(
1160 self.getDescription(test)
1161 ))
1162 interactive.task(ctx=None, config=None)
1163
1164
1165 def enumerate_methods(s):
1166 log.info("e: {0}".format(s))
1167 for t in s._tests:
1168 if isinstance(t, suite.BaseTestSuite):
1169 for sub in enumerate_methods(t):
1170 yield sub
1171 else:
1172 yield s, t
1173
1174
1175 def load_tests(modules, loader):
1176 if modules:
1177 log.info("Executing modules: {0}".format(modules))
1178 module_suites = []
1179 for mod_name in modules:
1180 # Test names like cephfs.test_auto_repair
1181 module_suites.append(loader.loadTestsFromName(mod_name))
1182 log.info("Loaded: {0}".format(list(module_suites)))
1183 return suite.TestSuite(module_suites)
1184 else:
1185 log.info("Executing all cephfs tests")
1186 return loader.discover(
1187 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
1188 )
1189
1190
1191 def scan_tests(modules):
1192 overall_suite = load_tests(modules, loader.TestLoader())
1193
1194 max_required_mds = 0
1195 max_required_clients = 0
1196 max_required_mgr = 0
1197 require_memstore = False
1198
1199 for suite_, case in enumerate_methods(overall_suite):
1200 max_required_mds = max(max_required_mds,
1201 getattr(case, "MDSS_REQUIRED", 0))
1202 max_required_clients = max(max_required_clients,
1203 getattr(case, "CLIENTS_REQUIRED", 0))
1204 max_required_mgr = max(max_required_mgr,
1205 getattr(case, "MGRS_REQUIRED", 0))
1206 require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
1207 or require_memstore
1208
1209 return max_required_mds, max_required_clients, \
1210 max_required_mgr, require_memstore
1211
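# Illustrative: scan_tests() sizes the vstart cluster from class attributes on
# the test cases, along the lines of (class name and values are assumptions):
#
#   class TestSomething(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 1
#       REQUIRE_MEMSTORE = False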
1212
1213 class LocalCluster(object):
1214 def __init__(self, rolename="placeholder"):
1215 self.remotes = {
1216 LocalRemote(): [rolename]
1217 }
1218
1219 def only(self, requested):
1220 return self.__class__(rolename=requested)
1221
1222
1223 class LocalContext(object):
1224 def __init__(self):
1225 self.config = {}
1226 self.teuthology_config = teuth_config
1227 self.cluster = LocalCluster()
1228 self.daemons = DaemonGroup()
1229
1230 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
1231 # tests that want to look these up via ctx can do so.
1232 # Inspect ceph.conf to see what roles exist
1233 for conf_line in open("ceph.conf").readlines():
1234 for svc_type in ["mon", "osd", "mds", "mgr"]:
1235 prefixed_type = "ceph." + svc_type
1236 if prefixed_type not in self.daemons.daemons:
1237 self.daemons.daemons[prefixed_type] = {}
1238 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
1239 if match:
1240 svc_id = match.group(1)
1241 self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
1242
1243 def __del__(self):
1244 shutil.rmtree(self.teuthology_config['test_path'])
1245
1246 def teardown_cluster():
1247 log.info('\ntearing down the cluster...')
1248 remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
1249 remote.run(args=['rm', '-rf', './dev', './out'])
1250
1251 def clear_old_log():
1252 from os import stat
1253
1254 try:
1255 stat(logpath)
1256     # would need an update when making this py3 compatible. Use
1257     # FileNotFoundError instead.
1258 except OSError:
1259 return
1260 else:
1261 os.remove(logpath)
1262 with open(logpath, 'w') as logfile:
1263 logfile.write('')
1264 init_log()
1265 log.info('logging in a fresh file now...')
1266
1267 def exec_test():
1268 # Parse arguments
1269 opt_interactive_on_error = False
1270 opt_create_cluster = False
1271 opt_create_cluster_only = False
1272 opt_ignore_missing_binaries = False
1273 opt_teardown_cluster = False
1274 global opt_log_ps_output
1275 opt_log_ps_output = False
1276 use_kernel_client = False
1277
1278 args = sys.argv[1:]
1279 flags = [a for a in args if a.startswith("-")]
1280 modules = [a for a in args if not a.startswith("-")]
1281 for f in flags:
1282 if f == "--interactive":
1283 opt_interactive_on_error = True
1284 elif f == "--create":
1285 opt_create_cluster = True
1286 elif f == "--create-cluster-only":
1287 opt_create_cluster_only = True
1288 elif f == "--ignore-missing-binaries":
1289 opt_ignore_missing_binaries = True
1290 elif f == '--teardown':
1291 opt_teardown_cluster = True
1292 elif f == '--log-ps-output':
1293 opt_log_ps_output = True
1294 elif f == '--clear-old-log':
1295 clear_old_log()
1296 elif f == "--kclient":
1297 use_kernel_client = True
1298 else:
1299 log.error("Unknown option '{0}'".format(f))
1300 sys.exit(-1)
1301
1302 # Help developers by stopping up-front if their tree isn't built enough for all the
1303 # tools that the tests might want to use (add more here if needed)
1304 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
1305 "cephfs-table-tool", "ceph-fuse", "rados"]
1306 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
1307 if missing_binaries and not opt_ignore_missing_binaries:
1308 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
1309 sys.exit(-1)
1310
1311 max_required_mds, max_required_clients, \
1312 max_required_mgr, require_memstore = scan_tests(modules)
1313
1314 global remote
1315 remote = LocalRemote()
1316
1317 # Tolerate no MDSs or clients running at start
1318 ps_txt = six.ensure_str(remote.run(
1319 args=["ps", "-u"+str(os.getuid())]
1320 ).stdout.getvalue().strip())
1321 lines = ps_txt.split("\n")[1:]
1322 for line in lines:
1323 if 'ceph-fuse' in line or 'ceph-mds' in line:
1324 pid = int(line.split()[0])
1325 log.warn("Killing stray process {0}".format(line))
1326 os.kill(pid, signal.SIGKILL)
1327
1328 # Fire up the Ceph cluster if the user requested it
1329 if opt_create_cluster or opt_create_cluster_only:
1330 log.info("Creating cluster with {0} MDS daemons".format(
1331 max_required_mds))
1332 teardown_cluster()
1333 vstart_env = os.environ.copy()
1334 vstart_env["FS"] = "0"
1335 vstart_env["MDS"] = max_required_mds.__str__()
1336 vstart_env["OSD"] = "4"
1337 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
1338
1339 args = [os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d",
1340 "--nolockdep"]
1341 if require_memstore:
1342 args.append("--memstore")
1343
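        # The net effect is roughly equivalent to running, from the build dir
        # (the numbers depend on the scanned tests; this is only an illustration):
        #
        #   FS=0 MDS=2 OSD=4 MGR=1 ../src/vstart.sh -n -d --nolockdep [--memstore]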
1344         # Usually vstart.sh runs to completion in less than 100
1345         # seconds.
1346 remote.run(args=args, env=vstart_env, timeout=(3 * 60))
1347
1348 # Wait for OSD to come up so that subsequent injectargs etc will
1349 # definitely succeed
1350 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
1351
1352 if opt_create_cluster_only:
1353 return
1354
1355 # List of client mounts, sufficient to run the selected tests
1356 clients = [i.__str__() for i in range(0, max_required_clients)]
1357
1358 test_dir = tempfile.mkdtemp()
1359 teuth_config['test_path'] = test_dir
1360
1361 ctx = LocalContext()
1362 ceph_cluster = LocalCephCluster(ctx)
1363 mds_cluster = LocalMDSCluster(ctx)
1364 mgr_cluster = LocalMgrCluster(ctx)
1365
1366 # Construct Mount classes
1367 mounts = []
1368 for client_id in clients:
1369 # Populate client keyring (it sucks to use client.admin for test clients
1370 # because it's awkward to find the logs later)
1371 client_name = "client.{0}".format(client_id)
1372
1373 if client_name not in open("./keyring").read():
1374 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
1375 "osd", "allow rw",
1376 "mds", "allow",
1377 "mon", "allow r"])
1378
1379 open("./keyring", "a").write(p.stdout.getvalue())
1380
1381 if use_kernel_client:
1382 mount = LocalKernelMount(ctx, test_dir, client_id)
1383 else:
1384 mount = LocalFuseMount(ctx, test_dir, client_id)
1385
1386 mounts.append(mount)
1387 if os.path.exists(mount.mountpoint):
1388 if mount.is_mounted():
1389 log.warn("unmounting {0}".format(mount.mountpoint))
1390 mount.umount_wait()
1391 else:
1392 os.rmdir(mount.mountpoint)
1393
1394 from tasks.cephfs_test_runner import DecoratingLoader
1395
1396 class LogStream(object):
1397 def __init__(self):
1398 self.buffer = ""
1399
1400 def write(self, data):
1401 self.buffer += data
1402 if "\n" in self.buffer:
1403 lines = self.buffer.split("\n")
1404 for line in lines[:-1]:
1405 pass
1406 # sys.stderr.write(line + "\n")
1407 log.info(line)
1408 self.buffer = lines[-1]
1409
1410 def flush(self):
1411 pass
1412
1413 decorating_loader = DecoratingLoader({
1414 "ctx": ctx,
1415 "mounts": mounts,
1416 "ceph_cluster": ceph_cluster,
1417 "mds_cluster": mds_cluster,
1418 "mgr_cluster": mgr_cluster,
1419 })
1420
1421 # For the benefit of polling tests like test_full -- in teuthology land we set this
1422 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1423 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1424 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1425
1426 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1427     # from normal IO latency. Increase it for running tests.
1428 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
1429
1430 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1431 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1432 # so that cephfs-data-scan will pick it up too.
1433 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
1434 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1435
1436 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1437 def _get_package_version(remote, pkg_name):
1438 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1439 return "2.9"
1440
1441 import teuthology.packaging
1442 teuthology.packaging.get_package_version = _get_package_version
1443
1444 overall_suite = load_tests(modules, decorating_loader)
1445
1446     # Filter out tests that don't lend themselves to interactive running.
1447 victims = []
1448 for case, method in enumerate_methods(overall_suite):
1449 fn = getattr(method, method._testMethodName)
1450
1451 drop_test = False
1452
1453 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1454 drop_test = True
1455 log.warn("Dropping test because long running: {method_id}".format(method_id=method.id()))
1456
1457 if getattr(fn, "needs_trimming", False) is True:
1458 drop_test = (os.getuid() != 0)
1459 log.warn("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))
1460
1461 if drop_test:
1462 # Don't drop the test if it was explicitly requested in arguments
1463 is_named = False
1464 for named in modules:
1465 if named.endswith(method.id()):
1466 is_named = True
1467 break
1468
1469 if not is_named:
1470 victims.append((case, method))
1471
1472 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1473 for s, method in victims:
1474 s._tests.remove(method)
1475
1476 if opt_interactive_on_error:
1477 result_class = InteractiveFailureResult
1478 else:
1479 result_class = unittest.TextTestResult
1480 fail_on_skip = False
1481
1482 class LoggingResult(result_class):
1483 def startTest(self, test):
1484 log.info("Starting test: {0}".format(self.getDescription(test)))
1485 test.started_at = datetime.datetime.utcnow()
1486 return super(LoggingResult, self).startTest(test)
1487
1488 def stopTest(self, test):
1489 log.info("Stopped test: {0} in {1}s".format(
1490 self.getDescription(test),
1491 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1492 ))
1493
1494 def addSkip(self, test, reason):
1495 if fail_on_skip:
1496 # Don't just call addFailure because that requires a traceback
1497 self.failures.append((test, reason))
1498 else:
1499 super(LoggingResult, self).addSkip(test, reason)
1500
1501 # Execute!
1502 result = unittest.TextTestRunner(
1503 stream=LogStream(),
1504 resultclass=LoggingResult,
1505 verbosity=2,
1506 failfast=True).run(overall_suite)
1507
1508 if opt_teardown_cluster:
1509 teardown_cluster()
1510
1511 if not result.wasSuccessful():
1512 result.printErrors() # duplicate output at end for convenience
1513
1514 bad_tests = []
1515 for test, error in result.errors:
1516 bad_tests.append(str(test))
1517 for test, failure in result.failures:
1518 bad_tests.append(str(test))
1519
1520 sys.exit(-1)
1521 else:
1522 sys.exit(0)
1523
1524
1525 if __name__ == "__main__":
1526 exec_test()