1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 # Also, you can create the cluster once and then run named test cases against it:
27 python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
28 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
29 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
30
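# A few more flags are parsed by exec_test() further down in this file, e.g. to use
# the kernel client instead of ceph-fuse, tear the cluster down afterwards, and log
# full `ps` output when a daemon's PID cannot be found:
python ~/git/ceph/qa/tasks/vstart_runner.py --kclient --teardown --log-ps-output tasks.cephfs.test_data_scan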
31 """
32
33 from io import BytesIO
34 from io import StringIO
35 from collections import defaultdict
36 import getpass
37 import signal
38 import tempfile
39 import threading
40 import datetime
41 import shutil
42 import re
43 import os
44 import time
45 import sys
46 import errno
47 from unittest import suite, loader
48 import unittest
49 import platform
50 from teuthology import misc
51 from teuthology.orchestra.run import Raw, quote
52 from teuthology.orchestra.daemon import DaemonGroup
53 from teuthology.config import config as teuth_config
54 import six
55 import logging
56
57 def init_log():
58 global log
59 if log is not None:
60 del log
61 log = logging.getLogger(__name__)
62
63 global logpath
64 logpath = './vstart_runner.log'
65
66 handler = logging.FileHandler(logpath)
67 formatter = logging.Formatter(
68 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
69 datefmt='%Y-%m-%dT%H:%M:%S')
70 handler.setFormatter(formatter)
71 log.addHandler(handler)
72 log.setLevel(logging.INFO)
73
74 log = None
75 init_log()
76
77
78 def respawn_in_path(lib_path, python_paths):
79 execv_cmd = ['python']
80 if platform.system() == "Darwin":
81 lib_path_var = "DYLD_LIBRARY_PATH"
82 else:
83 lib_path_var = "LD_LIBRARY_PATH"
84
85 py_binary = os.environ.get("PYTHON", sys.executable)
86
87 if lib_path_var in os.environ:
88 if lib_path not in os.environ[lib_path_var]:
89 os.environ[lib_path_var] += ':' + lib_path
90 os.execvp(py_binary, execv_cmd + sys.argv)
91 else:
92 os.environ[lib_path_var] = lib_path
93 os.execvp(py_binary, execv_cmd + sys.argv)
94
95 for p in python_paths:
96 sys.path.insert(0, p)
97
98
99 # Let's use some sensible defaults
100 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
101
102 # A list of candidate paths for each package we need
103 guesses = [
104 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
105 ["lib/cython_modules/lib.3"],
106 ["../src/pybind"],
107 ]
108
109 python_paths = []
110
111 # Up one level so that "tasks.foo.bar" imports work
112 python_paths.append(os.path.abspath(
113 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
114 ))
115
116 for package_guesses in guesses:
117 for g in package_guesses:
118 g_exp = os.path.abspath(os.path.expanduser(g))
119 if os.path.exists(g_exp):
120 python_paths.append(g_exp)
121
122 ld_path = os.path.join(os.getcwd(), "lib/")
123 print("Using guessed paths {0} {1}".format(ld_path, python_paths))
124 respawn_in_path(ld_path, python_paths)
125
126
127 try:
128 from teuthology.exceptions import CommandFailedError
129 from tasks.ceph_manager import CephManager
130 from tasks.cephfs.fuse_mount import FuseMount
131 from tasks.cephfs.kernel_mount import KernelMount
132 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
133 from tasks.mgr.mgr_test_case import MgrCluster
134 from teuthology.contextutil import MaxWhileTries
135 from teuthology.task import interactive
136 except ImportError:
137 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
138 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
139 raise
140
141 # Must import after teuthology because of gevent monkey patching
142 import subprocess
143
144 if os.path.exists("./CMakeCache.txt"):
145 # Running in build dir of a cmake build
146 BIN_PREFIX = "./bin/"
147 SRC_PREFIX = "../src"
148 else:
149 # Running in src/ of an autotools build
150 BIN_PREFIX = "./"
151 SRC_PREFIX = "./"
152
153
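# For illustration (comment only): the helper below maps the UTF-8 curly quotes that
# some tools emit to plain ASCII quotes, e.g.
#   rm_nonascii_chars(b'can\xe2\x80\x99t') == b"can't"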
154 def rm_nonascii_chars(var):
155 var = var.replace(b'\xe2\x80\x98', b'\'')
156 var = var.replace(b'\xe2\x80\x99', b'\'')
157 return var
158
159 class LocalRemoteProcess(object):
160 def __init__(self, args, subproc, check_status, stdout, stderr):
161 self.args = args
162 self.subproc = subproc
163 self.stdout = stdout or BytesIO()
164 self.stderr = stderr or BytesIO()
165
166 self.check_status = check_status
167 self.exitstatus = self.returncode = None
168
169 def wait(self):
170 if self.finished:
171 # Avoid calling communicate() on a dead process because it'll
172 # give you stick about std* already being closed
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175 else:
176 return
177
178 out, err = self.subproc.communicate()
179 out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
180 if isinstance(self.stdout, StringIO):
181 self.stdout.write(out.decode(errors='ignore'))
182 else:
183 self.stdout.write(out)
184 if isinstance(self.stderr, StringIO):
185 self.stderr.write(err.decode(errors='ignore'))
186 else:
187 self.stderr.write(err)
188
189 self.exitstatus = self.returncode = self.subproc.returncode
190
191 if self.exitstatus != 0:
192 sys.stderr.write(six.ensure_str(out))
193 sys.stderr.write(six.ensure_str(err))
194
195 if self.check_status and self.exitstatus != 0:
196 raise CommandFailedError(self.args, self.exitstatus)
197
198 @property
199 def finished(self):
200 if self.exitstatus is not None:
201 return True
202
203 if self.subproc.poll() is not None:
204 out, err = self.subproc.communicate()
205 self.stdout.write(out)
206 self.stderr.write(err)
207 self.exitstatus = self.returncode = self.subproc.returncode
208 return True
209 else:
210 return False
211
212 def kill(self):
213 log.info("kill ")
214 if self.subproc.pid and not self.finished:
215 log.info("kill: killing pid {0} ({1})".format(
216 self.subproc.pid, self.args))
217 safe_kill(self.subproc.pid)
218 else:
219 log.info("kill: already terminated ({0})".format(self.args))
220
221 @property
222 def stdin(self):
223 class FakeStdIn(object):
224 def __init__(self, mount_daemon):
225 self.mount_daemon = mount_daemon
226
227 def close(self):
228 self.mount_daemon.kill()
229
230 return FakeStdIn(self)
231
232
233 class LocalRemote(object):
234 """
235 Amusingly named class to present the teuthology RemoteProcess interface when we are really
236 running things locally for vstart
237
238 Run this inside your src/ dir!
239 """
240
241 def __init__(self):
242 self.name = "local"
243 self.hostname = "localhost"
244 self.user = getpass.getuser()
245
246 def get_file(self, path, sudo, dest_dir):
247 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
248 shutil.copy(path, tmpfile)
249 return tmpfile
250
251     # XXX: This method ignores the error raised when src and dst hold the
252     # same path. For teuthology, the same path still represents different
253     # locations, since they lie on different machines.
254 def put_file(self, src, dst, sudo=False):
255 if sys.version_info.major < 3:
256 exception = shutil.Error
257 elif sys.version_info.major >= 3:
258 exception = shutil.SameFileError
259
260 try:
261 shutil.copy(src, dst)
262 except exception as e:
263 if sys.version_info.major < 3 and e.message.find('are the same '
264 'file') != -1:
265 return
266 raise e
267
268     # XXX: accepts only two arguments to maintain compatibility with
269 # teuthology's mkdtemp.
270 def mkdtemp(self, suffix='', parentdir=None):
271 from tempfile import mkdtemp
272
273         # XXX: prefix had to be set; without it, this method failed against
274         # Python 2.7 -
275 # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
276 # -> file = _os.path.join(dir, prefix + name + suffix)
277 # (Pdb) p prefix
278 # None
279 return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
280
281 def mktemp(self, suffix=None, parentdir=None):
282 """
283 Make a remote temporary file
284
285 Returns: the path of the temp file created.
286 """
287 from tempfile import mktemp
288 return mktemp(suffix=suffix, dir=parentdir)
289
290 def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
291 # Since Python's shell simulation can only work when commands are
292         # provided as a list of arguments...
293 if isinstance(args, str) or isinstance(args, six.text_type):
294 args = args.split()
295
296         # We'll keep sudo as part of the command, even if the omit flag says
297         # otherwise, for commands that can normally be run only by root.
298 try:
299 if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
300 omit_sudo = False
301 except ValueError:
302 pass
303
304         # Quotes wrapping a command argument don't work in Python's shell
305         # simulation if the argument also contains spaces. E.g. '"ls"' is OK
306         # but '"ls /"' isn't.
307         errmsg = "Don't surround an argument with quotes if it " + \
308                  "contains spaces.\nargs - %s" % (args)
309 for arg in args:
310 if isinstance(arg, Raw):
311 continue
312
313 if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
314 (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
315 raise RuntimeError(errmsg)
316
317 # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
318 # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
319         # treated differently by Python's shell simulation. Only the latter
320         # has the desired effect.
321         errmsg = 'The entire command to be executed as another user should ' +\
322                  'be a single argument.\nargs - %s' % (args)
323 if 'sudo' in args and '-u' in args and '-c' in args and \
324 args.count('-c') == 1:
325 if args.index('-c') != len(args) - 2 and \
326 args[args.index('-c') + 2].find('-') == -1:
327 raise RuntimeError(errmsg)
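        # A couple of examples of the checks above (illustration only):
        #   ['echo', 'hello world']                                      - accepted
        #   ['echo', '"hello world"']                                    - rejected (quoted arg with spaces)
        #   ['sudo', '-u', 'u', '-s', '/bin/bash', '-c', 'ls', '/tmp']   - rejected (command after '-c' split)
        #   ['sudo', '-u', 'u', '-s', '/bin/bash', '-c', 'ls /tmp']      - accepted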
328
329 if omit_sudo:
330 args = [a for a in args if a != "sudo"]
331
332 return args
333
334     # Wrapper to keep the interface exactly the same as that of
335 # teuthology.remote.run.
336 def run(self, **kwargs):
337 return self._do_run(**kwargs)
338
339 def _do_run(self, args, check_status=True, wait=True, stdout=None,
340 stderr=None, cwd=None, stdin=None, logger=None, label=None,
341 env=None, timeout=None, omit_sudo=False):
342 args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
343
344 # We have to use shell=True if any run.Raw was present, e.g. &&
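        # For example (illustrative): args=['ls', Raw('&&'), 'whoami'] is joined with
        # quote() and run through a real shell, whereas ['ls', '-l'] is exec'd directly.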
345 shell = any([a for a in args if isinstance(a, Raw)])
346
347 # Filter out helper tools that don't exist in a vstart environment
348 args = [a for a in args if a not in ('adjust-ulimits',
349 'ceph-coverage')]
350
351 # Adjust binary path prefix if given a bare program name
352 if "/" not in args[0]:
353 # If they asked for a bare binary name, and it exists
354 # in our built tree, use the one there.
355 local_bin = os.path.join(BIN_PREFIX, args[0])
356 if os.path.exists(local_bin):
357 args = [local_bin] + args[1:]
358 else:
359 log.debug("'{0}' is not a binary in the Ceph build dir".format(
360 args[0]
361 ))
362
363 log.info("Running {0}".format(args))
364
365 if shell:
366 subproc = subprocess.Popen(quote(args),
367 stdout=subprocess.PIPE,
368 stderr=subprocess.PIPE,
369 stdin=subprocess.PIPE,
370 cwd=cwd,
371 shell=True)
372 else:
373 # Sanity check that we've got a list of strings
374 for arg in args:
375 if not isinstance(arg, six.string_types):
376 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
377 arg, arg.__class__
378 ))
379
380 subproc = subprocess.Popen(args,
381 stdout=subprocess.PIPE,
382 stderr=subprocess.PIPE,
383 stdin=subprocess.PIPE,
384 cwd=cwd,
385 env=env)
386
387 if stdin:
388 if not isinstance(stdin, str):
389 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
390
391 # Hack: writing to stdin is not deadlock-safe, but it "always" works
392 # as long as the input buffer is "small"
393 subproc.stdin.write(stdin.encode())
394
395 proc = LocalRemoteProcess(
396 args, subproc, check_status,
397 stdout, stderr
398 )
399
400 if wait:
401 proc.wait()
402
403 return proc
404
405     # XXX: for compatibility, keep this method the same as teuthology.orchestra.remote.sh.
406     # BytesIO is used just to keep things identical.
407 def sh(self, script, **kwargs):
408 """
409 Shortcut for run method.
410
411 Usage:
412 my_name = remote.sh('whoami')
413 remote_date = remote.sh('date')
414 """
415 from io import BytesIO
416
417 if 'stdout' not in kwargs:
418 kwargs['stdout'] = BytesIO()
419 if 'args' not in kwargs:
420 kwargs['args'] = script
421 proc = self.run(**kwargs)
422 out = proc.stdout.getvalue()
423 if isinstance(out, bytes):
424 return out.decode()
425 else:
426 return out
427
428 class LocalDaemon(object):
429 def __init__(self, daemon_type, daemon_id):
430 self.daemon_type = daemon_type
431 self.daemon_id = daemon_id
432 self.controller = LocalRemote()
433 self.proc = None
434
435 @property
436 def remote(self):
437 return LocalRemote()
438
439 def running(self):
440 return self._get_pid() is not None
441
442 def check_status(self):
443 if self.proc:
444 return self.proc.poll()
445
446 def _get_pid(self):
447 """
448 Return PID as an integer or None if not found
449 """
450 ps_txt = six.ensure_str(self.controller.run(
451 args=["ps", "ww", "-u"+str(os.getuid())]
452 ).stdout.getvalue()).strip()
453 lines = ps_txt.split("\n")[1:]
454
455 for line in lines:
456 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
457 log.info("Found ps line for daemon: {0}".format(line))
458 return int(line.split()[0])
459 if opt_log_ps_output:
460 log.info("No match for {0} {1}: {2}".format(
461 self.daemon_type, self.daemon_id, ps_txt))
462 else:
463 log.info("No match for {0} {1}".format(self.daemon_type,
464 self.daemon_id))
465 return None
466
467 def wait(self, timeout):
468 waited = 0
469 while self._get_pid() is not None:
470 if waited > timeout:
471 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
472 time.sleep(1)
473 waited += 1
474
475 def stop(self, timeout=300):
476 if not self.running():
477 log.error('tried to stop a non-running daemon')
478 return
479
480 pid = self._get_pid()
481 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
482 os.kill(pid, signal.SIGTERM)
483
484 waited = 0
485 while pid is not None:
486 new_pid = self._get_pid()
487 if new_pid is not None and new_pid != pid:
488 log.info("Killing new PID {0}".format(new_pid))
489 pid = new_pid
490 os.kill(pid, signal.SIGTERM)
491
492 if new_pid is None:
493 break
494 else:
495 if waited > timeout:
496 raise MaxWhileTries(
497 "Timed out waiting for daemon {0}.{1}".format(
498 self.daemon_type, self.daemon_id))
499 time.sleep(1)
500 waited += 1
501
502 self.wait(timeout=timeout)
503
504 def restart(self):
505 if self._get_pid() is not None:
506 self.stop()
507
508 self.proc = self.controller.run(args=[
509 os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
510 "-i", self.daemon_id])
511
512 def signal(self, sig, silent=False):
513 if not self.running():
514 raise RuntimeError("Can't send signal to non-running daemon")
515
516 os.kill(self._get_pid(), sig)
517 if not silent:
518 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
519
520
521 def safe_kill(pid):
522 """
523 os.kill annoyingly raises exception if process already dead. Ignore it.
524 """
525 try:
526 return os.kill(pid, signal.SIGKILL)
527 except OSError as e:
528 if e.errno == errno.ESRCH:
529 # Raced with process termination
530 pass
531 else:
532 raise
533
534
535 class LocalKernelMount(KernelMount):
536 def __init__(self, ctx, test_dir, client_id):
537 super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
538
539 @property
540 def config_path(self):
541 return "./ceph.conf"
542
543 def get_keyring_path(self):
544 # This is going to end up in a config file, so use an absolute path
545 # to avoid assumptions about daemons' pwd
546 keyring_path = "./client.{0}.keyring".format(self.client_id)
547 try:
548 os.stat(keyring_path)
549 except OSError:
550 return os.path.join(os.getcwd(), 'keyring')
551 else:
552 return keyring_path
553
554 def run_shell(self, args, wait=True, stdin=None, check_status=True,
555 omit_sudo=False):
556 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
557 # the "cd foo && bar" shenanigans isn't needed to begin with and
558 # then we wouldn't have to special case this
559 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
560 stdin=stdin, check_status=check_status,
561 omit_sudo=omit_sudo)
562
563 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
564 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
565 # the "cd foo && bar" shenanigans isn't needed to begin with and
566 # then we wouldn't have to special case this
567 if isinstance(args, str):
568 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
569 elif isinstance(args, list):
570 cmdlist = args
571 cmd = ''
572 for i in cmdlist:
573 cmd = cmd + i + ' '
574 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
575 args.append(cmd)
576
577 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
578 check_status=check_status, stdin=stdin,
579 omit_sudo=False)
580
581 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
582 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
583 # the "cd foo && bar" shenanigans isn't needed to begin with and
584 # then we wouldn't have to special case this
585 if isinstance(args, str):
586 args = 'sudo ' + args
587 if isinstance(args, list):
588 args.insert(0, 'sudo')
589
590 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
591 check_status=check_status,
592 omit_sudo=False)
593
594 def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
595 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
596 # the "cd foo && bar" shenanigans isn't needed to begin with and
597 # then we wouldn't have to special case this
598 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
599 omit_sudo=omit_sudo)
600
601 def testcmd_as_user(self, args, user, wait=True, stdin=None):
602 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
603 # the "cd foo && bar" shenanigans isn't needed to begin with and
604 # then we wouldn't have to special case this
605 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
606 check_status=False)
607
608 def testcmd_as_root(self, args, wait=True, stdin=None):
609 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
610 # the "cd foo && bar" shenanigans isn't needed to begin with and
611 # then we wouldn't have to special case this
612 return self.run_as_root(args, wait=wait, stdin=stdin,
613 check_status=False)
614
615 def setupfs(self, name=None):
616 if name is None and self.fs is not None:
617 # Previous mount existed, reuse the old name
618 name = self.fs.name
619 self.fs = LocalFilesystem(self.ctx, name=name)
620 log.info('Wait for MDS to reach steady state...')
621 self.fs.wait_for_daemons()
622 log.info('Ready to start {}...'.format(type(self).__name__))
623
624 @property
625 def _prefix(self):
626 return BIN_PREFIX
627
628 def _asok_path(self):
629 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
630         # run in the foreground. When running it daemonized, however, the asok is named after
631 # the PID of the launching process, not the long running ceph-fuse process. Therefore
632 # we need to give an exact path here as the logic for checking /proc/ for which
633 # asok is alive does not work.
634
635 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
636 # in a tmpdir. All of the paths are the same, so no need to select
637 # based off of the service type.
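        # For example (hypothetical conf line): "admin socket = /tmp/ceph-asok.abc/$name.asok"
        # would make the regex below capture d = "/tmp/ceph-asok.abc/".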
638 d = "./out"
639 with open(self.config_path) as f:
640 for line in f:
641 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
642 if asok_conf:
643 d = asok_conf.groups(1)[0]
644 break
645 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
646 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
647 return path
648
649 def mount(self, mount_path=None, mount_fs_name=None, mount_options=[], **kwargs):
650 self.setupfs(name=mount_fs_name)
651
652 log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
653 id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
654
655 self.client_remote.run(
656 args=[
657 'mkdir',
658 '--',
659 self.mountpoint,
660 ],
661 timeout=(5*60),
662 )
663
664 if mount_path is None:
665 mount_path = "/"
666
667 opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
668 conf=self.config_path)
669
670 if mount_fs_name is not None:
671 opts += ",mds_namespace={0}".format(mount_fs_name)
672
673 for mount_opt in mount_options:
674 opts += ",{0}".format(mount_opt)
675
676 self.client_remote.run(
677 args=[
678 'sudo',
679 './bin/mount.ceph',
680 ':{mount_path}'.format(mount_path=mount_path),
681 self.mountpoint,
682 '-v',
683 '-o',
684 opts
685 ],
686 timeout=(30*60),
687 omit_sudo=False,
688 )
689
690 self.client_remote.run(
691 args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
692
693 self.mounted = True
694
695 def _run_python(self, pyscript, py_version='python'):
696 """
697 Override this to remove the daemon-helper prefix that is used otherwise
698 to make the process killable.
699 """
700 return self.client_remote.run(args=[py_version, '-c', pyscript],
701 wait=False)
702
703 class LocalFuseMount(FuseMount):
704 def __init__(self, ctx, test_dir, client_id):
705 super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
706
707 @property
708 def config_path(self):
709 return "./ceph.conf"
710
711 def get_keyring_path(self):
712 # This is going to end up in a config file, so use an absolute path
713 # to avoid assumptions about daemons' pwd
714 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
715
716 def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
717 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
718 # the "cd foo && bar" shenanigans isn't needed to begin with and
719 # then we wouldn't have to special case this
720 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
721 stdin=stdin, check_status=check_status,
722 omit_sudo=omit_sudo)
723
724 def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
725 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
726 # the "cd foo && bar" shenanigans isn't needed to begin with and
727 # then we wouldn't have to special case this
728 if isinstance(args, str):
729 args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
730 elif isinstance(args, list):
731 cmdlist = args
732 cmd = ''
733 for i in cmdlist:
734 cmd = cmd + i + ' '
735 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
736 args.append(cmd)
737
738 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
739 check_status=check_status, stdin=stdin,
740 omit_sudo=False)
741
742 def run_as_root(self, args, wait=True, stdin=None, check_status=True):
743 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
744 # the "cd foo && bar" shenanigans isn't needed to begin with and
745 # then we wouldn't have to special case this
746 if isinstance(args, str):
747 args = 'sudo ' + args
748 if isinstance(args, list):
749 args.insert(0, 'sudo')
750
751 return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
752 check_status=check_status,
753 omit_sudo=False)
754
755 def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
756 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
757 # the "cd foo && bar" shenanigans isn't needed to begin with and
758 # then we wouldn't have to special case this
759 return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
760 omit_sudo=omit_sudo)
761
762 def testcmd_as_user(self, args, user, wait=True, stdin=None):
763 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
764 # the "cd foo && bar" shenanigans isn't needed to begin with and
765 # then we wouldn't have to special case this
766 return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
767 check_status=False)
768
769 def testcmd_as_root(self, args, wait=True, stdin=None):
770 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
771 # the "cd foo && bar" shenanigans isn't needed to begin with and
772 # then we wouldn't have to special case this
773 return self.run_as_root(args, wait=wait, stdin=stdin,
774 check_status=False)
775
776 def setupfs(self, name=None):
777 if name is None and self.fs is not None:
778 # Previous mount existed, reuse the old name
779 name = self.fs.name
780 self.fs = LocalFilesystem(self.ctx, name=name)
781 log.info('Wait for MDS to reach steady state...')
782 self.fs.wait_for_daemons()
783 log.info('Ready to start {}...'.format(type(self).__name__))
784
785 @property
786 def _prefix(self):
787 return BIN_PREFIX
788
789 def _asok_path(self):
790 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
791         # run in the foreground. When running it daemonized, however, the asok is named after
792 # the PID of the launching process, not the long running ceph-fuse process. Therefore
793 # we need to give an exact path here as the logic for checking /proc/ for which
794 # asok is alive does not work.
795
796 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
797 # in a tmpdir. All of the paths are the same, so no need to select
798 # based off of the service type.
799 d = "./out"
800 with open(self.config_path) as f:
801 for line in f:
802 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
803 if asok_conf:
804 d = asok_conf.groups(1)[0]
805 break
806 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
807 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
808 return path
809
810 def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
811 if mountpoint is not None:
812 self.mountpoint = mountpoint
813 self.setupfs(name=mount_fs_name)
814
815 self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
816
817 def list_connections():
818 self.client_remote.run(
819 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
820 check_status=False
821 )
822 p = self.client_remote.run(
823 args=["ls", "/sys/fs/fuse/connections"],
824 check_status=False
825 )
826 if p.exitstatus != 0:
827 log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
828 return []
829
830 ls_str = six.ensure_str(p.stdout.getvalue().strip())
831 if ls_str:
832 return [int(n) for n in ls_str.split("\n")]
833 else:
834 return []
835
836 # Before starting ceph-fuse process, note the contents of
837 # /sys/fs/fuse/connections
838 pre_mount_conns = list_connections()
839 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
840
841 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
842 if os.getuid() != 0:
843 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
844
845 if mount_path is not None:
846 prefix += ["--client_mountpoint={0}".format(mount_path)]
847
848 if mount_fs_name is not None:
849 prefix += ["--client_fs={0}".format(mount_fs_name)]
850
851         prefix += mount_options
852
853 self.fuse_daemon = self.client_remote.run(args=
854 prefix + [
855 "-f",
856 "--name",
857 "client.{0}".format(self.client_id),
858 self.mountpoint
859 ], wait=False)
860
861 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
862
863 # Wait for the connection reference to appear in /sys
864 waited = 0
865 post_mount_conns = list_connections()
866 while len(post_mount_conns) <= len(pre_mount_conns):
867 if self.fuse_daemon.finished:
868 # Did mount fail? Raise the CommandFailedError instead of
869 # hitting the "failed to populate /sys/" timeout
870 self.fuse_daemon.wait()
871 time.sleep(1)
872 waited += 1
873 if waited > 30:
874 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
875 waited
876 ))
877 post_mount_conns = list_connections()
878
879 log.info("Post-mount connections: {0}".format(post_mount_conns))
880
881 # Record our fuse connection number so that we can use it when
882 # forcing an unmount
883 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
884 if len(new_conns) == 0:
885 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
886 elif len(new_conns) > 1:
887 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
888 else:
889 self._fuse_conn = new_conns[0]
890
891 self.gather_mount_info()
892
893 self.mounted = True
894
895 def _run_python(self, pyscript, py_version='python'):
896 """
897 Override this to remove the daemon-helper prefix that is used otherwise
898 to make the process killable.
899 """
900 return self.client_remote.run(args=[py_version, '-c', pyscript],
901 wait=False)
902
903 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
904 # the same name.
905 class LocalCephManager(CephManager):
906 def __init__(self):
907 # Deliberately skip parent init, only inheriting from it to get
908 # util methods like osd_dump that sit on top of raw_cluster_cmd
909 self.controller = LocalRemote()
910
911 # A minority of CephManager fns actually bother locking for when
912 # certain teuthology tests want to run tasks in parallel
913 self.lock = threading.RLock()
914
915 self.log = lambda x: log.info(x)
916
917 # Don't bother constructing a map of pools: it should be empty
918 # at test cluster start, and in any case it would be out of date
919 # in no time. The attribute needs to exist for some of the CephManager
920 # methods to work though.
921 self.pools = {}
922
923 def find_remote(self, daemon_type, daemon_id):
924 """
925 daemon_type like 'mds', 'osd'
926 daemon_id like 'a', '0'
927 """
928 return LocalRemote()
929
930 def run_ceph_w(self, watch_channel=None):
931 """
932 :param watch_channel: Specifies the channel to be watched.
933 This can be 'cluster', 'audit', ...
934 :type watch_channel: str
935 """
936 args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
937 if watch_channel is not None:
938 args.append("--watch-channel")
939 args.append(watch_channel)
940 proc = self.controller.run(args=args, wait=False, stdout=StringIO())
941 return proc
942
943 def raw_cluster_cmd(self, *args, **kwargs):
944 """
945         args like ["osd", "dump"]
946 return stdout string
947 """
948 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
949 list(args), **kwargs)
950 return six.ensure_str(proc.stdout.getvalue())
951
952 def raw_cluster_cmd_result(self, *args, **kwargs):
953 """
954 like raw_cluster_cmd but don't check status, just return rc
955 """
956 kwargs['check_status'] = False
957 proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
958 list(args), **kwargs)
959 return proc.exitstatus
960
961 def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
962 return self.controller.run(
963 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
964 check_status=check_status,
965 timeout=timeout
966 )
967
968 def get_mon_socks(self):
969 """
970 Get monitor sockets.
971
972 :return socks: tuple of strings; strings are individual sockets.
973 """
974 from json import loads
975
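        # "ceph mon dump --format=json" reports, per mon, a public_addrs.addrvec list
        # whose entries look roughly like {"type": "v2", "addr": "127.0.0.1:40605", ...};
        # only the "addr" strings are returned here.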
976 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
977 socks = []
978 for mon in output['mons']:
979 for addrvec_mem in mon['public_addrs']['addrvec']:
980 socks.append(addrvec_mem['addr'])
981 return tuple(socks)
982
983 def get_msgrv1_mon_socks(self):
984 """
985         Get monitor sockets that use msgrv1 to operate.
986
987 :return socks: tuple of strings; strings are individual sockets.
988 """
989 from json import loads
990
991 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
992 socks = []
993 for mon in output['mons']:
994 for addrvec_mem in mon['public_addrs']['addrvec']:
995 if addrvec_mem['type'] == 'v1':
996 socks.append(addrvec_mem['addr'])
997 return tuple(socks)
998
999 def get_msgrv2_mon_socks(self):
1000 """
1001 Get monitor sockets that use msgrv2 to operate.
1002
1003 :return socks: tuple of strings; strings are individual sockets.
1004 """
1005 from json import loads
1006
1007 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1008 socks = []
1009 for mon in output['mons']:
1010 for addrvec_mem in mon['public_addrs']['addrvec']:
1011 if addrvec_mem['type'] == 'v2':
1012 socks.append(addrvec_mem['addr'])
1013 return tuple(socks)
1014
1015
1016 class LocalCephCluster(CephCluster):
1017 def __init__(self, ctx):
1018 # Deliberately skip calling parent constructor
1019 self._ctx = ctx
1020 self.mon_manager = LocalCephManager()
1021 self._conf = defaultdict(dict)
1022
1023 @property
1024 def admin_remote(self):
1025 return LocalRemote()
1026
1027 def get_config(self, key, service_type=None):
1028 if service_type is None:
1029 service_type = 'mon'
1030
1031 # FIXME hardcoded vstart service IDs
1032 service_id = {
1033 'mon': 'a',
1034 'mds': 'a',
1035 'osd': '0'
1036 }[service_type]
1037
1038 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
1039
1040 def _write_conf(self):
1041 # In teuthology, we have the honour of writing the entire ceph.conf, but
1042 # in vstart land it has mostly already been written and we need to carefully
1043 # append to it.
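        # For instance, after set_ceph_conf("mds", "mds log max segments", "10") the
        # conf file ends (roughly) with:
        #   #LOCAL_TEST
        #   [mds]
        #   mds log max segments = 10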
1044 conf_path = "./ceph.conf"
1045 banner = "\n#LOCAL_TEST\n"
1046 existing_str = open(conf_path).read()
1047
1048 if banner in existing_str:
1049 existing_str = existing_str[0:existing_str.find(banner)]
1050
1051 existing_str += banner
1052
1053 for subsys, kvs in self._conf.items():
1054 existing_str += "\n[{0}]\n".format(subsys)
1055 for key, val in kvs.items():
1056 # Comment out existing instance if it exists
1057 log.info("Searching for existing instance {0}/{1}".format(
1058 key, subsys
1059 ))
1060 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
1061 subsys
1062 ), existing_str, re.MULTILINE)
1063
1064 if existing_section:
1065 section_str = existing_str[existing_section.start():existing_section.end()]
1066 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
1067 if existing_val:
1068 start = existing_section.start() + existing_val.start(1)
1069 log.info("Found string to replace at {0}".format(
1070 start
1071 ))
1072 existing_str = existing_str[0:start] + "#" + existing_str[start:]
1073
1074 existing_str += "{0} = {1}\n".format(key, val)
1075
1076 open(conf_path, "w").write(existing_str)
1077
1078 def set_ceph_conf(self, subsys, key, value):
1079 self._conf[subsys][key] = value
1080 self._write_conf()
1081
1082 def clear_ceph_conf(self, subsys, key):
1083 del self._conf[subsys][key]
1084 self._write_conf()
1085
1086
1087 class LocalMDSCluster(LocalCephCluster, MDSCluster):
1088 def __init__(self, ctx):
1089 super(LocalMDSCluster, self).__init__(ctx)
1090
1091 self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
1092 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1093
1094 def clear_firewall(self):
1095 # FIXME: unimplemented
1096 pass
1097
1098 def newfs(self, name='cephfs', create=True):
1099 return LocalFilesystem(self._ctx, name=name, create=create)
1100
1101
1102 class LocalMgrCluster(LocalCephCluster, MgrCluster):
1103 def __init__(self, ctx):
1104 super(LocalMgrCluster, self).__init__(ctx)
1105
1106 self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
1107 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
1108
1109
1110 class LocalFilesystem(Filesystem, LocalMDSCluster):
1111 def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
1112 # Deliberately skip calling parent constructor
1113 self._ctx = ctx
1114
1115 self.id = None
1116 self.name = name
1117 self.ec_profile = ec_profile
1118 self.metadata_pool_name = None
1119 self.metadata_overlay = False
1120 self.data_pool_name = None
1121 self.data_pools = None
1122 self.fs_config = None
1123
1124 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
1125 self.mds_ids = set()
1126 for line in open("ceph.conf").readlines():
1127 match = re.match("^\[mds\.(.+)\]$", line)
1128 if match:
1129 self.mds_ids.add(match.group(1))
1130
1131 if not self.mds_ids:
1132 raise RuntimeError("No MDSs found in ceph.conf!")
1133
1134 self.mds_ids = list(self.mds_ids)
1135
1136 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
1137
1138 self.mon_manager = LocalCephManager()
1139
1140 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1141
1142 self.client_remote = LocalRemote()
1143
1144 self._conf = defaultdict(dict)
1145
1146 if name is not None:
1147 if fscid is not None:
1148 raise RuntimeError("cannot specify fscid when creating fs")
1149 if create and not self.legacy_configured():
1150 self.create()
1151 else:
1152 if fscid is not None:
1153 self.id = fscid
1154 self.getinfo(refresh=True)
1155
1156 # Stash a reference to the first created filesystem on ctx, so
1157 # that if someone drops to the interactive shell they can easily
1158 # poke our methods.
1159 if not hasattr(self._ctx, "filesystem"):
1160 self._ctx.filesystem = self
1161
1162 @property
1163 def _prefix(self):
1164 return BIN_PREFIX
1165
1166 def set_clients_block(self, blocked, mds_id=None):
1167 raise NotImplementedError()
1168
1169
1170 class InteractiveFailureResult(unittest.TextTestResult):
1171 """
1172 Specialization that implements interactive-on-error style
1173 behavior.
1174 """
1175 def addFailure(self, test, err):
1176 super(InteractiveFailureResult, self).addFailure(test, err)
1177 log.error(self._exc_info_to_string(err, test))
1178 log.error("Failure in test '{0}', going interactive".format(
1179 self.getDescription(test)
1180 ))
1181 interactive.task(ctx=None, config=None)
1182
1183 def addError(self, test, err):
1184 super(InteractiveFailureResult, self).addError(test, err)
1185 log.error(self._exc_info_to_string(err, test))
1186 log.error("Error in test '{0}', going interactive".format(
1187 self.getDescription(test)
1188 ))
1189 interactive.task(ctx=None, config=None)
1190
1191
1192 def enumerate_methods(s):
1193 log.info("e: {0}".format(s))
1194 for t in s._tests:
1195 if isinstance(t, suite.BaseTestSuite):
1196 for sub in enumerate_methods(t):
1197 yield sub
1198 else:
1199 yield s, t
1200
1201
1202 def load_tests(modules, loader):
1203 if modules:
1204 log.info("Executing modules: {0}".format(modules))
1205 module_suites = []
1206 for mod_name in modules:
1207 # Test names like cephfs.test_auto_repair
1208 module_suites.append(loader.loadTestsFromName(mod_name))
1209 log.info("Loaded: {0}".format(list(module_suites)))
1210 return suite.TestSuite(module_suites)
1211 else:
1212 log.info("Executing all cephfs tests")
1213 return loader.discover(
1214 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
1215 )
1216
1217
1218 def scan_tests(modules):
1219 overall_suite = load_tests(modules, loader.TestLoader())
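    # Test classes may declare class attributes such as MDSS_REQUIRED,
    # CLIENTS_REQUIRED, MGRS_REQUIRED and REQUIRE_MEMSTORE; take the maxima over
    # all selected cases so that enough daemons are created up front.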
1220
1221 max_required_mds = 0
1222 max_required_clients = 0
1223 max_required_mgr = 0
1224 require_memstore = False
1225
1226 for suite_, case in enumerate_methods(overall_suite):
1227 max_required_mds = max(max_required_mds,
1228 getattr(case, "MDSS_REQUIRED", 0))
1229 max_required_clients = max(max_required_clients,
1230 getattr(case, "CLIENTS_REQUIRED", 0))
1231 max_required_mgr = max(max_required_mgr,
1232 getattr(case, "MGRS_REQUIRED", 0))
1233 require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
1234 or require_memstore
1235
1236 return max_required_mds, max_required_clients, \
1237 max_required_mgr, require_memstore
1238
1239
1240 class LocalCluster(object):
1241 def __init__(self, rolename="placeholder"):
1242 self.remotes = {
1243 LocalRemote(): [rolename]
1244 }
1245
1246 def only(self, requested):
1247 return self.__class__(rolename=requested)
1248
1249
1250 class LocalContext(object):
1251 def __init__(self):
1252 self.config = {}
1253 self.teuthology_config = teuth_config
1254 self.cluster = LocalCluster()
1255 self.daemons = DaemonGroup()
1256
1257 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
1258 # tests that want to look these up via ctx can do so.
1259 # Inspect ceph.conf to see what roles exist
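        # e.g. a "[mds.a]" section in ceph.conf registers
        # LocalDaemon("mds", "a") under self.daemons.daemons["ceph.mds"]["a"].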
1260 for conf_line in open("ceph.conf").readlines():
1261 for svc_type in ["mon", "osd", "mds", "mgr"]:
1262 prefixed_type = "ceph." + svc_type
1263 if prefixed_type not in self.daemons.daemons:
1264 self.daemons.daemons[prefixed_type] = {}
1265 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
1266 if match:
1267 svc_id = match.group(1)
1268 self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
1269
1270 def __del__(self):
1271 test_path = self.teuthology_config['test_path']
1272 # opt_create_cluster_only does not create the test path
1273 if test_path:
1274 shutil.rmtree(test_path)
1275
1276 def teardown_cluster():
1277 log.info('\ntearing down the cluster...')
1278 remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
1279 remote.run(args=['rm', '-rf', './dev', './out'])
1280
1281 def clear_old_log():
1282 from os import stat
1283
1284 try:
1285 stat(logpath)
1286     # would need an update when making this py3-compatible: use FileNotFoundError
1287     # instead.
1288 except OSError:
1289 return
1290 else:
1291 os.remove(logpath)
1292 with open(logpath, 'w') as logfile:
1293 logfile.write('')
1294 init_log()
1295 log.info('logging in a fresh file now...')
1296
1297 def exec_test():
1298 # Parse arguments
1299 opt_interactive_on_error = False
1300 opt_create_cluster = False
1301 opt_create_cluster_only = False
1302 opt_ignore_missing_binaries = False
1303 opt_teardown_cluster = False
1304 global opt_log_ps_output
1305 opt_log_ps_output = False
1306 use_kernel_client = False
1307 opt_verbose = True
1308
1309 args = sys.argv[1:]
1310 flags = [a for a in args if a.startswith("-")]
1311 modules = [a for a in args if not a.startswith("-")]
1312 for f in flags:
1313 if f == "--interactive":
1314 opt_interactive_on_error = True
1315 elif f == "--create":
1316 opt_create_cluster = True
1317 elif f == "--create-cluster-only":
1318 opt_create_cluster_only = True
1319 elif f == "--ignore-missing-binaries":
1320 opt_ignore_missing_binaries = True
1321 elif f == '--teardown':
1322 opt_teardown_cluster = True
1323 elif f == '--log-ps-output':
1324 opt_log_ps_output = True
1325 elif f == '--clear-old-log':
1326 clear_old_log()
1327 elif f == "--kclient":
1328 use_kernel_client = True
1329 elif '--no-verbose' == f:
1330 opt_verbose = False
1331 else:
1332 log.error("Unknown option '{0}'".format(f))
1333 sys.exit(-1)
1334
1335 # Help developers by stopping up-front if their tree isn't built enough for all the
1336 # tools that the tests might want to use (add more here if needed)
1337 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
1338 "cephfs-table-tool", "ceph-fuse", "rados"]
1339 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
1340 if missing_binaries and not opt_ignore_missing_binaries:
1341 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
1342 sys.exit(-1)
1343
1344 max_required_mds, max_required_clients, \
1345 max_required_mgr, require_memstore = scan_tests(modules)
1346
1347 global remote
1348 remote = LocalRemote()
1349
1350 # Tolerate no MDSs or clients running at start
1351 ps_txt = six.ensure_str(remote.run(
1352 args=["ps", "-u"+str(os.getuid())],
1353 stdout=StringIO()
1354 ).stdout.getvalue().strip())
1355 lines = ps_txt.split("\n")[1:]
1356 for line in lines:
1357 if 'ceph-fuse' in line or 'ceph-mds' in line:
1358 pid = int(line.split()[0])
1359 log.warning("Killing stray process {0}".format(line))
1360 os.kill(pid, signal.SIGKILL)
1361
1362 # Fire up the Ceph cluster if the user requested it
1363 if opt_create_cluster or opt_create_cluster_only:
1364 log.info("Creating cluster with {0} MDS daemons".format(
1365 max_required_mds))
1366 teardown_cluster()
1367 vstart_env = os.environ.copy()
1368 vstart_env["FS"] = "0"
1369 vstart_env["MDS"] = max_required_mds.__str__()
1370 vstart_env["OSD"] = "4"
1371 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
1372
1373 args = [
1374 os.path.join(SRC_PREFIX, "vstart.sh"),
1375 "-n",
1376 "--nolockdep",
1377 ]
1378 if require_memstore:
1379 args.append("--memstore")
1380
1381 if opt_verbose:
1382 args.append("-d")
1383
1384     # usually, vstart.sh runs to completion in less than 100
1385     # seconds.
1386 remote.run(args=args, env=vstart_env, timeout=(3 * 60))
1387
1388 # Wait for OSD to come up so that subsequent injectargs etc will
1389 # definitely succeed
1390 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
1391
1392 if opt_create_cluster_only:
1393 return
1394
1395 # List of client mounts, sufficient to run the selected tests
1396 clients = [i.__str__() for i in range(0, max_required_clients)]
1397
1398 test_dir = tempfile.mkdtemp()
1399 teuth_config['test_path'] = test_dir
1400
1401 ctx = LocalContext()
1402 ceph_cluster = LocalCephCluster(ctx)
1403 mds_cluster = LocalMDSCluster(ctx)
1404 mgr_cluster = LocalMgrCluster(ctx)
1405
1406 # Construct Mount classes
1407 mounts = []
1408 for client_id in clients:
1409 # Populate client keyring (it sucks to use client.admin for test clients
1410 # because it's awkward to find the logs later)
1411 client_name = "client.{0}".format(client_id)
1412
1413 if client_name not in open("./keyring").read():
1414 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
1415 "osd", "allow rw",
1416 "mds", "allow",
1417 "mon", "allow r"])
1418
1419 open("./keyring", "ab").write(p.stdout.getvalue())
1420
1421 if use_kernel_client:
1422 mount = LocalKernelMount(ctx, test_dir, client_id)
1423 else:
1424 mount = LocalFuseMount(ctx, test_dir, client_id)
1425
1426 mounts.append(mount)
1427 if os.path.exists(mount.mountpoint):
1428 if mount.is_mounted():
1429 log.warning("unmounting {0}".format(mount.mountpoint))
1430 mount.umount_wait()
1431 else:
1432 os.rmdir(mount.mountpoint)
1433
1434 from tasks.cephfs_test_runner import DecoratingLoader
1435
1436 class LogStream(object):
1437 def __init__(self):
1438 self.buffer = ""
1439
1440 def write(self, data):
1441 self.buffer += data
1442 if "\n" in self.buffer:
1443 lines = self.buffer.split("\n")
1444 for line in lines[:-1]:
1445 pass
1446 # sys.stderr.write(line + "\n")
1447 log.info(line)
1448 self.buffer = lines[-1]
1449
1450 def flush(self):
1451 pass
1452
1453 decorating_loader = DecoratingLoader({
1454 "ctx": ctx,
1455 "mounts": mounts,
1456 "ceph_cluster": ceph_cluster,
1457 "mds_cluster": mds_cluster,
1458 "mgr_cluster": mgr_cluster,
1459 })
1460
1461 # For the benefit of polling tests like test_full -- in teuthology land we set this
1462 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1463 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1464 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1465
1466 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1467     # from normal IO latency. Increase it for running tests.
1468 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
1469
1470 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1471 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1472 # so that cephfs-data-scan will pick it up too.
1473 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
1474 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1475
1476 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1477 def _get_package_version(remote, pkg_name):
1478 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1479 return "2.9"
1480
1481 import teuthology.packaging
1482 teuthology.packaging.get_package_version = _get_package_version
1483
1484 overall_suite = load_tests(modules, decorating_loader)
1485
1486     # Filter out tests that don't lend themselves to interactive running.
1487 victims = []
1488 for case, method in enumerate_methods(overall_suite):
1489 fn = getattr(method, method._testMethodName)
1490
1491 drop_test = False
1492
1493 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1494 drop_test = True
1495 log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))
1496
1497 if getattr(fn, "needs_trimming", False) is True:
1498 drop_test = (os.getuid() != 0)
1499 log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))
1500
1501 if drop_test:
1502 # Don't drop the test if it was explicitly requested in arguments
1503 is_named = False
1504 for named in modules:
1505 if named.endswith(method.id()):
1506 is_named = True
1507 break
1508
1509 if not is_named:
1510 victims.append((case, method))
1511
1512 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1513 for s, method in victims:
1514 s._tests.remove(method)
1515
1516 if opt_interactive_on_error:
1517 result_class = InteractiveFailureResult
1518 else:
1519 result_class = unittest.TextTestResult
1520 fail_on_skip = False
1521
1522 class LoggingResult(result_class):
1523 def startTest(self, test):
1524 log.info("Starting test: {0}".format(self.getDescription(test)))
1525 test.started_at = datetime.datetime.utcnow()
1526 return super(LoggingResult, self).startTest(test)
1527
1528 def stopTest(self, test):
1529 log.info("Stopped test: {0} in {1}s".format(
1530 self.getDescription(test),
1531 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1532 ))
1533
1534 def addSkip(self, test, reason):
1535 if fail_on_skip:
1536 # Don't just call addFailure because that requires a traceback
1537 self.failures.append((test, reason))
1538 else:
1539 super(LoggingResult, self).addSkip(test, reason)
1540
1541 # Execute!
1542 result = unittest.TextTestRunner(
1543 stream=LogStream(),
1544 resultclass=LoggingResult,
1545 verbosity=2,
1546 failfast=True).run(overall_suite)
1547
1548 if opt_teardown_cluster:
1549 teardown_cluster()
1550
1551 if not result.wasSuccessful():
1552 result.printErrors() # duplicate output at end for convenience
1553
1554 bad_tests = []
1555 for test, error in result.errors:
1556 bad_tests.append(str(test))
1557 for test, failure in result.failures:
1558 bad_tests.append(str(test))
1559
1560 sys.exit(-1)
1561 else:
1562 sys.exit(0)
1563
1564
1565 if __name__ == "__main__":
1566 exec_test()