1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 # Also, you can create the cluster once and then run named test cases against it:
27 python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
28 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
29 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
30
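Other flags accepted by this script (all parsed in exec_test below):

# --teardown: tear down the vstart cluster after the run
# --kclient: use the kernel CephFS client instead of ceph-fuse
# --ignore-missing-binaries: don't abort if some Ceph binaries are missing
# --log-ps-output: log the full "ps" output when a daemon PID is not found
# --clear-old-log: truncate ./vstart_runner.log before running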
31 """
32
33 from six import StringIO
34 from io import BytesIO
35 from collections import defaultdict
36 import getpass
37 import signal
38 import tempfile
39 import threading
40 import datetime
41 import shutil
42 import re
43 import os
44 import time
45 import sys
46 import errno
47 from unittest import suite, loader
48 import unittest
49 import platform
50 from teuthology import misc
51 from teuthology.orchestra.run import Raw, quote
52 from teuthology.orchestra.daemon import DaemonGroup
53 from teuthology.config import config as teuth_config
54 import six
55 import logging
56
57 def init_log():
58 global log
59 if log is not None:
60 del log
61 log = logging.getLogger(__name__)
62
63 global logpath
64 logpath = './vstart_runner.log'
65
66 handler = logging.FileHandler(logpath)
67 formatter = logging.Formatter(
68 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
69 datefmt='%Y-%m-%dT%H:%M:%S')
70 handler.setFormatter(formatter)
71 log.addHandler(handler)
72 log.setLevel(logging.INFO)
73
74 log = None
75 init_log()
76
77
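# respawn_in_path re-executes the current interpreter with LD_LIBRARY_PATH
# (DYLD_LIBRARY_PATH on macOS) pointing at the vstart build's lib/ directory,
# since the dynamic linker only honours that variable at process startup; once
# the variable already contains lib_path it just extends sys.path in place.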
78 def respawn_in_path(lib_path, python_paths):
79 execv_cmd = ['python']
80 if platform.system() == "Darwin":
81 lib_path_var = "DYLD_LIBRARY_PATH"
82 else:
83 lib_path_var = "LD_LIBRARY_PATH"
84
85 py_binary = os.environ.get("PYTHON", sys.executable)
86
87 if lib_path_var in os.environ:
88 if lib_path not in os.environ[lib_path_var]:
89 os.environ[lib_path_var] += ':' + lib_path
90 os.execvp(py_binary, execv_cmd + sys.argv)
91 else:
92 os.environ[lib_path_var] = lib_path
93 os.execvp(py_binary, execv_cmd + sys.argv)
94
95 for p in python_paths:
96 sys.path.insert(0, p)
97
98
99 # Let's use some sensible defaults
100 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
101
102 # A list of candidate paths for each package we need
103 guesses = [
104 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
105 ["lib/cython_modules/lib.3"],
106 ["../src/pybind"],
107 ]
108
109 python_paths = []
110
111 # Up one level so that "tasks.foo.bar" imports work
112 python_paths.append(os.path.abspath(
113 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
114 ))
115
116 for package_guesses in guesses:
117 for g in package_guesses:
118 g_exp = os.path.abspath(os.path.expanduser(g))
119 if os.path.exists(g_exp):
120 python_paths.append(g_exp)
121
122 ld_path = os.path.join(os.getcwd(), "lib/")
123 print("Using guessed paths {0} {1}".format(ld_path, python_paths))
124 respawn_in_path(ld_path, python_paths)
125
126
127 try:
128 from teuthology.exceptions import CommandFailedError
129 from tasks.ceph_manager import CephManager
130 from tasks.cephfs.fuse_mount import FuseMount
131 from tasks.cephfs.kernel_mount import KernelMount
132 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
133 from tasks.mgr.mgr_test_case import MgrCluster
134 from teuthology.contextutil import MaxWhileTries
135 from teuthology.task import interactive
136 except ImportError:
137 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
138 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
139 raise
140
141 # Must import after teuthology because of gevent monkey patching
142 import subprocess
143
144 if os.path.exists("./CMakeCache.txt"):
145 # Running in build dir of a cmake build
146 BIN_PREFIX = "./bin/"
147 SRC_PREFIX = "../src"
148 else:
149 # Running in src/ of an autotools build
150 BIN_PREFIX = "./"
151 SRC_PREFIX = "./"
152
153
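# Replace the UTF-8 byte sequences for typographic single quotes with plain
# ASCII apostrophes (some tools emit them in error messages), so captured
# output stays ASCII-clean.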
154 def rm_nonascii_chars(var):
155 var = var.replace('\xe2\x80\x98', '\'')
156 var = var.replace('\xe2\x80\x99', '\'')
157 return var
158
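# A thin wrapper around subprocess.Popen that mimics the parts of teuthology's
# RemoteProcess interface the tests rely on: wait()/finished/kill(),
# exitstatus/returncode, and captured stdout/stderr buffers.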
159 class LocalRemoteProcess(object):
160 def __init__(self, args, subproc, check_status, stdout, stderr):
161 self.args = args
162 self.subproc = subproc
163 self.stdout = stdout or BytesIO()
164 self.stderr = stderr or BytesIO()
165
166 self.check_status = check_status
167 self.exitstatus = self.returncode = None
168
169 def wait(self):
170 if self.finished:
171                 # Avoid calling communicate() on a dead process because it
172                 # will complain about std* already being closed
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175 else:
176 return
177
178 out, err = self.subproc.communicate()
179 out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
180 self.stdout.write(out)
181 self.stderr.write(err)
182
183 self.exitstatus = self.returncode = self.subproc.returncode
184
185 if self.exitstatus != 0:
186 sys.stderr.write(six.ensure_str(out))
187 sys.stderr.write(six.ensure_str(err))
188
189 if self.check_status and self.exitstatus != 0:
190 raise CommandFailedError(self.args, self.exitstatus)
191
192 @property
193 def finished(self):
194 if self.exitstatus is not None:
195 return True
196
197 if self.subproc.poll() is not None:
198 out, err = self.subproc.communicate()
199 self.stdout.write(out)
200 self.stderr.write(err)
201 self.exitstatus = self.returncode = self.subproc.returncode
202 return True
203 else:
204 return False
205
206 def kill(self):
207 log.info("kill ")
208 if self.subproc.pid and not self.finished:
209 log.info("kill: killing pid {0} ({1})".format(
210 self.subproc.pid, self.args))
211 safe_kill(self.subproc.pid)
212 else:
213 log.info("kill: already terminated ({0})".format(self.args))
214
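    # There is no real pipe to a remote daemon here; the only stdin operation
    # callers rely on is close(), which is treated as a request to stop the
    # wrapped process, so it simply kills the local subprocess.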
215 @property
216 def stdin(self):
217 class FakeStdIn(object):
218 def __init__(self, mount_daemon):
219 self.mount_daemon = mount_daemon
220
221 def close(self):
222 self.mount_daemon.kill()
223
224 return FakeStdIn(self)
225
226
227 class LocalRemote(object):
228 """
229 Amusingly named class to present the teuthology RemoteProcess interface when we are really
230 running things locally for vstart
231
232 Run this inside your src/ dir!
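
    A minimal usage sketch (illustrative):

        remote = LocalRemote()
        proc = remote.run(args=['ls', '-l'])   # waits for completion by default
        output = proc.stdout.getvalue()        # captured in an in-memory buffer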
233 """
234
235 def __init__(self):
236 self.name = "local"
237 self.hostname = "localhost"
238 self.user = getpass.getuser()
239
240 def get_file(self, path, sudo, dest_dir):
241 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
242 shutil.copy(path, tmpfile)
243 return tmpfile
244
245     # XXX: This method ignores the error raised when src and dst hold the
246     # same path. For teuthology, the same path still represents different
247     # locations because they lie on different machines.
248 def put_file(self, src, dst, sudo=False):
249 if sys.version_info.major < 3:
250 exception = shutil.Error
251 elif sys.version_info.major >= 3:
252 exception = shutil.SameFileError
253
254 try:
255 shutil.copy(src, dst)
256 except exception as e:
257 if sys.version_info.major < 3 and e.message.find('are the same '
258 'file') != -1:
259 return
260 raise e
261
262     # XXX: accepts only two arguments to maintain compatibility with
263 # teuthology's mkdtemp.
264 def mkdtemp(self, suffix='', parentdir=None):
265 from tempfile import mkdtemp
266
267         # XXX: prefix has to be set explicitly; without it this method
268         # failed on Python 2.7 -
269 # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
270 # -> file = _os.path.join(dir, prefix + name + suffix)
271 # (Pdb) p prefix
272 # None
273 return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
274
275 def mktemp(self, suffix=None, parentdir=None):
276 """
277 Make a remote temporary file
278
279 Returns: the path of the temp file created.
280 """
281 from tempfile import mktemp
282 return mktemp(suffix=suffix, dir=parentdir)
283
284 def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
285 # Since Python's shell simulation can only work when commands are
286         # provided as a list of arguments...
287 if isinstance(args, str) or isinstance(args, six.text_type):
288 args = args.split()
289
290         # We'll keep sudo as part of the command even if the omit flag says
291         # otherwise, for commands that can normally be run only by root.
292 try:
293 if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
294 omit_sudo = False
295 except ValueError:
296 pass
297
298         # Quotes wrapping a command argument don't work well in Python's shell
299         # simulation if the argument also contains spaces. E.g. '"ls"' is OK,
300         # but '"ls /"' isn't.
301         errmsg = "Don't surround an argument with quotes if it " + \
302                  "contains spaces.\nargs - %s" % (args)
303 for arg in args:
304 if isinstance(arg, Raw):
305 continue
306
307 if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
308 (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
309 raise RuntimeError(errmsg)
310
311 # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
312 # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
313         # treated differently by Python's shell simulation. Only the latter
314         # has the desired effect.
315         errmsg = 'The entire command to be executed as another user should ' +\
316                  'be a single argument.\nargs - %s' % (args)
317 if 'sudo' in args and '-u' in args and '-c' in args and \
318 args.count('-c') == 1:
319 if args.index('-c') != len(args) - 2 and \
320 args[args.index('-c') + 2].find('-') == -1:
321 raise RuntimeError(errmsg)
322
323 if omit_sudo:
324 args = [a for a in args if a != "sudo"]
325
326 return args
327
328 # Wrapper to keep the interface exactly same as that of
329 # teuthology.remote.run.
330 def run(self, **kwargs):
331 return self._do_run(**kwargs)
332
333 def _do_run(self, args, check_status=True, wait=True, stdout=None,
334 stderr=None, cwd=None, stdin=None, logger=None, label=None,
335 env=None, timeout=None, omit_sudo=False):
336 args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
337
338 # We have to use shell=True if any run.Raw was present, e.g. &&
339 shell = any([a for a in args if isinstance(a, Raw)])
340
341 # Filter out helper tools that don't exist in a vstart environment
342 args = [a for a in args if a not in ('adjust-ulimits',
343 'ceph-coverage')]
344
345 # Adjust binary path prefix if given a bare program name
346 if "/" not in args[0]:
347 # If they asked for a bare binary name, and it exists
348 # in our built tree, use the one there.
349 local_bin = os.path.join(BIN_PREFIX, args[0])
350 if os.path.exists(local_bin):
351 args = [local_bin] + args[1:]
352 else:
353 log.debug("'{0}' is not a binary in the Ceph build dir".format(
354 args[0]
355 ))
356
357 log.info("Running {0}".format(args))
358
359 if shell:
360 subproc = subprocess.Popen(quote(args),
361 stdout=subprocess.PIPE,
362 stderr=subprocess.PIPE,
363 stdin=subprocess.PIPE,
364 cwd=cwd,
365 shell=True)
366 else:
367 # Sanity check that we've got a list of strings
368 for arg in args:
369 if not isinstance(arg, six.string_types):
370 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
371 arg, arg.__class__
372 ))
373
374 subproc = subprocess.Popen(args,
375 stdout=subprocess.PIPE,
376 stderr=subprocess.PIPE,
377 stdin=subprocess.PIPE,
378 cwd=cwd,
379 env=env)
380
381 if stdin:
382 if not isinstance(stdin, six.string_types):
383 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
384
385 # Hack: writing to stdin is not deadlock-safe, but it "always" works
386 # as long as the input buffer is "small"
387 subproc.stdin.write(stdin)
388
389 proc = LocalRemoteProcess(
390 args, subproc, check_status,
391 stdout, stderr
392 )
393
394 if wait:
395 proc.wait()
396
397 return proc
398
399     # XXX: for compatibility, keep this method the same as teuthology.orchestra.remote.sh
400 def sh(self, script, **kwargs):
401 """
402 Shortcut for run method.
403
404 Usage:
405 my_name = remote.sh('whoami')
406 remote_date = remote.sh('date')
407 """
408 if 'stdout' not in kwargs:
409 kwargs['stdout'] = StringIO()
410 if 'args' not in kwargs:
411 kwargs['args'] = script
412 proc = self.run(**kwargs)
413 return proc.stdout.getvalue()
414
415
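# A local stand-in for teuthology's daemon objects: it tracks a single vstart
# daemon by scanning local "ps" output for "ceph-<type> -i <id>" and implements
# stop/restart/signal by signalling that PID directly.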
416 class LocalDaemon(object):
417 def __init__(self, daemon_type, daemon_id):
418 self.daemon_type = daemon_type
419 self.daemon_id = daemon_id
420 self.controller = LocalRemote()
421 self.proc = None
422
423 @property
424 def remote(self):
425 return LocalRemote()
426
427 def running(self):
428 return self._get_pid() is not None
429
430 def check_status(self):
431 if self.proc:
432 return self.proc.poll()
433
434 def _get_pid(self):
435 """
436 Return PID as an integer or None if not found
437 """
438 ps_txt = six.ensure_str(self.controller.run(
439 args=["ps", "ww", "-u"+str(os.getuid())]
440 ).stdout.getvalue()).strip()
441 lines = ps_txt.split("\n")[1:]
442
443 for line in lines:
444 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
445 log.info("Found ps line for daemon: {0}".format(line))
446 return int(line.split()[0])
447 if opt_log_ps_output:
448 log.info("No match for {0} {1}: {2}".format(
449 self.daemon_type, self.daemon_id, ps_txt))
450 else:
451 log.info("No match for {0} {1}".format(self.daemon_type,
452 self.daemon_id))
453 return None
454
455 def wait(self, timeout):
456 waited = 0
457 while self._get_pid() is not None:
458 if waited > timeout:
459 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
460 time.sleep(1)
461 waited += 1
462
463 def stop(self, timeout=300):
464 if not self.running():
465 log.error('tried to stop a non-running daemon')
466 return
467
468 pid = self._get_pid()
469 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
470 os.kill(pid, signal.SIGTERM)
471
472 waited = 0
473 while pid is not None:
474 new_pid = self._get_pid()
475 if new_pid is not None and new_pid != pid:
476 log.info("Killing new PID {0}".format(new_pid))
477 pid = new_pid
478 os.kill(pid, signal.SIGTERM)
479
480 if new_pid is None:
481 break
482 else:
483 if waited > timeout:
484 raise MaxWhileTries(
485 "Timed out waiting for daemon {0}.{1}".format(
486 self.daemon_type, self.daemon_id))
487 time.sleep(1)
488 waited += 1
489
490 self.wait(timeout=timeout)
491
492 def restart(self):
493 if self._get_pid() is not None:
494 self.stop()
495
496 self.proc = self.controller.run(args=[
497 os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
498 "-i", self.daemon_id])
499
500 def signal(self, sig, silent=False):
501 if not self.running():
502 raise RuntimeError("Can't send signal to non-running daemon")
503
504 os.kill(self._get_pid(), sig)
505 if not silent:
506 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
507
508
509 def safe_kill(pid):
510 """
511 os.kill annoyingly raises exception if process already dead. Ignore it.
512 """
513 try:
514 return os.kill(pid, signal.SIGKILL)
515 except OSError as e:
516 if e.errno == errno.ESRCH:
517 # Raced with process termination
518 pass
519 else:
520 raise
521
522
523 class LocalKernelMount(KernelMount):
524 def __init__(self, ctx, test_dir, client_id):
525 super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
526
527 @property
528 def config_path(self):
529 return "./ceph.conf"
530
531 def get_keyring_path(self):
532 # This is going to end up in a config file, so use an absolute path
533 # to avoid assumptions about daemons' pwd
534 keyring_path = "./client.{0}.keyring".format(self.client_id)
535 try:
536 os.stat(keyring_path)
537 except OSError:
538 return os.path.join(os.getcwd(), 'keyring')
539 else:
540 return keyring_path
541
542 def run_shell(self, args, wait=True, stdin=None, check_status=True,
543 omit_sudo=False):
544 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
545 # the "cd foo && bar" shenanigans isn't needed to begin with and
546 # then we wouldn't have to special case this
547 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
548 stdin=stdin, check_status=check_status,
549 omit_sudo=omit_sudo)
550
551 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
552 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
553 # the "cd foo && bar" shenanigans isn't needed to begin with and
554 # then we wouldn't have to special case this
555 if isinstance(args, str):
556 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
557 elif isinstance(args, list):
558 cmdlist = args
559 cmd = ''
560 for i in cmdlist:
561 cmd = cmd + i + ' '
562 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
563 args.append(cmd)
564
565 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
566 check_status=check_status, stdin=stdin,
567 omit_sudo=False)
568
569 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
570 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
571 # the "cd foo && bar" shenanigans isn't needed to begin with and
572 # then we wouldn't have to special case this
573 if isinstance(args, str):
574 args = 'sudo ' + args
575 if isinstance(args, list):
576 args.insert(0, 'sudo')
577
578 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
579 check_status=check_status,
580 omit_sudo=False)
581
582 def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
583 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
584 # the "cd foo && bar" shenanigans isn't needed to begin with and
585 # then we wouldn't have to special case this
586 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
587 omit_sudo=omit_sudo)
588
589 def testcmd_as_user(self, args, user, wait=True, stdin=None):
590 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
591 # the "cd foo && bar" shenanigans isn't needed to begin with and
592 # then we wouldn't have to special case this
593 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
594 check_status=False)
595
596 def testcmd_as_root(self, args, wait=True, stdin=None):
597 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
598 # the "cd foo && bar" shenanigans isn't needed to begin with and
599 # then we wouldn't have to special case this
600 return self.run_as_root(args, wait=wait, stdin=stdin,
601 check_status=False)
602
603 def setupfs(self, name=None):
604 if name is None and self.fs is not None:
605 # Previous mount existed, reuse the old name
606 name = self.fs.name
607 self.fs = LocalFilesystem(self.ctx, name=name)
608 log.info('Wait for MDS to reach steady state...')
609 self.fs.wait_for_daemons()
610 log.info('Ready to start {}...'.format(type(self).__name__))
611
612 @property
613 def _prefix(self):
614 return BIN_PREFIX
615
616 def _asok_path(self):
617 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
618         # run in the foreground. When it is daemonized, however, the asok is named after
619         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
620 # we need to give an exact path here as the logic for checking /proc/ for which
621 # asok is alive does not work.
622
623 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
624 # in a tmpdir. All of the paths are the same, so no need to select
625 # based off of the service type.
626 d = "./out"
627 with open(self.config_path) as f:
628 for line in f:
629 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
630 if asok_conf:
631 d = asok_conf.groups(1)[0]
632 break
633 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
634 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
635 return path
636
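    # The mount command assembled below ends up roughly like (illustrative
    # values):
    #   sudo ./bin/mount.ceph :/ <mountpoint> -v -o name=0,norequire_active_mds,conf=./ceph.conf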
637 def mount(self, mount_path=None, mount_fs_name=None, mount_options=[], **kwargs):
638 self.setupfs(name=mount_fs_name)
639
640 log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
641 id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
642
643 self.client_remote.run(
644 args=[
645 'mkdir',
646 '--',
647 self.mountpoint,
648 ],
649 timeout=(5*60),
650 )
651
652 if mount_path is None:
653 mount_path = "/"
654
655 opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
656 conf=self.config_path)
657
658 if mount_fs_name is not None:
659 opts += ",mds_namespace={0}".format(mount_fs_name)
660
661 for mount_opt in mount_options:
662 opts += ",{0}".format(mount_opt)
663
664 self.client_remote.run(
665 args=[
666 'sudo',
667 './bin/mount.ceph',
668 ':{mount_path}'.format(mount_path=mount_path),
669 self.mountpoint,
670 '-v',
671 '-o',
672 opts
673 ],
674 timeout=(30*60),
675 omit_sudo=False,
676 )
677
678 self.client_remote.run(
679 args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
680
681 self.mounted = True
682
683 def _run_python(self, pyscript, py_version='python'):
684 """
685 Override this to remove the daemon-helper prefix that is used otherwise
686 to make the process killable.
687 """
688 return self.client_remote.run(args=[py_version, '-c', pyscript],
689 wait=False)
690
691 class LocalFuseMount(FuseMount):
692 def __init__(self, ctx, test_dir, client_id):
693 super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
694
695 @property
696 def config_path(self):
697 return "./ceph.conf"
698
699 def get_keyring_path(self):
700 # This is going to end up in a config file, so use an absolute path
701 # to avoid assumptions about daemons' pwd
702 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
703
704 def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
705 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
706 # the "cd foo && bar" shenanigans isn't needed to begin with and
707 # then we wouldn't have to special case this
708 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
709 stdin=stdin, check_status=check_status,
710 omit_sudo=omit_sudo)
711
712 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
713 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
714 # the "cd foo && bar" shenanigans isn't needed to begin with and
715 # then we wouldn't have to special case this
716 if isinstance(args, str):
717 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
718 elif isinstance(args, list):
719 cmdlist = args
720 cmd = ''
721 for i in cmdlist:
722 cmd = cmd + i + ' '
723 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
724 args.append(cmd)
725
726 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
727 check_status=check_status, stdin=stdin,
728 omit_sudo=False)
729
730 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
731 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
732 # the "cd foo && bar" shenanigans isn't needed to begin with and
733 # then we wouldn't have to special case this
734 if isinstance(args, str):
735 args = 'sudo ' + args
736 if isinstance(args, list):
737 args.insert(0, 'sudo')
738
739 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
740 check_status=check_status,
741 omit_sudo=False)
742
743 def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
744 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
745 # the "cd foo && bar" shenanigans isn't needed to begin with and
746 # then we wouldn't have to special case this
747 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
748 omit_sudo=omit_sudo)
749
750 def testcmd_as_user(self, args, user, wait=True, stdin=None):
751 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
752 # the "cd foo && bar" shenanigans isn't needed to begin with and
753 # then we wouldn't have to special case this
754 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
755 check_status=False)
756
757 def testcmd_as_root(self, args, wait=True, stdin=None):
758 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
759 # the "cd foo && bar" shenanigans isn't needed to begin with and
760 # then we wouldn't have to special case this
761 return self.run_as_root(args, wait=wait, stdin=stdin,
762 check_status=False)
763
764 def setupfs(self, name=None):
765 if name is None and self.fs is not None:
766 # Previous mount existed, reuse the old name
767 name = self.fs.name
768 self.fs = LocalFilesystem(self.ctx, name=name)
769 log.info('Wait for MDS to reach steady state...')
770 self.fs.wait_for_daemons()
771 log.info('Ready to start {}...'.format(type(self).__name__))
772
773 @property
774 def _prefix(self):
775 return BIN_PREFIX
776
777 def _asok_path(self):
778 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
779         # run in the foreground. When it is daemonized, however, the asok is named after
780         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
781 # we need to give an exact path here as the logic for checking /proc/ for which
782 # asok is alive does not work.
783
784 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
785 # in a tmpdir. All of the paths are the same, so no need to select
786 # based off of the service type.
787 d = "./out"
788 with open(self.config_path) as f:
789 for line in f:
790 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
791 if asok_conf:
792 d = asok_conf.groups(1)[0]
793 break
794 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
795 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
796 return path
797
798 def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
799 if mountpoint is not None:
800 self.mountpoint = mountpoint
801 self.setupfs(name=mount_fs_name)
802
803 self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
804
805 def list_connections():
806 self.client_remote.run(
807 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
808 check_status=False
809 )
810 p = self.client_remote.run(
811 args=["ls", "/sys/fs/fuse/connections"],
812 check_status=False
813 )
814 if p.exitstatus != 0:
815 log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
816 return []
817
818 ls_str = six.ensure_str(p.stdout.getvalue().strip())
819 if ls_str:
820 return [int(n) for n in ls_str.split("\n")]
821 else:
822 return []
823
824 # Before starting ceph-fuse process, note the contents of
825 # /sys/fs/fuse/connections
826 pre_mount_conns = list_connections()
827 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
828
829 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
830 if os.getuid() != 0:
831 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
832
833 if mount_path is not None:
834 prefix += ["--client_mountpoint={0}".format(mount_path)]
835
836 if mount_fs_name is not None:
837 prefix += ["--client_fs={0}".format(mount_fs_name)]
838
839         prefix += mount_options
840
841 self.fuse_daemon = self.client_remote.run(args=
842 prefix + [
843 "-f",
844 "--name",
845 "client.{0}".format(self.client_id),
846 self.mountpoint
847 ], wait=False)
848
849 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
850
851 # Wait for the connection reference to appear in /sys
852 waited = 0
853 post_mount_conns = list_connections()
854 while len(post_mount_conns) <= len(pre_mount_conns):
855 if self.fuse_daemon.finished:
856 # Did mount fail? Raise the CommandFailedError instead of
857 # hitting the "failed to populate /sys/" timeout
858 self.fuse_daemon.wait()
859 time.sleep(1)
860 waited += 1
861 if waited > 30:
862 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
863 waited
864 ))
865 post_mount_conns = list_connections()
866
867 log.info("Post-mount connections: {0}".format(post_mount_conns))
868
869 # Record our fuse connection number so that we can use it when
870 # forcing an unmount
871 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
872 if len(new_conns) == 0:
873 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
874 elif len(new_conns) > 1:
875 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
876 else:
877 self._fuse_conn = new_conns[0]
878
879 self.gather_mount_info()
880
881 self.mounted = True
882
883 def _run_python(self, pyscript, py_version='python'):
884 """
885 Override this to remove the daemon-helper prefix that is used otherwise
886 to make the process killable.
887 """
888 return self.client_remote.run(args=[py_version, '-c', pyscript],
889 wait=False)
890
891 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
892 # the same name.
893 class LocalCephManager(CephManager):
894 def __init__(self):
895 # Deliberately skip parent init, only inheriting from it to get
896 # util methods like osd_dump that sit on top of raw_cluster_cmd
897 self.controller = LocalRemote()
898
899 # A minority of CephManager fns actually bother locking for when
900 # certain teuthology tests want to run tasks in parallel
901 self.lock = threading.RLock()
902
903 self.log = lambda x: log.info(x)
904
905 # Don't bother constructing a map of pools: it should be empty
906 # at test cluster start, and in any case it would be out of date
907 # in no time. The attribute needs to exist for some of the CephManager
908 # methods to work though.
909 self.pools = {}
910
911 def find_remote(self, daemon_type, daemon_id):
912 """
913 daemon_type like 'mds', 'osd'
914 daemon_id like 'a', '0'
915 """
916 return LocalRemote()
917
918 def run_ceph_w(self, watch_channel=None):
919 """
920 :param watch_channel: Specifies the channel to be watched.
921 This can be 'cluster', 'audit', ...
922 :type watch_channel: str
923 """
924 args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
925 if watch_channel is not None:
926 args.append("--watch-channel")
927 args.append(watch_channel)
928 proc = self.controller.run(args=args, wait=False, stdout=StringIO())
929 return proc
930
931 def raw_cluster_cmd(self, *args, **kwargs):
932 """
933         args like ["osd", "dump"]
934 return stdout string
935 """
936 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
937 list(args), **kwargs)
938 return six.ensure_str(proc.stdout.getvalue())
939
940 def raw_cluster_cmd_result(self, *args, **kwargs):
941 """
942 like raw_cluster_cmd but don't check status, just return rc
943 """
944 kwargs['check_status'] = False
945 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
946 list(args), **kwargs)
947 return proc.exitstatus
948
949 def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
950 return self.controller.run(
951 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
952 check_status=check_status,
953 timeout=timeout
954 )
955
956 def get_mon_socks(self):
957 """
958 Get monitor sockets.
959
960 :return socks: tuple of strings; strings are individual sockets.
961 """
962 from json import loads
963
964 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
965 socks = []
966 for mon in output['mons']:
967 for addrvec_mem in mon['public_addrs']['addrvec']:
968 socks.append(addrvec_mem['addr'])
969 return tuple(socks)
970
971 def get_msgrv1_mon_socks(self):
972 """
973         Get monitor sockets that use msgrv1 to operate.
974
975 :return socks: tuple of strings; strings are individual sockets.
976 """
977 from json import loads
978
979 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
980 socks = []
981 for mon in output['mons']:
982 for addrvec_mem in mon['public_addrs']['addrvec']:
983 if addrvec_mem['type'] == 'v1':
984 socks.append(addrvec_mem['addr'])
985 return tuple(socks)
986
987 def get_msgrv2_mon_socks(self):
988 """
989 Get monitor sockets that use msgrv2 to operate.
990
991 :return socks: tuple of strings; strings are individual sockets.
992 """
993 from json import loads
994
995 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
996 socks = []
997 for mon in output['mons']:
998 for addrvec_mem in mon['public_addrs']['addrvec']:
999 if addrvec_mem['type'] == 'v2':
1000 socks.append(addrvec_mem['addr'])
1001 return tuple(socks)
1002
1003
1004 class LocalCephCluster(CephCluster):
1005 def __init__(self, ctx):
1006 # Deliberately skip calling parent constructor
1007 self._ctx = ctx
1008 self.mon_manager = LocalCephManager()
1009 self._conf = defaultdict(dict)
1010
1011 @property
1012 def admin_remote(self):
1013 return LocalRemote()
1014
1015 def get_config(self, key, service_type=None):
1016 if service_type is None:
1017 service_type = 'mon'
1018
1019 # FIXME hardcoded vstart service IDs
1020 service_id = {
1021 'mon': 'a',
1022 'mds': 'a',
1023 'osd': '0'
1024 }[service_type]
1025
1026 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
1027
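    # _write_conf rewrites everything after a "#LOCAL_TEST" banner in the
    # vstart ceph.conf from self._conf, commenting out any earlier values for
    # the same keys. The appended block ends up looking roughly like:
    #   #LOCAL_TEST
    #   [mds]
    #   mds log max segments = 10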
1028 def _write_conf(self):
1029 # In teuthology, we have the honour of writing the entire ceph.conf, but
1030 # in vstart land it has mostly already been written and we need to carefully
1031 # append to it.
1032 conf_path = "./ceph.conf"
1033 banner = "\n#LOCAL_TEST\n"
1034 existing_str = open(conf_path).read()
1035
1036 if banner in existing_str:
1037 existing_str = existing_str[0:existing_str.find(banner)]
1038
1039 existing_str += banner
1040
1041 for subsys, kvs in self._conf.items():
1042 existing_str += "\n[{0}]\n".format(subsys)
1043 for key, val in kvs.items():
1044 # Comment out existing instance if it exists
1045 log.info("Searching for existing instance {0}/{1}".format(
1046 key, subsys
1047 ))
1048 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
1049 subsys
1050 ), existing_str, re.MULTILINE)
1051
1052 if existing_section:
1053 section_str = existing_str[existing_section.start():existing_section.end()]
1054 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
1055 if existing_val:
1056 start = existing_section.start() + existing_val.start(1)
1057 log.info("Found string to replace at {0}".format(
1058 start
1059 ))
1060 existing_str = existing_str[0:start] + "#" + existing_str[start:]
1061
1062 existing_str += "{0} = {1}\n".format(key, val)
1063
1064 open(conf_path, "w").write(existing_str)
1065
1066 def set_ceph_conf(self, subsys, key, value):
1067 self._conf[subsys][key] = value
1068 self._write_conf()
1069
1070 def clear_ceph_conf(self, subsys, key):
1071 del self._conf[subsys][key]
1072 self._write_conf()
1073
1074
1075 class LocalMDSCluster(LocalCephCluster, MDSCluster):
1076 def __init__(self, ctx):
1077 super(LocalMDSCluster, self).__init__(ctx)
1078
1079 self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
1080 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1081
1082 def clear_firewall(self):
1083 # FIXME: unimplemented
1084 pass
1085
1086 def newfs(self, name='cephfs', create=True):
1087 return LocalFilesystem(self._ctx, name=name, create=create)
1088
1089
1090 class LocalMgrCluster(LocalCephCluster, MgrCluster):
1091 def __init__(self, ctx):
1092 super(LocalMgrCluster, self).__init__(ctx)
1093
1094 self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
1095 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
1096
1097
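# Re-implements Filesystem's constructor for vstart: instead of taking daemon
# information from a teuthology ctx, it discovers MDS ids by parsing [mds.*]
# sections out of ./ceph.conf and wires them up to LocalDaemon/LocalCephManager.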
1098 class LocalFilesystem(Filesystem, LocalMDSCluster):
1099 def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
1100 # Deliberately skip calling parent constructor
1101 self._ctx = ctx
1102
1103 self.id = None
1104 self.name = name
1105 self.ec_profile = ec_profile
1106 self.metadata_pool_name = None
1107 self.metadata_overlay = False
1108 self.data_pool_name = None
1109 self.data_pools = None
1110
1111 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
1112 self.mds_ids = set()
1113 for line in open("ceph.conf").readlines():
1114 match = re.match("^\[mds\.(.+)\]$", line)
1115 if match:
1116 self.mds_ids.add(match.group(1))
1117
1118 if not self.mds_ids:
1119 raise RuntimeError("No MDSs found in ceph.conf!")
1120
1121 self.mds_ids = list(self.mds_ids)
1122
1123 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
1124
1125 self.mon_manager = LocalCephManager()
1126
1127 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1128
1129 self.client_remote = LocalRemote()
1130
1131 self._conf = defaultdict(dict)
1132
1133 if name is not None:
1134 if fscid is not None:
1135 raise RuntimeError("cannot specify fscid when creating fs")
1136 if create and not self.legacy_configured():
1137 self.create()
1138 else:
1139 if fscid is not None:
1140 self.id = fscid
1141 self.getinfo(refresh=True)
1142
1143 # Stash a reference to the first created filesystem on ctx, so
1144 # that if someone drops to the interactive shell they can easily
1145 # poke our methods.
1146 if not hasattr(self._ctx, "filesystem"):
1147 self._ctx.filesystem = self
1148
1149 @property
1150 def _prefix(self):
1151 return BIN_PREFIX
1152
1153 def set_clients_block(self, blocked, mds_id=None):
1154 raise NotImplementedError()
1155
1156
1157 class InteractiveFailureResult(unittest.TextTestResult):
1158 """
1159 Specialization that implements interactive-on-error style
1160 behavior.
1161 """
1162 def addFailure(self, test, err):
1163 super(InteractiveFailureResult, self).addFailure(test, err)
1164 log.error(self._exc_info_to_string(err, test))
1165 log.error("Failure in test '{0}', going interactive".format(
1166 self.getDescription(test)
1167 ))
1168 interactive.task(ctx=None, config=None)
1169
1170 def addError(self, test, err):
1171 super(InteractiveFailureResult, self).addError(test, err)
1172 log.error(self._exc_info_to_string(err, test))
1173 log.error("Error in test '{0}', going interactive".format(
1174 self.getDescription(test)
1175 ))
1176 interactive.task(ctx=None, config=None)
1177
1178
1179 def enumerate_methods(s):
1180 log.info("e: {0}".format(s))
1181 for t in s._tests:
1182 if isinstance(t, suite.BaseTestSuite):
1183 for sub in enumerate_methods(t):
1184 yield sub
1185 else:
1186 yield s, t
1187
1188
1189 def load_tests(modules, loader):
1190 if modules:
1191 log.info("Executing modules: {0}".format(modules))
1192 module_suites = []
1193 for mod_name in modules:
1194 # Test names like cephfs.test_auto_repair
1195 module_suites.append(loader.loadTestsFromName(mod_name))
1196 log.info("Loaded: {0}".format(list(module_suites)))
1197 return suite.TestSuite(module_suites)
1198 else:
1199 log.info("Executing all cephfs tests")
1200 return loader.discover(
1201 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
1202 )
1203
1204
1205 def scan_tests(modules):
1206 overall_suite = load_tests(modules, loader.TestLoader())
1207
1208 max_required_mds = 0
1209 max_required_clients = 0
1210 max_required_mgr = 0
1211 require_memstore = False
1212
1213 for suite_, case in enumerate_methods(overall_suite):
1214 max_required_mds = max(max_required_mds,
1215 getattr(case, "MDSS_REQUIRED", 0))
1216 max_required_clients = max(max_required_clients,
1217 getattr(case, "CLIENTS_REQUIRED", 0))
1218 max_required_mgr = max(max_required_mgr,
1219 getattr(case, "MGRS_REQUIRED", 0))
1220 require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
1221 or require_memstore
1222
1223 return max_required_mds, max_required_clients, \
1224 max_required_mgr, require_memstore
1225
1226
1227 class LocalCluster(object):
1228 def __init__(self, rolename="placeholder"):
1229 self.remotes = {
1230 LocalRemote(): [rolename]
1231 }
1232
1233 def only(self, requested):
1234 return self.__class__(rolename=requested)
1235
1236
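# A minimal substitute for the teuthology ctx object: it carries the config
# dicts, a single-node LocalCluster and a DaemonGroup populated from whatever
# roles appear in the vstart ceph.conf.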
1237 class LocalContext(object):
1238 def __init__(self):
1239 self.config = {}
1240 self.teuthology_config = teuth_config
1241 self.cluster = LocalCluster()
1242 self.daemons = DaemonGroup()
1243
1244 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
1245 # tests that want to look these up via ctx can do so.
1246 # Inspect ceph.conf to see what roles exist
1247 for conf_line in open("ceph.conf").readlines():
1248 for svc_type in ["mon", "osd", "mds", "mgr"]:
1249 prefixed_type = "ceph." + svc_type
1250 if prefixed_type not in self.daemons.daemons:
1251 self.daemons.daemons[prefixed_type] = {}
1252 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
1253 if match:
1254 svc_id = match.group(1)
1255 self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
1256
1257 def __del__(self):
1258 shutil.rmtree(self.teuthology_config['test_path'])
1259
1260 def teardown_cluster():
1261 log.info('\ntearing down the cluster...')
1262 remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
1263 remote.run(args=['rm', '-rf', './dev', './out'])
1264
1265 def clear_old_log():
1266 from os import stat
1267
1268 try:
1269 stat(logpath)
1270     # would need an update when making this py3 compatible. Use
1271     # FileNotFoundError instead.
1272 except OSError:
1273 return
1274 else:
1275 os.remove(logpath)
1276 with open(logpath, 'w') as logfile:
1277 logfile.write('')
1278 init_log()
1279 log.info('logging in a fresh file now...')
1280
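
# exec_test drives the whole run: parse command line flags, kill stray
# ceph-fuse/ceph-mds processes, optionally (re)create the vstart cluster,
# construct the client mounts, then load the requested test modules and run
# them with unittest.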
1281 def exec_test():
1282 # Parse arguments
1283 opt_interactive_on_error = False
1284 opt_create_cluster = False
1285 opt_create_cluster_only = False
1286 opt_ignore_missing_binaries = False
1287 opt_teardown_cluster = False
1288 global opt_log_ps_output
1289 opt_log_ps_output = False
1290 use_kernel_client = False
1291
1292 args = sys.argv[1:]
1293 flags = [a for a in args if a.startswith("-")]
1294 modules = [a for a in args if not a.startswith("-")]
1295 for f in flags:
1296 if f == "--interactive":
1297 opt_interactive_on_error = True
1298 elif f == "--create":
1299 opt_create_cluster = True
1300 elif f == "--create-cluster-only":
1301 opt_create_cluster_only = True
1302 elif f == "--ignore-missing-binaries":
1303 opt_ignore_missing_binaries = True
1304 elif f == '--teardown':
1305 opt_teardown_cluster = True
1306 elif f == '--log-ps-output':
1307 opt_log_ps_output = True
1308 elif f == '--clear-old-log':
1309 clear_old_log()
1310 elif f == "--kclient":
1311 use_kernel_client = True
1312 else:
1313 log.error("Unknown option '{0}'".format(f))
1314 sys.exit(-1)
1315
1316 # Help developers by stopping up-front if their tree isn't built enough for all the
1317 # tools that the tests might want to use (add more here if needed)
1318 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
1319 "cephfs-table-tool", "ceph-fuse", "rados"]
1320 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
1321 if missing_binaries and not opt_ignore_missing_binaries:
1322 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
1323 sys.exit(-1)
1324
1325 max_required_mds, max_required_clients, \
1326 max_required_mgr, require_memstore = scan_tests(modules)
1327
1328 global remote
1329 remote = LocalRemote()
1330
1331 # Tolerate no MDSs or clients running at start
1332 ps_txt = six.ensure_str(remote.run(
1333 args=["ps", "-u"+str(os.getuid())]
1334 ).stdout.getvalue().strip())
1335 lines = ps_txt.split("\n")[1:]
1336 for line in lines:
1337 if 'ceph-fuse' in line or 'ceph-mds' in line:
1338 pid = int(line.split()[0])
1339 log.warning("Killing stray process {0}".format(line))
1340 os.kill(pid, signal.SIGKILL)
1341
1342 # Fire up the Ceph cluster if the user requested it
1343 if opt_create_cluster or opt_create_cluster_only:
1344 log.info("Creating cluster with {0} MDS daemons".format(
1345 max_required_mds))
1346 teardown_cluster()
1347 vstart_env = os.environ.copy()
1348 vstart_env["FS"] = "0"
1349 vstart_env["MDS"] = max_required_mds.__str__()
1350 vstart_env["OSD"] = "4"
1351 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
1352
1353 args = [os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d",
1354 "--nolockdep"]
1355 if require_memstore:
1356 args.append("--memstore")
1357
1358         # Usually vstart.sh completes in less than 100 seconds, so a
1359         # three-minute timeout leaves comfortable headroom.
1360 remote.run(args=args, env=vstart_env, timeout=(3 * 60))
1361
1362 # Wait for OSD to come up so that subsequent injectargs etc will
1363 # definitely succeed
1364 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
1365
1366 if opt_create_cluster_only:
1367 return
1368
1369 # List of client mounts, sufficient to run the selected tests
1370 clients = [i.__str__() for i in range(0, max_required_clients)]
1371
1372 test_dir = tempfile.mkdtemp()
1373 teuth_config['test_path'] = test_dir
1374
1375 ctx = LocalContext()
1376 ceph_cluster = LocalCephCluster(ctx)
1377 mds_cluster = LocalMDSCluster(ctx)
1378 mgr_cluster = LocalMgrCluster(ctx)
1379
1380 # Construct Mount classes
1381 mounts = []
1382 for client_id in clients:
1383 # Populate client keyring (it sucks to use client.admin for test clients
1384 # because it's awkward to find the logs later)
1385 client_name = "client.{0}".format(client_id)
1386
1387 if client_name not in open("./keyring").read():
1388 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
1389 "osd", "allow rw",
1390 "mds", "allow",
1391 "mon", "allow r"])
1392
1393 open("./keyring", "a").write(p.stdout.getvalue())
1394
1395 if use_kernel_client:
1396 mount = LocalKernelMount(ctx, test_dir, client_id)
1397 else:
1398 mount = LocalFuseMount(ctx, test_dir, client_id)
1399
1400 mounts.append(mount)
1401 if os.path.exists(mount.mountpoint):
1402 if mount.is_mounted():
1403 log.warning("unmounting {0}".format(mount.mountpoint))
1404 mount.umount_wait()
1405 else:
1406 os.rmdir(mount.mountpoint)
1407
1408 from tasks.cephfs_test_runner import DecoratingLoader
1409
1410 class LogStream(object):
1411 def __init__(self):
1412 self.buffer = ""
1413
1414 def write(self, data):
1415 self.buffer += data
1416 if "\n" in self.buffer:
1417 lines = self.buffer.split("\n")
1418 for line in lines[:-1]:
1419 pass
1420 # sys.stderr.write(line + "\n")
1421 log.info(line)
1422 self.buffer = lines[-1]
1423
1424 def flush(self):
1425 pass
1426
1427 decorating_loader = DecoratingLoader({
1428 "ctx": ctx,
1429 "mounts": mounts,
1430 "ceph_cluster": ceph_cluster,
1431 "mds_cluster": mds_cluster,
1432 "mgr_cluster": mgr_cluster,
1433 })
1434
1435 # For the benefit of polling tests like test_full -- in teuthology land we set this
1436 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1437 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1438 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1439
1440 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1441     # from normal IO latency. Increase it for running tests.
1442 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
1443
1444 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1445 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1446 # so that cephfs-data-scan will pick it up too.
1447 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
1448 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1449
1450 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1451 def _get_package_version(remote, pkg_name):
1452 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1453 return "2.9"
1454
1455 import teuthology.packaging
1456 teuthology.packaging.get_package_version = _get_package_version
1457
1458 overall_suite = load_tests(modules, decorating_loader)
1459
1460     # Filter out tests that don't lend themselves to interactive running.
1461 victims = []
1462 for case, method in enumerate_methods(overall_suite):
1463 fn = getattr(method, method._testMethodName)
1464
1465 drop_test = False
1466
1467 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1468 drop_test = True
1469 log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))
1470
1471 if getattr(fn, "needs_trimming", False) is True:
1472 drop_test = (os.getuid() != 0)
1473 log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))
1474
1475 if drop_test:
1476 # Don't drop the test if it was explicitly requested in arguments
1477 is_named = False
1478 for named in modules:
1479 if named.endswith(method.id()):
1480 is_named = True
1481 break
1482
1483 if not is_named:
1484 victims.append((case, method))
1485
1486 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1487 for s, method in victims:
1488 s._tests.remove(method)
1489
1490 if opt_interactive_on_error:
1491 result_class = InteractiveFailureResult
1492 else:
1493 result_class = unittest.TextTestResult
1494 fail_on_skip = False
1495
1496 class LoggingResult(result_class):
1497 def startTest(self, test):
1498 log.info("Starting test: {0}".format(self.getDescription(test)))
1499 test.started_at = datetime.datetime.utcnow()
1500 return super(LoggingResult, self).startTest(test)
1501
1502 def stopTest(self, test):
1503 log.info("Stopped test: {0} in {1}s".format(
1504 self.getDescription(test),
1505 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1506 ))
1507
1508 def addSkip(self, test, reason):
1509 if fail_on_skip:
1510 # Don't just call addFailure because that requires a traceback
1511 self.failures.append((test, reason))
1512 else:
1513 super(LoggingResult, self).addSkip(test, reason)
1514
1515 # Execute!
1516 result = unittest.TextTestRunner(
1517 stream=LogStream(),
1518 resultclass=LoggingResult,
1519 verbosity=2,
1520 failfast=True).run(overall_suite)
1521
1522 if opt_teardown_cluster:
1523 teardown_cluster()
1524
1525 if not result.wasSuccessful():
1526 result.printErrors() # duplicate output at end for convenience
1527
1528 bad_tests = []
1529 for test, error in result.errors:
1530 bad_tests.append(str(test))
1531 for test, failure in result.failures:
1532 bad_tests.append(str(test))
1533
1534 sys.exit(-1)
1535 else:
1536 sys.exit(0)
1537
1538
1539 if __name__ == "__main__":
1540 exec_test()