1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 # Also, you can create the cluster once and then run named test cases against it:
27 python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
28 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
29 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
30
31 """
32
33 from io import StringIO
34 from collections import defaultdict
35 import getpass
36 import signal
37 import tempfile
38 import threading
39 import datetime
40 import shutil
41 import re
42 import os
43 import time
44 import sys
45 import errno
46 from IPy import IP
47 import unittest
48 import platform
49 import logging
50
51 from unittest import suite, loader
52
53 from teuthology.orchestra.run import Raw, quote
54 from teuthology.orchestra.daemon import DaemonGroup
55 from teuthology.orchestra.remote import Remote
56 from teuthology.config import config as teuth_config
57 from teuthology.contextutil import safe_while
58 from teuthology.contextutil import MaxWhileTries
59 from teuthology.orchestra.run import CommandFailedError
60 try:
61 import urllib3
62 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
63 except Exception:
64 pass
65
66 def init_log():
67 global log
68 if log is not None:
69 del log
70 log = logging.getLogger(__name__)
71
72 global logpath
73 logpath = './vstart_runner.log'
74
75 handler = logging.FileHandler(logpath)
76 formatter = logging.Formatter(
77 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
78 datefmt='%Y-%m-%dT%H:%M:%S')
79 handler.setFormatter(formatter)
80 log.addHandler(handler)
81 log.setLevel(logging.INFO)
82
83 log = None
84 init_log()
85
86
87 def respawn_in_path(lib_path, python_paths):
88 execv_cmd = ['python']
89 if platform.system() == "Darwin":
90 lib_path_var = "DYLD_LIBRARY_PATH"
91 else:
92 lib_path_var = "LD_LIBRARY_PATH"
93
94 py_binary = os.environ.get("PYTHON", sys.executable)
95
96 if lib_path_var in os.environ:
97 if lib_path not in os.environ[lib_path_var]:
98 os.environ[lib_path_var] += ':' + lib_path
99 os.execvp(py_binary, execv_cmd + sys.argv)
100 else:
101 os.environ[lib_path_var] = lib_path
102 os.execvp(py_binary, execv_cmd + sys.argv)
103
104 for p in python_paths:
105 sys.path.insert(0, p)
106
107
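# A minimal sketch (comments only, not executed; "<build>" and the exact env
# handling are an illustration, not a literal transcript): when this script is
# started from a cmake build directory, the guess block below calls
# respawn_in_path() above, which re-executes the process roughly as
#
#   LD_LIBRARY_PATH=<build>/lib python vstart_runner.py <original args>
#
# (DYLD_LIBRARY_PATH on macOS) and, once the library path is already set,
# prepends teuthology and the ceph python bindings to sys.path instead.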
108 # Let's use some sensible defaults
109 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
110
111 # A list of candidate paths for each package we need
112 guesses = [
113 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
114 ["lib/cython_modules/lib.3"],
115 ["../src/pybind"],
116 ]
117
118 python_paths = []
119
120 # Up one level so that "tasks.foo.bar" imports work
121 python_paths.append(os.path.abspath(
122 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
123 ))
124
125 for package_guesses in guesses:
126 for g in package_guesses:
127 g_exp = os.path.abspath(os.path.expanduser(g))
128 if os.path.exists(g_exp):
129 python_paths.append(g_exp)
130
131 ld_path = os.path.join(os.getcwd(), "lib/")
132 print("Using guessed paths {0} {1}".format(ld_path, python_paths))
133 respawn_in_path(ld_path, python_paths)
134
135
136 try:
137 from tasks.ceph_manager import CephManager
138 from tasks.cephfs.fuse_mount import FuseMount
139 from tasks.cephfs.kernel_mount import KernelMount
140 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
141 from tasks.cephfs.mount import CephFSMount
142 from tasks.mgr.mgr_test_case import MgrCluster
143 from teuthology.task import interactive
144 except ImportError:
145 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
146 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
147 raise
148
149 # Must import after teuthology because of gevent monkey patching
150 import subprocess
151
152 if os.path.exists("./CMakeCache.txt"):
153 # Running in build dir of a cmake build
154 BIN_PREFIX = "./bin/"
155 SRC_PREFIX = "../src"
156 else:
157 # Running in src/ of an autotools build
158 BIN_PREFIX = "./"
159 SRC_PREFIX = "./"
160
161
162 def rm_nonascii_chars(var):
163 var = var.replace(b'\xe2\x80\x98', b'\'')
164 var = var.replace(b'\xe2\x80\x99', b'\'')
165 return var
166
167 class LocalRemoteProcess(object):
168 def __init__(self, args, subproc, check_status, stdout, stderr):
169 self.args = args
170 self.subproc = subproc
171 self.stdout = stdout
172 self.stderr = stderr
173 # this variable is meant for instances of this class named fuse_daemon.
174 # the child process of a command launched with sudo must be killed
175 # explicitly, since killing the parent process alone has no effect on
176 # the child process.
177 self.fuse_pid = -1
178
179 self.check_status = check_status
180 self.exitstatus = self.returncode = None
181
182 def wait(self):
183 if self.finished:
184 # Avoid calling communicate() on a dead process because it'll
185 # give you stick about std* already being closed
186 if self.check_status and self.exitstatus != 0:
187 raise CommandFailedError(self.args, self.exitstatus)
188 else:
189 return
190
191 out, err = self.subproc.communicate()
192 out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
193 if isinstance(self.stdout, StringIO):
194 self.stdout.write(out.decode(errors='ignore'))
195 elif self.stdout is None:
196 pass
197 else:
198 self.stdout.write(out)
199 if isinstance(self.stderr, StringIO):
200 self.stderr.write(err.decode(errors='ignore'))
201 elif self.stderr is None:
202 pass
203 else:
204 self.stderr.write(err)
205
206 self.exitstatus = self.returncode = self.subproc.returncode
207
208 if self.exitstatus != 0:
209 sys.stderr.write(out.decode())
210 sys.stderr.write(err.decode())
211
212 if self.check_status and self.exitstatus != 0:
213 raise CommandFailedError(self.args, self.exitstatus)
214
215 @property
216 def finished(self):
217 if self.exitstatus is not None:
218 return True
219
220 if self.subproc.poll() is not None:
221 out, err = self.subproc.communicate()
222 if isinstance(self.stdout, StringIO):
223 self.stdout.write(out.decode(errors='ignore'))
224 elif self.stdout is None:
225 pass
226 else:
227 self.stdout.write(out)
228 if isinstance(self.stderr, StringIO):
229 self.stderr.write(err.decode(errors='ignore'))
230 elif self.stderr is None:
231 pass
232 else:
233 self.stderr.write(err)
234 self.exitstatus = self.returncode = self.subproc.returncode
235 return True
236 else:
237 return False
238
239 def kill(self):
240 log.debug("kill ")
241 if self.subproc.pid and not self.finished:
242 log.debug("kill: killing pid {0} ({1})".format(
243 self.subproc.pid, self.args))
244 if self.fuse_pid != -1:
245 safe_kill(self.fuse_pid)
246 else:
247 safe_kill(self.subproc.pid)
248 else:
249 log.debug("kill: already terminated ({0})".format(self.args))
250
251 @property
252 def stdin(self):
253 class FakeStdIn(object):
254 def __init__(self, mount_daemon):
255 self.mount_daemon = mount_daemon
256
257 def close(self):
258 self.mount_daemon.kill()
259
260 return FakeStdIn(self)
261
262
263 class LocalRemote(object):
264 """
265 Amusingly named class to present the teuthology RemoteProcess interface when we are really
266 running things locally for vstart
267
268 Run this inside your src/ dir!
269 """
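
# A minimal usage sketch (comments only, not executed; output varies by
# machine): LocalRemote mimics the teuthology Remote API, so calls that would
# normally go over SSH run as local subprocesses instead.
#
#   remote = LocalRemote()
#   whoami = remote.sh('whoami').strip()
#   proc = remote.run(args=['ls', '/'], stdout=StringIO())
#   listing = proc.stdout.getvalue()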
270
271 os = Remote.os
272 arch = Remote.arch
273
274 def __init__(self):
275 self.name = "local"
276 self.hostname = "localhost"
277 self.user = getpass.getuser()
278
279 def get_file(self, path, sudo, dest_dir):
280 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
281 shutil.copy(path, tmpfile)
282 return tmpfile
283
284 # XXX: This method ignores the error raised when src and dst refer to
285 # the same path. In teuthology, the same path still represents
286 # different locations, since src and dst lie on different machines.
287 def put_file(self, src, dst, sudo=False):
288 try:
289 shutil.copy(src, dst)
290 except shutil.SameFileError:
291 pass
292
293 # XXX: accepts only two arguments to maintain compatibility with
294 # teuthology's mkdtemp.
295 def mkdtemp(self, suffix='', parentdir=None):
296 from tempfile import mkdtemp
297
298 # XXX: prefix has to be set explicitly; without it this method failed
299 # against Python 2.7 -
300 # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
301 # -> file = _os.path.join(dir, prefix + name + suffix)
302 # (Pdb) p prefix
303 # None
304 return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
305
306 def mktemp(self, suffix=None, parentdir=None):
307 """
308 Make a remote temporary file
309
310 Returns: the path of the temp file created.
311 """
312 from tempfile import mktemp
313 return mktemp(suffix=suffix, dir=parentdir)
314
315 def write_file(self, path, data, sudo=False, mode=None, owner=None,
316 mkdir=False, append=False):
317 """
318 Write data to file
319
320 :param path: file path on host
321 :param data: str, binary or fileobj to be written
322 :param sudo: use sudo to write file, defaults False
323 :param mode: set file mode bits if provided
324 :param owner: set file owner if provided
325 :param mkdir: create the file's parent directory first, defaults False
326 :param append: append data to the file, defaults False
327 """
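# Illustrative only (not executed; '/tmp/foo.txt' is just an example path):
# a simple call such as
#   LocalRemote().write_file('/tmp/foo.txt', 'hello\n')
# ends up running `dd of=/tmp/foo.txt` locally with the data fed to dd over
# stdin; the optional flags below add mkdir/chmod/chown steps and a sudo
# prefix.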
328 dd = 'sudo dd' if sudo else 'dd'
329 args = dd + ' of=' + path
330 if append:
331 args += ' conv=notrunc oflag=append'
332 if mkdir:
333 mkdirp = 'sudo mkdir -p' if sudo else 'mkdir -p'
334 dirpath = os.path.dirname(path)
335 if dirpath:
336 args = mkdirp + ' ' + dirpath + '\n' + args
337 if mode:
338 chmod = 'sudo chmod' if sudo else 'chmod'
339 args += '\n' + chmod + ' ' + mode + ' ' + path
340 if owner:
341 chown = 'sudo chown' if sudo else 'chown'
342 args += '\n' + chown + ' ' + owner + ' ' + path
343 omit_sudo = False if sudo else True
344 self.run(args=args, stdin=data, omit_sudo=omit_sudo)
345
346 def sudo_write_file(self, path, data, **kwargs):
347 """
348 Write data to file with sudo, for more info see `write_file()`.
349 """
350 self.write_file(path, data, sudo=True, **kwargs)
351
352 def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
353 # Since Python's shell simulation can only work when commands are
354 # provided as a list of arguments...
355 if isinstance(args, str):
356 args = args.split()
357
358 # We'll let sudo remain part of the command, even if the omit flag says
359 # otherwise, for commands which can normally be run only by root.
360 try:
361 if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
362 omit_sudo = False
363 except ValueError:
364 pass
365
366 # Quotes wrapping a command argument don't work in Python's shell
367 # simulation if the argument also contains spaces. E.g. '"ls"' is OK
368 # but '"ls /"' isn't.
369 errmsg = "Don't surround an argument with quotes if it " + \
370          "contains spaces.\nargs - %s" % (args)
371 for arg in args:
372 if isinstance(arg, Raw):
373 continue
374
375 if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
376 (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
377 raise RuntimeError(errmsg)
378
379 # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
380 # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
381 # treated differently by Python's shell simulation. Only the latter has
382 # the desired effect.
383 errmsg = 'The entire command to be executed as another user should ' +\
384          'be a single argument.\nargs - %s' % (args)
385 if 'sudo' in args and '-u' in args and '-c' in args and \
386 args.count('-c') == 1:
387 if args.index('-c') != len(args) - 2 and \
388 args[args.index('-c') + 2].find('-') == -1:
389 raise RuntimeError(errmsg)
390
391 if omit_sudo:
392 args = [a for a in args if a != "sudo"]
393
394 return args
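
# Illustrative only (hypothetical argument lists, not taken from a real run):
# examples of how the checks above treat different forms --
#
#   ['sudo', '-u', 'admin', '-s', '/bin/bash', '-c', 'ls /']    accepted
#   ['sudo', '-u', 'admin', '-s', '/bin/bash', '-c', 'ls', '/'] rejected
#   ['echo', '"hello world"']                                   rejected
#
# i.e. a command to be run as another user must be passed as one argument,
# and a quoted argument must not itself contain spaces.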
395
396 # Wrapper to keep the interface exactly the same as that of
397 # teuthology.orchestra.remote.Remote.run.
398 def run(self, **kwargs):
399 return self._do_run(**kwargs)
400
401 # XXX: omit_sudo is set to True since using sudo can change the ownership
402 # of files, which becomes problematic for subsequent runs of
403 # vstart_runner.py.
404 def _do_run(self, args, check_status=True, wait=True, stdout=None,
405 stderr=None, cwd=None, stdin=None, logger=None, label=None,
406 env=None, timeout=None, omit_sudo=True):
407 args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
408
409 # We have to use shell=True if any run.Raw was present, e.g. &&
410 shell = any([a for a in args if isinstance(a, Raw)])
411
412 # Filter out helper tools that don't exist in a vstart environment
413 args = [a for a in args if a not in ('adjust-ulimits',
414 'ceph-coverage')]
415
416 # Adjust binary path prefix if given a bare program name
417 if not isinstance(args[0], Raw) and "/" not in args[0]:
418 # If they asked for a bare binary name, and it exists
419 # in our built tree, use the one there.
420 local_bin = os.path.join(BIN_PREFIX, args[0])
421 if os.path.exists(local_bin):
422 args = [local_bin] + args[1:]
423 else:
424 log.debug("'{0}' is not a binary in the Ceph build dir".format(
425 args[0]
426 ))
427
428 log.debug('> ' +
429 ' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))
430
431 if shell:
432 subproc = subprocess.Popen(quote(args),
433 stdout=subprocess.PIPE,
434 stderr=subprocess.PIPE,
435 stdin=subprocess.PIPE,
436 cwd=cwd,
437 env=env,
438 shell=True)
439 else:
440 # Sanity check that we've got a list of strings
441 for arg in args:
442 if not isinstance(arg, str):
443 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
444 arg, arg.__class__
445 ))
446
447 subproc = subprocess.Popen(args,
448 stdout=subprocess.PIPE,
449 stderr=subprocess.PIPE,
450 stdin=subprocess.PIPE,
451 cwd=cwd,
452 env=env)
453
454 if stdin:
455 # Hack: writing to stdin is not deadlock-safe, but it "always" works
456 # as long as the input buffer is "small"
457 if isinstance(stdin, str):
458 subproc.stdin.write(stdin.encode())
459 else:
460 subproc.stdin.write(stdin)
461
462 proc = LocalRemoteProcess(
463 args, subproc, check_status,
464 stdout, stderr
465 )
466
467 if wait:
468 proc.wait()
469
470 return proc
471
472 # XXX: for compatibility keep this method the same as teuthology.orchestra.remote.sh;
473 # BytesIO is used just to keep things identical.
474 def sh(self, script, **kwargs):
475 """
476 Shortcut for run method.
477
478 Usage:
479 my_name = remote.sh('whoami')
480 remote_date = remote.sh('date')
481 """
482 from io import BytesIO
483
484 if 'stdout' not in kwargs:
485 kwargs['stdout'] = BytesIO()
486 if 'args' not in kwargs:
487 kwargs['args'] = script
488 proc = self.run(**kwargs)
489 out = proc.stdout.getvalue()
490 if isinstance(out, bytes):
491 return out.decode()
492 else:
493 return out
494
495 class LocalDaemon(object):
496 def __init__(self, daemon_type, daemon_id):
497 self.daemon_type = daemon_type
498 self.daemon_id = daemon_id
499 self.controller = LocalRemote()
500 self.proc = None
501
502 @property
503 def remote(self):
504 return LocalRemote()
505
506 def running(self):
507 return self._get_pid() is not None
508
509 def check_status(self):
510 if self.proc:
511 return self.proc.poll()
512
513 def _get_pid(self):
514 """
515 Return PID as an integer or None if not found
516 """
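# Illustrative only (PID, terminal and path are made up): a matching line in
# the `ps ww -u<uid>` output looks roughly like
#   12345 pts/0  S+  0:01 /home/dev/ceph/build/bin/ceph-mds -i a ...
# and the first whitespace-separated field is taken as the PID.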
517 ps_txt = self.controller.run(args=["ps", "ww", "-u"+str(os.getuid())],
518 stdout=StringIO()).\
519 stdout.getvalue().strip()
520 lines = ps_txt.split("\n")[1:]
521
522 for line in lines:
523 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
524 log.debug("Found ps line for daemon: {0}".format(line))
525 return int(line.split()[0])
526 if opt_log_ps_output:
527 log.debug("No match for {0} {1}: {2}".format(
528 self.daemon_type, self.daemon_id, ps_txt))
529 else:
530 log.debug("No match for {0} {1}".format(self.daemon_type,
531 self.daemon_id))
532 return None
533
534 def wait(self, timeout):
535 waited = 0
536 while self._get_pid() is not None:
537 if waited > timeout:
538 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
539 time.sleep(1)
540 waited += 1
541
542 def stop(self, timeout=300):
543 if not self.running():
544 log.error('tried to stop a non-running daemon')
545 return
546
547 pid = self._get_pid()
548 log.debug("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
549 os.kill(pid, signal.SIGTERM)
550
551 waited = 0
552 while pid is not None:
553 new_pid = self._get_pid()
554 if new_pid is not None and new_pid != pid:
555 log.debug("Killing new PID {0}".format(new_pid))
556 pid = new_pid
557 os.kill(pid, signal.SIGTERM)
558
559 if new_pid is None:
560 break
561 else:
562 if waited > timeout:
563 raise MaxWhileTries(
564 "Timed out waiting for daemon {0}.{1}".format(
565 self.daemon_type, self.daemon_id))
566 time.sleep(1)
567 waited += 1
568
569 self.wait(timeout=timeout)
570
571 def restart(self):
572 if self._get_pid() is not None:
573 self.stop()
574
575 self.proc = self.controller.run(args=[
576 os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
577 "-i", self.daemon_id])
578
579 def signal(self, sig, silent=False):
580 if not self.running():
581 raise RuntimeError("Can't send signal to non-running daemon")
582
583 os.kill(self._get_pid(), sig)
584 if not silent:
585 log.debug("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
586
587
588 def safe_kill(pid):
589 """
590 os.kill annoyingly raises exception if process already dead. Ignore it.
591 """
592 try:
593 return os.kill(pid, signal.SIGKILL)
594 except OSError as e:
595 if e.errno == errno.ESRCH:
596 # Raced with process termination
597 pass
598 else:
599 raise
600
601 def mon_in_localhost(config_path="./ceph.conf"):
602 """
603 If the ceph cluster is using the localhost IP as mon host, we must disable ns unsharing
604 """
605 with open(config_path) as f:
606 for line in f:
607 local = re.match(r'^\s*mon host\s*=\s*\[((v1|v2):127\.0\.0\.1:\d+,?)+\]', line)
608 if local:
609 return True
610 return False
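
# Illustrative only (the ports are made up): mon_in_localhost() matches a
# vstart-style ceph.conf line such as
#   mon host = [v2:127.0.0.1:40918,v1:127.0.0.1:40919]
# while monitors bound to a routable address do not match, leaving network
# namespace unsharing enabled.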
611
612 class LocalKernelMount(KernelMount):
613 def __init__(self, ctx, test_dir, client_id=None,
614 client_keyring_path=None, client_remote=None,
615 hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
616 brxnet=None):
617 super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
618 client_id=client_id, client_keyring_path=client_keyring_path,
619 client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
620 cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
621
622 @property
623 def config_path(self):
624 return "./ceph.conf"
625
626 def get_keyring_path(self):
627 # This is going to end up in a config file, so use an absolute path
628 # to avoid assumptions about daemons' pwd
629 keyring_path = "./client.{0}.keyring".format(self.client_id)
630 try:
631 os.stat(keyring_path)
632 except OSError:
633 return os.path.join(os.getcwd(), 'keyring')
634 else:
635 return keyring_path
636
637 def setupfs(self, name=None):
638 if name is None and self.fs is not None:
639 # Previous mount existed, reuse the old name
640 name = self.fs.name
641 self.fs = LocalFilesystem(self.ctx, name=name)
642 log.debug('Wait for MDS to reach steady state...')
643 self.fs.wait_for_daemons()
644 log.debug('Ready to start {}...'.format(type(self).__name__))
645
646 @property
647 def _prefix(self):
648 return BIN_PREFIX
649
650 def _asok_path(self):
651 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
652 # run in the foreground. When it runs daemonized, however, the asok is named after
653 # the PID of the launching process, not the long running ceph-fuse process. Therefore
654 # we need to give an exact path here as the logic for checking /proc/ for which
655 # asok is alive does not work.
656
657 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
658 # in a tmpdir. All of the paths are the same, so no need to select
659 # based off of the service type.
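# Illustrative only (the tmpdir name is an assumption): with a conf line like
#   admin socket = /tmp/ceph-asok.abcdef/$name.$pid.asok
# the regex below captures the directory part and this method returns a glob
# of the form "<dir>/client.<id>.*.asok".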
660 d = "./out"
661 with open(self.config_path) as f:
662 for line in f:
663 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
664 if asok_conf:
665 d = asok_conf.groups(1)[0]
666 break
667 path = "{0}/client.{1}.*.asok".format(d, self.client_id)
668 return path
669
670 def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
671 self.update_attrs(**kwargs)
672 self.assert_and_log_minimum_mount_details()
673
674 if opt_use_ns:
675 self.using_namespace = True
676 self.setup_netns()
677 else:
678 self.using_namespace = False
679
680 if not self.cephfs_mntpt:
681 self.cephfs_mntpt = "/"
682 # TODO: don't call setupfs() from within mount()
683 if createfs:
684 self.setupfs(name=self.cephfs_name)
685
686 opts = 'norequire_active_mds'
687 if self.client_id:
688 opts += ',name=' + self.client_id
689 if self.client_keyring_path and self.client_id:
690 opts += ",secret=" + self.get_key_from_keyfile()
691 if self.config_path:
692 opts += ',conf=' + self.config_path
693 if self.cephfs_name:
694 opts += ",mds_namespace={0}".format(self.cephfs_name)
695 if mntopts:
696 opts += ',' + ','.join(mntopts)
697
698 stderr = StringIO()
699 try:
700 self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
701 timeout=(5*60), stderr=stderr)
702 except CommandFailedError:
703 if 'file exists' not in stderr.getvalue().lower():
704 raise
705
706 if self.cephfs_mntpt is None:
707 self.cephfs_mntpt = "/"
708 cmdargs = ['sudo']
709 if self.using_namespace:
710 cmdargs += ['nsenter',
711 '--net=/var/run/netns/{0}'.format(self.netns_name)]
712 cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
713 self.hostfs_mntpt, '-v', '-o', opts]
714
715 mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
716 try:
717 self.client_remote.run(args=cmdargs, timeout=(30*60),
718 omit_sudo=False, stdout=mountcmd_stdout,
719 stderr=mountcmd_stderr)
720 except CommandFailedError as e:
721 if check_status:
722 raise
723 else:
724 return (e, mountcmd_stdout.getvalue(),
725 mountcmd_stderr.getvalue())
726
727 stderr = StringIO()
728 try:
729 self.client_remote.run(args=['sudo', 'chmod', '1777',
730 self.hostfs_mntpt], stderr=stderr,
731 timeout=(5*60))
732 except CommandFailedError:
733 # the client does not have write permission in the caps it holds for
734 # the Ceph FS that was just mounted.
735 if 'permission denied' in stderr.getvalue().lower():
736 pass
737
738 self.mounted = True
739
740 def cleanup_netns(self):
741 if self.using_namespace:
742 super(type(self), self).cleanup_netns()
743
744 def _run_python(self, pyscript, py_version='python'):
745 """
746 Override this to remove the daemon-helper prefix that is used otherwise
747 to make the process killable.
748 """
749 return self.client_remote.run(args=[py_version, '-c', pyscript],
750 wait=False, stdout=StringIO())
751
752 class LocalFuseMount(FuseMount):
753 def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
754 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
755 cephfs_mntpt=None, brxnet=None):
756 super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
757 test_dir=test_dir, client_id=client_id,
758 client_keyring_path=client_keyring_path,
759 client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
760 cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
761
762 @property
763 def config_path(self):
764 return "./ceph.conf"
765
766 def get_keyring_path(self):
767 # This is going to end up in a config file, so use an absolute path
768 # to avoid assumptions about daemons' pwd
769 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
770
771 def setupfs(self, name=None):
772 if name is None and self.fs is not None:
773 # Previous mount existed, reuse the old name
774 name = self.fs.name
775 self.fs = LocalFilesystem(self.ctx, name=name)
776 log.debug('Wait for MDS to reach steady state...')
777 self.fs.wait_for_daemons()
778 log.debug('Ready to start {}...'.format(type(self).__name__))
779
780 @property
781 def _prefix(self):
782 return BIN_PREFIX
783
784 def _asok_path(self):
785 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
786 # run in the foreground. When it runs daemonized, however, the asok is named after
787 # the PID of the launching process, not the long running ceph-fuse process. Therefore
788 # we need to give an exact path here as the logic for checking /proc/ for which
789 # asok is alive does not work.
790
791 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
792 # in a tmpdir. All of the paths are the same, so no need to select
793 # based off of the service type.
794 d = "./out"
795 with open(self.config_path) as f:
796 for line in f:
797 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
798 if asok_conf:
799 d = asok_conf.groups(1)[0]
800 break
801 path = "{0}/client.{1}.*.asok".format(d, self.client_id)
802 return path
803
804 def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
805 self.update_attrs(**kwargs)
806 self.assert_and_log_minimum_mount_details()
807
808 if opt_use_ns:
809 self.using_namespace = True
810 self.setup_netns()
811 else:
812 self.using_namespace = False
813
814 # TODO: don't call setupfs() from within mount()
815 if createfs:
816 self.setupfs(name=self.cephfs_name)
817
818 stderr = StringIO()
819 try:
820 self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
821 stderr=stderr)
822 except CommandFailedError:
823 if 'file exists' not in stderr.getvalue().lower():
824 raise
825
826 def list_connections():
827 self.client_remote.run(
828 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
829 check_status=False
830 )
831
832 p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
833 check_status=False, stdout=StringIO())
834 if p.exitstatus != 0:
835 log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
836 return []
837
838 ls_str = p.stdout.getvalue().strip()
839 if ls_str:
840 return [int(n) for n in ls_str.split("\n")]
841 else:
842 return []
843
844 # Before starting ceph-fuse process, note the contents of
845 # /sys/fs/fuse/connections
846 pre_mount_conns = list_connections()
847 log.debug("Pre-mount connections: {0}".format(pre_mount_conns))
848
849 cmdargs = []
850 if self.using_namespace:
851 cmdargs = ['sudo', 'nsenter',
852 '--net=/var/run/netns/{0}'.format(self.netns_name),
853 '--setuid', str(os.getuid())]
854 cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
855 '-f']
856 if self.client_id is not None:
857 cmdargs += ["--id", self.client_id]
858 if self.client_keyring_path and self.client_id is not None:
859 cmdargs.extend(['-k', self.client_keyring_path])
860 if self.cephfs_name:
861 cmdargs += ["--client_fs=" + self.cephfs_name]
862 if self.cephfs_mntpt:
863 cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
864 if os.getuid() != 0:
865 cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
866 if mntopts:
867 cmdargs += mntopts
868
869 mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
870 self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
871 omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
872 self._set_fuse_daemon_pid(check_status)
873 log.debug("Mounting client.{0} with pid "
874 "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
875
876 # Wait for the connection reference to appear in /sys
877 waited = 0
878 post_mount_conns = list_connections()
879 while len(post_mount_conns) <= len(pre_mount_conns):
880 if self.fuse_daemon.finished:
881 # Did mount fail? Raise the CommandFailedError instead of
882 # hitting the "failed to populate /sys/" timeout
883 try:
884 self.fuse_daemon.wait()
885 except CommandFailedError as e:
886 if check_status:
887 raise
888 else:
889 return (e, mountcmd_stdout.getvalue(),
890 mountcmd_stderr.getvalue())
891 time.sleep(1)
892 waited += 1
893 if waited > 30:
894 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
895 waited
896 ))
897 post_mount_conns = list_connections()
898
899 log.debug("Post-mount connections: {0}".format(post_mount_conns))
900
901 # Record our fuse connection number so that we can use it when
902 # forcing an unmount
903 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
904 if len(new_conns) == 0:
905 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
906 elif len(new_conns) > 1:
907 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
908 else:
909 self._fuse_conn = new_conns[0]
910
911 self.gather_mount_info()
912
913 self.mounted = True
914
915 def _set_fuse_daemon_pid(self, check_status):
916 # NOTE: When a command <args> is launched with sudo, two processes are
917 # launched, one with sudo in <args> and the other without. Make sure we
918 # get the PID of the latter.
919 try:
920 with safe_while(sleep=1, tries=15) as proceed:
921 while proceed():
922 try:
923 sock = self.find_admin_socket()
924 except (RuntimeError, CommandFailedError):
925 continue
926
927 self.fuse_daemon.fuse_pid = int(re.match(".*\.(\d+)\.asok$",
928 sock).group(1))
929 break
930 except MaxWhileTries:
931 if check_status:
932 raise
933 else:
934 pass
935
936 def cleanup_netns(self):
937 if self.using_namespace:
938 super(type(self), self).cleanup_netns()
939
940 def _run_python(self, pyscript, py_version='python'):
941 """
942 Override this to remove the daemon-helper prefix that is used otherwise
943 to make the process killable.
944 """
945 return self.client_remote.run(args=[py_version, '-c', pyscript],
946 wait=False, stdout=StringIO())
947
948 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
949 # the same name.
950 class LocalCephManager(CephManager):
951 def __init__(self):
952 # Deliberately skip parent init, only inheriting from it to get
953 # util methods like osd_dump that sit on top of raw_cluster_cmd
954 self.controller = LocalRemote()
955
956 # Only a minority of CephManager functions actually bother locking, for
957 # cases where certain teuthology tests want to run tasks in parallel.
958 self.lock = threading.RLock()
959
960 self.log = lambda x: log.debug(x)
961
962 # Don't bother constructing a map of pools: it should be empty
963 # at test cluster start, and in any case it would be out of date
964 # in no time. The attribute needs to exist for some of the CephManager
965 # methods to work though.
966 self.pools = {}
967
968 def find_remote(self, daemon_type, daemon_id):
969 """
970 daemon_type like 'mds', 'osd'
971 daemon_id like 'a', '0'
972 """
973 return LocalRemote()
974
975 def run_ceph_w(self, watch_channel=None):
976 """
977 :param watch_channel: Specifies the channel to be watched.
978 This can be 'cluster', 'audit', ...
979 :type watch_channel: str
980 """
981 args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
982 if watch_channel is not None:
983 args.append("--watch-channel")
984 args.append(watch_channel)
985 proc = self.controller.run(args=args, wait=False, stdout=StringIO())
986 return proc
987
988 def run_cluster_cmd(self, **kwargs):
989 """
990 Run a Ceph command and return the object representing the process for
991 the command.
992
993 Accepts arguments same as teuthology.orchestra.remote.run().
994 """
995 kwargs['args'] = [os.path.join(BIN_PREFIX,'ceph')]+list(kwargs['args'])
996 return self.controller.run(**kwargs)
997
998 def raw_cluster_cmd(self, *args, **kwargs) -> str:
999 """
1000 args like ["osd", "dump"]
1001 return stdout string
1002 """
1003 kwargs['args'] = args
1004 if kwargs.get('stdout') is None:
1005 kwargs['stdout'] = StringIO()
1006 return self.run_cluster_cmd(**kwargs).stdout.getvalue()
1007
1008 def raw_cluster_cmd_result(self, *args, **kwargs):
1009 """
1010 like raw_cluster_cmd but don't check status, just return rc
1011 """
1012 kwargs['args'], kwargs['check_status'] = args, False
1013 return self.run_cluster_cmd(**kwargs).exitstatus
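
# Illustrative only (not executed; 'osd.0' is just an example id): typical
# use of the wrappers above --
#
#   manager = LocalCephManager()
#   dump_json = manager.raw_cluster_cmd('osd', 'dump', '--format=json')
#   rc = manager.raw_cluster_cmd_result('osd', 'down', 'osd.0')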
1014
1015 def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
1016 timeout=None, stdout=None):
1017 if stdout is None:
1018 stdout = StringIO()
1019
1020 return self.controller.run(
1021 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
1022 "{0}.{1}".format(daemon_type, daemon_id)] + command,
1023 check_status=check_status, timeout=timeout, stdout=stdout)
1024
1025 def get_mon_socks(self):
1026 """
1027 Get monitor sockets.
1028
1029 :return socks: tuple of strings; strings are individual sockets.
1030 """
1031 from json import loads
1032
1033 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1034 socks = []
1035 for mon in output['mons']:
1036 for addrvec_mem in mon['public_addrs']['addrvec']:
1037 socks.append(addrvec_mem['addr'])
1038 return tuple(socks)
1039
1040 def get_msgrv1_mon_socks(self):
1041 """
1042 Get monitor sockets that use msgrv1 to operate.
1043
1044 :return socks: tuple of strings; strings are individual sockets.
1045 """
1046 from json import loads
1047
1048 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1049 socks = []
1050 for mon in output['mons']:
1051 for addrvec_mem in mon['public_addrs']['addrvec']:
1052 if addrvec_mem['type'] == 'v1':
1053 socks.append(addrvec_mem['addr'])
1054 return tuple(socks)
1055
1056 def get_msgrv2_mon_socks(self):
1057 """
1058 Get monitor sockets that use msgrv2 to operate.
1059
1060 :return socks: tuple of strings; strings are individual sockets.
1061 """
1062 from json import loads
1063
1064 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1065 socks = []
1066 for mon in output['mons']:
1067 for addrvec_mem in mon['public_addrs']['addrvec']:
1068 if addrvec_mem['type'] == 'v2':
1069 socks.append(addrvec_mem['addr'])
1070 return tuple(socks)
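
# Illustrative only (addresses and ports are made up): for a monitor whose
# "mon dump" addrvec looks like
#   [{"type": "v2", "addr": "127.0.0.1:40918", ...},
#    {"type": "v1", "addr": "127.0.0.1:40919", ...}]
# get_mon_socks() returns both addresses, while the msgrv1/msgrv2 variants
# above filter on the "type" field.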
1071
1072
1073 class LocalCephCluster(CephCluster):
1074 def __init__(self, ctx):
1075 # Deliberately skip calling CephCluster constructor
1076 self._ctx = ctx
1077 self.mon_manager = LocalCephManager()
1078 self._conf = defaultdict(dict)
1079
1080 @property
1081 def admin_remote(self):
1082 return LocalRemote()
1083
1084 def get_config(self, key, service_type=None):
1085 if service_type is None:
1086 service_type = 'mon'
1087
1088 # FIXME hardcoded vstart service IDs
1089 service_id = {
1090 'mon': 'a',
1091 'mds': 'a',
1092 'osd': '0'
1093 }[service_type]
1094
1095 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
1096
1097 def _write_conf(self):
1098 # In teuthology, we have the honour of writing the entire ceph.conf, but
1099 # in vstart land it has mostly already been written and we need to carefully
1100 # append to it.
1101 conf_path = "./ceph.conf"
1102 banner = "\n#LOCAL_TEST\n"
1103 existing_str = open(conf_path).read()
1104
1105 if banner in existing_str:
1106 existing_str = existing_str[0:existing_str.find(banner)]
1107
1108 existing_str += banner
1109
1110 for subsys, kvs in self._conf.items():
1111 existing_str += "\n[{0}]\n".format(subsys)
1112 for key, val in kvs.items():
1113 # Comment out existing instance if it exists
1114 log.debug("Searching for existing instance {0}/{1}".format(
1115 key, subsys
1116 ))
1117 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
1118 subsys
1119 ), existing_str, re.MULTILINE)
1120
1121 if existing_section:
1122 section_str = existing_str[existing_section.start():existing_section.end()]
1123 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
1124 if existing_val:
1125 start = existing_section.start() + existing_val.start(1)
1126 log.debug("Found string to replace at {0}".format(
1127 start
1128 ))
1129 existing_str = existing_str[0:start] + "#" + existing_str[start:]
1130
1131 existing_str += "{0} = {1}\n".format(key, val)
1132
1133 open(conf_path, "w").write(existing_str)
1134
1135 def set_ceph_conf(self, subsys, key, value):
1136 self._conf[subsys][key] = value
1137 self._write_conf()
1138
1139 def clear_ceph_conf(self, subsys, key):
1140 del self._conf[subsys][key]
1141 self._write_conf()
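
# Illustrative only (section and value are examples): after
#   cluster.set_ceph_conf('mds', 'mds log max segments', '10')
# the local ceph.conf gains (or extends) a block like
#   #LOCAL_TEST
#   [mds]
#   mds log max segments = 10
# and any pre-existing setting of the key is commented out by _write_conf().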
1142
1143
1144 class LocalMDSCluster(LocalCephCluster, MDSCluster):
1145 def __init__(self, ctx):
1146 LocalCephCluster.__init__(self, ctx)
1147 # Deliberately skip calling MDSCluster constructor
1148 self._mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
1149 log.debug("Discovered MDS IDs: {0}".format(self._mds_ids))
1150 self._mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
1151
1152 @property
1153 def mds_ids(self):
1154 return self._mds_ids
1155
1156 @property
1157 def mds_daemons(self):
1158 return self._mds_daemons
1159
1160 def clear_firewall(self):
1161 # FIXME: unimplemented
1162 pass
1163
1164 def newfs(self, name='cephfs', create=True):
1165 return LocalFilesystem(self._ctx, name=name, create=create)
1166
1167 def delete_all_filesystems(self):
1168 """
1169 Remove all filesystems that exist, and any pools in use by them.
1170 """
1171 for fs in self.status().get_filesystems():
1172 LocalFilesystem(ctx=self._ctx, fscid=fs['id']).destroy()
1173
1174
1175 class LocalMgrCluster(LocalCephCluster, MgrCluster):
1176 def __init__(self, ctx):
1177 super(LocalMgrCluster, self).__init__(ctx)
1178
1179 self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
1180 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
1181
1182
1183 class LocalFilesystem(LocalMDSCluster, Filesystem):
1184 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
1185 # Deliberately skip calling Filesystem constructor
1186 LocalMDSCluster.__init__(self, ctx)
1187
1188 self.id = None
1189 self.name = name
1190 self.metadata_pool_name = None
1191 self.metadata_overlay = False
1192 self.data_pool_name = None
1193 self.data_pools = None
1194 self.fs_config = fs_config
1195 self.ec_profile = fs_config.get('ec_profile')
1196
1197 self.mon_manager = LocalCephManager()
1198
1199 self.client_remote = LocalRemote()
1200
1201 self._conf = defaultdict(dict)
1202
1203 if name is not None:
1204 if fscid is not None:
1205 raise RuntimeError("cannot specify fscid when creating fs")
1206 if create and not self.legacy_configured():
1207 self.create()
1208 else:
1209 if fscid is not None:
1210 self.id = fscid
1211 self.getinfo(refresh=True)
1212
1213 # Stash a reference to the first created filesystem on ctx, so
1214 # that if someone drops to the interactive shell they can easily
1215 # poke our methods.
1216 if not hasattr(self._ctx, "filesystem"):
1217 self._ctx.filesystem = self
1218
1219 @property
1220 def _prefix(self):
1221 return BIN_PREFIX
1222
1223 def set_clients_block(self, blocked, mds_id=None):
1224 raise NotImplementedError()
1225
1226
1227 class LocalCluster(object):
1228 def __init__(self, rolename="placeholder"):
1229 self.remotes = {
1230 LocalRemote(): [rolename]
1231 }
1232
1233 def only(self, requested):
1234 return self.__class__(rolename=requested)
1235
1236
1237 class LocalContext(object):
1238 def __init__(self):
1239 self.config = {}
1240 self.teuthology_config = teuth_config
1241 self.cluster = LocalCluster()
1242 self.daemons = DaemonGroup()
1243
1244 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
1245 # tests that want to look these up via ctx can do so.
1246 # Inspect ceph.conf to see what roles exist
1247 for conf_line in open("ceph.conf").readlines():
1248 for svc_type in ["mon", "osd", "mds", "mgr"]:
1249 prefixed_type = "ceph." + svc_type
1250 if prefixed_type not in self.daemons.daemons:
1251 self.daemons.daemons[prefixed_type] = {}
1252 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
1253 if match:
1254 svc_id = match.group(1)
1255 self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
1256
1257 def __del__(self):
1258 test_path = self.teuthology_config['test_path']
1259 # opt_create_cluster_only does not create the test path
1260 if test_path:
1261 shutil.rmtree(test_path)
1262
1263
1264 #########################################
1265 #
1266 # stuff necessary for launching tests...
1267 #
1268 #########################################
1269
1270
1271 def enumerate_methods(s):
1272 log.debug("e: {0}".format(s))
1273 for t in s._tests:
1274 if isinstance(t, suite.BaseTestSuite):
1275 for sub in enumerate_methods(t):
1276 yield sub
1277 else:
1278 yield s, t
1279
1280
1281 def load_tests(modules, loader):
1282 if modules:
1283 log.debug("Executing modules: {0}".format(modules))
1284 module_suites = []
1285 for mod_name in modules:
1286 # Test names like cephfs.test_auto_repair
1287 module_suites.append(loader.loadTestsFromName(mod_name))
1288 log.debug("Loaded: {0}".format(list(module_suites)))
1289 return suite.TestSuite(module_suites)
1290 else:
1291 log.debug("Executing all cephfs tests")
1292 return loader.discover(
1293 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
1294 )
1295
1296
1297 def scan_tests(modules):
1298 overall_suite = load_tests(modules, loader.TestLoader())
1299
1300 max_required_mds = 0
1301 max_required_clients = 0
1302 max_required_mgr = 0
1303 require_memstore = False
1304
1305 for suite_, case in enumerate_methods(overall_suite):
1306 max_required_mds = max(max_required_mds,
1307 getattr(case, "MDSS_REQUIRED", 0))
1308 max_required_clients = max(max_required_clients,
1309 getattr(case, "CLIENTS_REQUIRED", 0))
1310 max_required_mgr = max(max_required_mgr,
1311 getattr(case, "MGRS_REQUIRED", 0))
1312 require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
1313 or require_memstore
1314
1315 return max_required_mds, max_required_clients, \
1316 max_required_mgr, require_memstore
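
# Illustrative only (the class name is made up): a test case declares the
# resources it needs via class attributes, e.g.
#
#   class TestSomething(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 2
#       REQUIRE_MEMSTORE = True
#
# and scan_tests() takes the maximum over all selected cases so that vstart.sh
# can be launched with enough daemons and clients.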
1317
1318
1319 class LogRotate():
1320 def __init__(self):
1321 self.conf_file_path = os.path.join(os.getcwd(), 'logrotate.conf')
1322 self.state_file_path = os.path.join(os.getcwd(), 'logrotate.state')
1323
1324 def run_logrotate(self):
1325 remote.run(args=['logrotate', '-f', self.conf_file_path, '-s',
1326 self.state_file_path, '--verbose'])
1327
1328
1329 def teardown_cluster():
1330 log.info('\ntearing down the cluster...')
1331 remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
1332 remote.run(args=['rm', '-rf', './dev', './out'])
1333
1334
1335 def clear_old_log():
1336 from os import stat
1337
1338 try:
1339 stat(logpath)
1340 # would need an update when making this py3 compatible. Use
1341 # FileNotFoundError instead.
1342 except OSError:
1343 return
1344 else:
1345 os.remove(logpath)
1346 with open(logpath, 'w') as logfile:
1347 logfile.write('')
1348 init_log()
1349 log.debug('logging in a fresh file now...')
1350
1351
1352 class LogStream(object):
1353 def __init__(self):
1354 self.buffer = ""
1355 self.omit_result_lines = False
1356
1357 def _del_result_lines(self):
1358 """
1359 Don't let unittest.TextTestRunner print "Ran X tests in Ys";
1360 vstart_runner.py will do that itself since it runs tests in a
1361 testsuite one by one.
1362 """
1363 if self.omit_result_lines:
1364 self.buffer = re.sub('-'*70+'\nran [0-9]* test in [0-9.]*s\n*',
1365 '', self.buffer, flags=re.I)
1366 self.buffer = re.sub('failed \(failures=[0-9]*\)\n', '', self.buffer,
1367 flags=re.I)
1368 self.buffer = self.buffer.replace('OK\n', '')
1369
1370 def write(self, data):
1371 self.buffer += data
1372 if self.buffer.count("\n") > 5:
1373 self._write()
1374
1375 def _write(self):
1376 if opt_rotate_logs:
1377 self._del_result_lines()
1378 if self.buffer == '':
1379 return
1380
1381 lines = self.buffer.split("\n")
1382 for line in lines:
1383 # sys.stderr.write(line + "\n")
1384 log.info(line)
1385 self.buffer = ''
1386
1387 def flush(self):
1388 pass
1389
1390 def __del__(self):
1391 self._write()
1392
1393
1394 class InteractiveFailureResult(unittest.TextTestResult):
1395 """
1396 Specialization that implements interactive-on-error style
1397 behavior.
1398 """
1399 def addFailure(self, test, err):
1400 super(InteractiveFailureResult, self).addFailure(test, err)
1401 log.error(self._exc_info_to_string(err, test))
1402 log.error("Failure in test '{0}', going interactive".format(
1403 self.getDescription(test)
1404 ))
1405 interactive.task(ctx=None, config=None)
1406
1407 def addError(self, test, err):
1408 super(InteractiveFailureResult, self).addError(test, err)
1409 log.error(self._exc_info_to_string(err, test))
1410 log.error("Error in test '{0}', going interactive".format(
1411 self.getDescription(test)
1412 ))
1413 interactive.task(ctx=None, config=None)
1414
1415
1416 # XXX: the class we require is derived from this one and from one of
1417 # InteractiveFailureResult or unittest.TextTestResult.
1418 class LoggingResultTemplate(object):
1419 fail_on_skip = False
1420
1421 def startTest(self, test):
1422 log.info("Starting test: {0}".format(self.getDescription(test)))
1423 test.started_at = datetime.datetime.utcnow()
1424 return super(LoggingResultTemplate, self).startTest(test)
1425
1426 def stopTest(self, test):
1427 log.info("Stopped test: {0} in {1}s".format(
1428 self.getDescription(test),
1429 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1430 ))
1431
1432 def addSkip(self, test, reason):
1433 if LoggingResultTemplate.fail_on_skip:
1434 # Don't just call addFailure because that requires a traceback
1435 self.failures.append((test, reason))
1436 else:
1437 super(LoggingResultTemplate, self).addSkip(test, reason)
1438
1439
1440 def launch_tests(overall_suite):
1441 if opt_rotate_logs or not opt_exit_on_test_failure:
1442 return launch_individually(overall_suite)
1443 else:
1444 return launch_entire_suite(overall_suite)
1445
1446
1447 def get_logging_result_class():
1448 result_class = InteractiveFailureResult if opt_interactive_on_error else \
1449 unittest.TextTestResult
1450 return type('', (LoggingResultTemplate, result_class), {})
1451
1452
1453 def launch_individually(overall_suite):
1454 no_of_tests_execed = 0
1455 no_of_tests_failed, no_of_tests_execed = 0, 0
1456 LoggingResult = get_logging_result_class()
1457 stream = LogStream()
1458 stream.omit_result_lines = True
1459 if opt_rotate_logs:
1460 logrotate = LogRotate()
1461
1462 started_at = datetime.datetime.utcnow()
1463 for suite_, case in enumerate_methods(overall_suite):
1464 # don't run logrotate beforehand since some ceph daemons might be
1465 # down and pre/post-rotate scripts in logrotate.conf might fail.
1466 if opt_rotate_logs:
1467 logrotate.run_logrotate()
1468
1469 result = unittest.TextTestRunner(stream=stream,
1470 resultclass=LoggingResult,
1471 verbosity=2, failfast=True).run(case)
1472
1473 if not result.wasSuccessful():
1474 if opt_exit_on_test_failure:
1475 break
1476 else:
1477 no_of_tests_failed += 1
1478
1479 no_of_tests_execed += 1
1480 time_elapsed = (datetime.datetime.utcnow() - started_at).total_seconds()
1481
1482 if result.wasSuccessful():
1483 log.info('')
1484 log.info('-'*70)
1485 log.info(f'Ran {no_of_tests_execed} tests in {time_elapsed}s')
1486 if no_of_tests_failed > 0:
1487 log.info(f'{no_of_tests_failed} tests failed')
1488 log.info('')
1489 log.info('OK')
1490
1491 return result
1492
1493
1494 def launch_entire_suite(overall_suite):
1495 LoggingResult = get_logging_result_class()
1496
1497 testrunner = unittest.TextTestRunner(stream=LogStream(),
1498 resultclass=LoggingResult,
1499 verbosity=2, failfast=True)
1500 return testrunner.run(overall_suite)
1501
1502
1503 def exec_test():
1504 # Parse arguments
1505 global opt_interactive_on_error
1506 opt_interactive_on_error = False
1507 opt_create_cluster = False
1508 opt_create_cluster_only = False
1509 opt_ignore_missing_binaries = False
1510 opt_teardown_cluster = False
1511 global opt_log_ps_output
1512 opt_log_ps_output = False
1513 use_kernel_client = False
1514 global opt_use_ns
1515 opt_use_ns = False
1516 opt_brxnet= None
1517 opt_verbose = True
1518 global opt_rotate_logs
1519 opt_rotate_logs = False
1520 global opt_exit_on_test_failure
1521 opt_exit_on_test_failure = True
1522
1523 args = sys.argv[1:]
1524 flags = [a for a in args if a.startswith("-")]
1525 modules = [a for a in args if not a.startswith("-")]
1526 for f in flags:
1527 if f == "--interactive":
1528 opt_interactive_on_error = True
1529 elif f == "--create":
1530 opt_create_cluster = True
1531 elif f == "--create-cluster-only":
1532 opt_create_cluster_only = True
1533 elif f == "--ignore-missing-binaries":
1534 opt_ignore_missing_binaries = True
1535 elif f == '--teardown':
1536 opt_teardown_cluster = True
1537 elif f == '--log-ps-output':
1538 opt_log_ps_output = True
1539 elif f == '--clear-old-log':
1540 clear_old_log()
1541 elif f == "--kclient":
1542 use_kernel_client = True
1543 elif f == '--usens':
1544 opt_use_ns = True
1545 elif '--brxnet' in f:
1546 if re.search(r'=[0-9./]+', f) is None:
1547 log.error("--brxnet=<ip/mask> option needs one argument: '{0}'".format(f))
1548 sys.exit(-1)
1549 opt_brxnet=f.split('=')[1]
1550 try:
1551 IP(opt_brxnet)
1552 if IP(opt_brxnet).iptype() == 'PUBLIC':
1553 raise RuntimeError('is public')
1554 except Exception as e:
1555 log.error("Invalid ip '{0}' {1}".format(opt_brxnet, e))
1556 sys.exit(-1)
1557 elif '--no-verbose' == f:
1558 opt_verbose = False
1559 elif f == '--rotate-logs':
1560 opt_rotate_logs = True
1561 elif f == '--run-all-tests':
1562 opt_exit_on_test_failure = False
1563 elif f == '--debug':
1564 log.setLevel(logging.DEBUG)
1565 else:
1566 log.error("Unknown option '{0}'".format(f))
1567 sys.exit(-1)
1568
1569 # Help developers by stopping up-front if their tree isn't built enough for all the
1570 # tools that the tests might want to use (add more here if needed)
1571 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
1572 "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
1573 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
1574 if missing_binaries and not opt_ignore_missing_binaries:
1575 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
1576 sys.exit(-1)
1577
1578 max_required_mds, max_required_clients, \
1579 max_required_mgr, require_memstore = scan_tests(modules)
1580
1581 global remote
1582 remote = LocalRemote()
1583
1584 CephFSMount.cleanup_stale_netnses_and_bridge(remote)
1585
1586 # Tolerate no MDSs or clients running at start
1587 ps_txt = remote.run(args=["ps", "-u"+str(os.getuid())],
1588 stdout=StringIO()).stdout.getvalue().strip()
1589 lines = ps_txt.split("\n")[1:]
1590 for line in lines:
1591 if 'ceph-fuse' in line or 'ceph-mds' in line:
1592 pid = int(line.split()[0])
1593 log.warning("Killing stray process {0}".format(line))
1594 os.kill(pid, signal.SIGKILL)
1595
1596 # Fire up the Ceph cluster if the user requested it
1597 if opt_create_cluster or opt_create_cluster_only:
1598 log.info("Creating cluster with {0} MDS daemons".format(
1599 max_required_mds))
1600 teardown_cluster()
1601 vstart_env = os.environ.copy()
1602 vstart_env["FS"] = "0"
1603 vstart_env["MDS"] = max_required_mds.__str__()
1604 vstart_env["OSD"] = "4"
1605 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
1606
1607 args = [
1608 os.path.join(SRC_PREFIX, "vstart.sh"),
1609 "-n",
1610 "--nolockdep",
1611 ]
1612 if require_memstore:
1613 args.append("--memstore")
1614
1615 if opt_verbose:
1616 args.append("-d")
1617
1618 # usually, vstart.sh runs to completion in less than 100
1619 # seconds.
1620 remote.run(args=args, env=vstart_env, timeout=(3 * 60))
1621
1622 # Wait for OSD to come up so that subsequent injectargs etc will
1623 # definitely succeed
1624 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
1625
1626 if opt_create_cluster_only:
1627 return
1628
1629 if opt_use_ns and mon_in_localhost() and not opt_create_cluster:
1630 raise RuntimeError("cluster is on localhost; the '--usens' option is incompatible with it. Pass an extra '--create' option to create a new cluster that does not use localhost.")
1631
1632 # List of client mounts, sufficient to run the selected tests
1633 clients = [i.__str__() for i in range(0, max_required_clients)]
1634
1635 test_dir = tempfile.mkdtemp()
1636 teuth_config['test_path'] = test_dir
1637
1638 ctx = LocalContext()
1639 ceph_cluster = LocalCephCluster(ctx)
1640 mds_cluster = LocalMDSCluster(ctx)
1641 mgr_cluster = LocalMgrCluster(ctx)
1642
1643 # Construct Mount classes
1644 mounts = []
1645 for client_id in clients:
1646 # Populate client keyring (it sucks to use client.admin for test clients
1647 # because it's awkward to find the logs later)
1648 client_name = "client.{0}".format(client_id)
1649
1650 if client_name not in open("./keyring").read():
1651 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
1652 "osd", "allow rw",
1653 "mds", "allow",
1654 "mon", "allow r"], stdout=StringIO())
1655
1656 open("./keyring", "at").write(p.stdout.getvalue())
1657
1658 if use_kernel_client:
1659 mount = LocalKernelMount(ctx=ctx, test_dir=test_dir,
1660 client_id=client_id, brxnet=opt_brxnet)
1661 else:
1662 mount = LocalFuseMount(ctx=ctx, test_dir=test_dir,
1663 client_id=client_id, brxnet=opt_brxnet)
1664
1665 mounts.append(mount)
1666 if os.path.exists(mount.hostfs_mntpt):
1667 if mount.is_mounted():
1668 log.warning("unmounting {0}".format(mount.hostfs_mntpt))
1669 mount.umount_wait()
1670 else:
1671 os.rmdir(mount.hostfs_mntpt)
1672
1673 from tasks.cephfs_test_runner import DecoratingLoader
1674
1675 decorating_loader = DecoratingLoader({
1676 "ctx": ctx,
1677 "mounts": mounts,
1678 "ceph_cluster": ceph_cluster,
1679 "mds_cluster": mds_cluster,
1680 "mgr_cluster": mgr_cluster,
1681 })
1682
1683 # For the benefit of polling tests like test_full -- in teuthology land we set this
1684 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1685 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1686 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1687
1688 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1689 # from normal IO latency. Increase it for running tests.
1690 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
1691
1692 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1693 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1694 # so that cephfs-data-scan will pick it up too.
1695 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
1696 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1697
1698 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1699 def _get_package_version(remote, pkg_name):
1700 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1701 return "2.9"
1702
1703 import teuthology.packaging
1704 teuthology.packaging.get_package_version = _get_package_version
1705
1706 overall_suite = load_tests(modules, decorating_loader)
1707
1708 # Filter out tests that don't lend themselves to interactive running.
1709 victims = []
1710 for case, method in enumerate_methods(overall_suite):
1711 fn = getattr(method, method._testMethodName)
1712
1713 drop_test = False
1714
1715 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1716 drop_test = True
1717 log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))
1718
1719 if getattr(fn, "needs_trimming", False) is True:
1720 drop_test = (os.getuid() != 0)
1721 log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))
1722
1723 if drop_test:
1724 # Don't drop the test if it was explicitly requested in arguments
1725 is_named = False
1726 for named in modules:
1727 if named.endswith(method.id()):
1728 is_named = True
1729 break
1730
1731 if not is_named:
1732 victims.append((case, method))
1733
1734 log.debug("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1735 for s, method in victims:
1736 s._tests.remove(method)
1737
1738 overall_suite = load_tests(modules, loader.TestLoader())
1739 result = launch_tests(overall_suite)
1740
1741 CephFSMount.cleanup_stale_netnses_and_bridge(remote)
1742 if opt_teardown_cluster:
1743 teardown_cluster()
1744
1745 if not result.wasSuccessful():
1746 # no point in duplicating output if we can have multiple failures in
1747 # the same run.
1748 if opt_exit_on_test_failure:
1749 result.printErrors() # duplicate output at end for convenience
1750
1751 bad_tests = []
1752 for test, error in result.errors:
1753 bad_tests.append(str(test))
1754 for test, failure in result.failures:
1755 bad_tests.append(str(test))
1756
1757 sys.exit(-1)
1758 else:
1759 sys.exit(0)
1760
1761
1762 if __name__ == "__main__":
1763 exec_test()