1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 """
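# Note: a dotted name may also point at a single class or test method, e.g.
# (illustrative names -- check the module for the actual class/method names):
#   python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan.TestDataScan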
27
28 from StringIO import StringIO
29 from collections import defaultdict
30 import getpass
31 import signal
32 import tempfile
33 import threading
34 import datetime
35 import shutil
36 import re
37 import os
38 import time
39 import json
40 import sys
41 import errno
42 from unittest import suite, loader
43 import unittest
44 import platform
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
48
49 import logging
50
51 log = logging.getLogger(__name__)
52
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
60
61
62 def respawn_in_path(lib_path, python_paths):
63 execv_cmd = ['python']
64 if platform.system() == "Darwin":
65 lib_path_var = "DYLD_LIBRARY_PATH"
66 else:
67 lib_path_var = "LD_LIBRARY_PATH"
68
69 py_binary = os.environ.get("PYTHON", "python")
70
71 if lib_path_var in os.environ:
72 if lib_path not in os.environ[lib_path_var]:
73 os.environ[lib_path_var] += ':' + lib_path
74 os.execvp(py_binary, execv_cmd + sys.argv)
75 else:
76 os.environ[lib_path_var] = lib_path
77 os.execvp(py_binary, execv_cmd + sys.argv)
78
79 for p in python_paths:
80 sys.path.insert(0, p)
81
82
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
85
86 # A list of candidate paths for each package we need
87 guesses = [
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
90 ["../src/pybind"],
91 ]
92
93 python_paths = []
94
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths.append(os.path.abspath(
97 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
98 ))
99
100 for package_guesses in guesses:
101 for g in package_guesses:
102 g_exp = os.path.abspath(os.path.expanduser(g))
103 if os.path.exists(g_exp):
104 python_paths.append(g_exp)
105
106 ld_path = os.path.join(os.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108 respawn_in_path(ld_path, python_paths)
109
110
111 try:
112 from teuthology.exceptions import CommandFailedError
113 from tasks.ceph_manager import CephManager
114 from tasks.cephfs.fuse_mount import FuseMount
115 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116 from mgr.mgr_test_case import MgrCluster
117 from teuthology.contextutil import MaxWhileTries
118 from teuthology.task import interactive
119 except ImportError:
120 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
122 raise
123
124 # Must import after teuthology because of gevent monkey patching
125 import subprocess
126
127 if os.path.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX = "./bin/"
130 SRC_PREFIX = "../src"
131 else:
132 # Running in src/ of an autotools build
133 BIN_PREFIX = "./"
134 SRC_PREFIX = "./"
135
136
137 class LocalRemoteProcess(object):
138 def __init__(self, args, subproc, check_status, stdout, stderr):
139 self.args = args
140 self.subproc = subproc
141 if stdout is None:
142 self.stdout = StringIO()
143 else:
144 self.stdout = stdout
145
146 if stderr is None:
147 self.stderr = StringIO()
148 else:
149 self.stderr = stderr
150
151 self.check_status = check_status
152 self.exitstatus = self.returncode = None
153
154 def wait(self):
155 if self.finished:
156 # Avoid calling communicate() on a dead process because it'll
157 # give you stick about std* already being closed
158 if self.exitstatus != 0:
159 raise CommandFailedError(self.args, self.exitstatus)
160 else:
161 return
162
163 out, err = self.subproc.communicate()
164 self.stdout.write(out)
165 self.stderr.write(err)
166
167 self.exitstatus = self.returncode = self.subproc.returncode
168
169 if self.exitstatus != 0:
170 sys.stderr.write(out)
171 sys.stderr.write(err)
172
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175
176 @property
177 def finished(self):
178 if self.exitstatus is not None:
179 return True
180
181 if self.subproc.poll() is not None:
182 out, err = self.subproc.communicate()
183 self.stdout.write(out)
184 self.stderr.write(err)
185 self.exitstatus = self.returncode = self.subproc.returncode
186 return True
187 else:
188 return False
189
190 def kill(self):
191 log.info("kill ")
192 if self.subproc.pid and not self.finished:
193 log.info("kill: killing pid {0} ({1})".format(
194 self.subproc.pid, self.args))
195 safe_kill(self.subproc.pid)
196 else:
197 log.info("kill: already terminated ({0})".format(self.args))
198
199 @property
200 def stdin(self):
201 class FakeStdIn(object):
202 def __init__(self, mount_daemon):
203 self.mount_daemon = mount_daemon
204
205 def close(self):
206 self.mount_daemon.kill()
207
208 return FakeStdIn(self)
209
210
211 class LocalRemote(object):
212 """
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
215
216 Run this inside your src/ dir!
217 """
218
219 def __init__(self):
220 self.name = "local"
221 self.hostname = "localhost"
222 self.user = getpass.getuser()
223
224 def get_file(self, path, sudo, dest_dir):
225 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226 shutil.copy(path, tmpfile)
227 return tmpfile
228
229 def put_file(self, src, dst, sudo=False):
230 shutil.copy(src, dst)
231
232 def run(self, args, check_status=True, wait=True,
233 stdout=None, stderr=None, cwd=None, stdin=None,
234 logger=None, label=None, env=None):
235 log.info("run args={0}".format(args))
236
237 # We don't need no stinkin' sudo
238 args = [a for a in args if a != "sudo"]
239
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell = any([a for a in args if isinstance(a, Raw)])
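# Teuthology wraps commands in helpers (adjust-ulimits, ceph-coverage, timeout)
# that make no sense against a vstart cluster, so the loop below strips them
# before quoting.  Illustrative transformation (made-up command, not taken from
# a real test):
#   ['adjust-ulimits', 'ceph-coverage', 'cov-dir', 'timeout', '120',
#    'ceph', '-s', Raw('&&'), 'true']   ->   "ceph -s && true"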
242
243 if shell:
244 filtered = []
245 i = 0
246 while i < len(args):
247 if args[i] == 'adjust-ulimits':
248 i += 1
249 elif args[i] == 'ceph-coverage':
250 i += 2
251 elif args[i] == 'timeout':
252 i += 2
253 else:
254 filtered.append(args[i])
255 i += 1
256
257 args = quote(filtered)
258 log.info("Running {0}".format(args))
259
260 subproc = subprocess.Popen(args,
261 stdout=subprocess.PIPE,
262 stderr=subprocess.PIPE,
263 stdin=subprocess.PIPE,
264 cwd=cwd,
265 shell=True)
266 else:
267 log.info("Running {0}".format(args))
268
269 for arg in args:
270 if not isinstance(arg, basestring):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
272 arg, arg.__class__
273 ))
274
275 subproc = subprocess.Popen(args,
276 stdout=subprocess.PIPE,
277 stderr=subprocess.PIPE,
278 stdin=subprocess.PIPE,
279 cwd=cwd,
280 env=env)
281
282 if stdin:
283 if not isinstance(stdin, basestring):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
285
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc.stdin.write(stdin)
289
290 proc = LocalRemoteProcess(
291 args, subproc, check_status,
292 stdout, stderr
293 )
294
295 if wait:
296 proc.wait()
297
298 return proc
299
300
301 class LocalDaemon(object):
302 def __init__(self, daemon_type, daemon_id):
303 self.daemon_type = daemon_type
304 self.daemon_id = daemon_id
305 self.controller = LocalRemote()
306 self.proc = None
307
308 @property
309 def remote(self):
310 return LocalRemote()
311
312 def running(self):
313 return self._get_pid() is not None
314
315 def _get_pid(self):
316 """
317 Return PID as an integer or None if not found
318 """
319 ps_txt = self.controller.run(
320 args=["ps", "ww", "-u"+str(os.getuid())]
321 ).stdout.getvalue().strip()
322 lines = ps_txt.split("\n")[1:]
323
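# Illustrative ps line this loop is expected to match (output format assumed):
#   12345 pts/0    Sl     0:01 ./bin/ceph-mds -i a -c ./ceph.conf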
324 for line in lines:
325 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326 log.info("Found ps line for daemon: {0}".format(line))
327 return int(line.split()[0])
328 log.info("No match for {0} {1}: {2}".format(
329 self.daemon_type, self.daemon_id, ps_txt
330 ))
331 return None
332
333 def wait(self, timeout):
334 waited = 0
335 while self._get_pid() is not None:
336 if waited > timeout:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
338 time.sleep(1)
339 waited += 1
340
341 def stop(self, timeout=300):
342 if not self.running():
343 log.error('tried to stop a non-running daemon')
344 return
345
346 pid = self._get_pid()
347 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348 os.kill(pid, signal.SIGKILL)
349
350 waited = 0
351 while pid is not None:
352 new_pid = self._get_pid()
353 if new_pid is not None and new_pid != pid:
354 log.info("Killing new PID {0}".format(new_pid))
355 pid = new_pid
356 os.kill(pid, signal.SIGKILL)
357
358 if new_pid is None:
359 break
360 else:
361 if waited > timeout:
362 raise MaxWhileTries(
363 "Timed out waiting for daemon {0}.{1}".format(
364 self.daemon_type, self.daemon_id))
365 time.sleep(1)
366 waited += 1
367
368 self.wait(timeout=timeout)
369
370 def restart(self):
371 if self._get_pid() is not None:
372 self.stop()
373
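# e.g. for daemon_type "mds" and daemon_id "a" this invokes roughly
# "<BIN_PREFIX>/ceph-mds -i a" (illustrative; the daemon puts itself in the background).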
374 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
375
376
377 def safe_kill(pid):
378 """
379 os.kill annoyingly raises exception if process already dead. Ignore it.
380 """
381 try:
382 return os.kill(pid, signal.SIGKILL)
383 except OSError as e:
384 if e.errno == errno.ESRCH:
385 # Raced with process termination
386 pass
387 else:
388 raise
389
390
391 class LocalFuseMount(FuseMount):
392 def __init__(self, test_dir, client_id):
393 super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
394
395 @property
396 def config_path(self):
397 return "./ceph.conf"
398
399 def get_keyring_path(self):
400 # This is going to end up in a config file, so use an absolute path
401 # to avoid assumptions about daemons' pwd
402 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
403
404 def run_shell(self, args, wait=True):
405 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
406 # the "cd foo && bar" shenanigans isn't needed to begin with and
407 # then we wouldn't have to special case this
408 return self.client_remote.run(
409 args, wait=wait, cwd=self.mountpoint
410 )
411
412 @property
413 def _prefix(self):
414 return BIN_PREFIX
415
416 def _asok_path(self):
417 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
418 # run in the foreground. When running it daemonized, however, the asok is named after
419 # the PID of the launching process, not the long-running ceph-fuse process. Therefore
420 # we need to give an exact path here, as the logic that checks /proc/ to find which
421 # asok is alive does not work.
422 path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
423 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
424 return path
425
426 def umount(self):
427 if self.is_mounted():
428 super(LocalFuseMount, self).umount()
429
430 def mount(self, mount_path=None, mount_fs_name=None):
431 self.client_remote.run(
432 args=[
433 'mkdir',
434 '--',
435 self.mountpoint,
436 ],
437 )
438
439 def list_connections():
440 self.client_remote.run(
441 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
442 check_status=False
443 )
444 p = self.client_remote.run(
445 args=["ls", "/sys/fs/fuse/connections"],
446 check_status=False
447 )
448 if p.exitstatus != 0:
449 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
450 return []
451
452 ls_str = p.stdout.getvalue().strip()
453 if ls_str:
454 return [int(n) for n in ls_str.split("\n")]
455 else:
456 return []
457
458 # Before starting ceph-fuse process, note the contents of
459 # /sys/fs/fuse/connections
460 pre_mount_conns = list_connections()
461 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
462
463 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
464 if os.getuid() != 0:
465 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
466
467 if mount_path is not None:
468 prefix += ["--client_mountpoint={0}".format(mount_path)]
469
470 if mount_fs_name is not None:
471 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
472
473 self.fuse_daemon = self.client_remote.run(args=
474 prefix + [
475 "-f",
476 "--name",
477 "client.{0}".format(self.client_id),
478 self.mountpoint
479 ], wait=False)
480
481 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
482
483 # Wait for the connection reference to appear in /sys
484 waited = 0
485 post_mount_conns = list_connections()
486 while len(post_mount_conns) <= len(pre_mount_conns):
487 if self.fuse_daemon.finished:
488 # Did mount fail? Raise the CommandFailedError instead of
489 # hitting the "failed to populate /sys/" timeout
490 self.fuse_daemon.wait()
491 time.sleep(1)
492 waited += 1
493 if waited > 30:
494 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
495 waited
496 ))
497 post_mount_conns = list_connections()
498
499 log.info("Post-mount connections: {0}".format(post_mount_conns))
500
501 # Record our fuse connection number so that we can use it when
502 # forcing an unmount
503 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
504 if len(new_conns) == 0:
505 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
506 elif len(new_conns) > 1:
507 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
508 else:
509 self._fuse_conn = new_conns[0]
510
511 def _run_python(self, pyscript):
512 """
513 Override this to remove the daemon-helper prefix that is used otherwise
514 to make the process killable.
515 """
516 return self.client_remote.run(args=[
517 'python', '-c', pyscript
518 ], wait=False)
519
520
521 class LocalCephManager(CephManager):
522 def __init__(self):
523 # Deliberately skip parent init, only inheriting from it to get
524 # util methods like osd_dump that sit on top of raw_cluster_cmd
525 self.controller = LocalRemote()
526
527 # A minority of CephManager fns actually bother locking for when
528 # certain teuthology tests want to run tasks in parallel
529 self.lock = threading.RLock()
530
531 self.log = lambda x: log.info(x)
532
533 def find_remote(self, daemon_type, daemon_id):
534 """
535 daemon_type like 'mds', 'osd'
536 daemon_id like 'a', '0'
537 """
538 return LocalRemote()
539
540 def run_ceph_w(self):
541 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
542 return proc
543
544 def raw_cluster_cmd(self, *args):
545 """
546 args like ["osd", "dump"]
547 return stdout string
548 """
549 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
550 return proc.stdout.getvalue()
551
552 def raw_cluster_cmd_result(self, *args):
553 """
554 like raw_cluster_cmd but don't check status, just return rc
555 """
556 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
557 return proc.exitstatus
558
559 def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
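# e.g. admin_socket('mds', 'a', ['session', 'ls']) ends up invoking
# "<BIN_PREFIX>/ceph daemon mds.a session ls" (illustrative example)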
560 return self.controller.run(
561 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
562 )
563
564 # FIXME: copypasta
565 def get_mds_status(self, mds):
566 """
567 Run cluster commands for the mds in order to get mds information
568 """
569 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
570 j = json.loads(' '.join(out.splitlines()[1:]))
571 # collate; for dup ids, larger gid wins.
572 for info in j['info'].itervalues():
573 if info['name'] == mds:
574 return info
575 return None
576
577 # FIXME: copypasta
578 def get_mds_status_by_rank(self, rank):
579 """
580 Run cluster commands to get mds information and return the entry
581 matching the given rank.
582 """
583 j = self.get_mds_status_all()
584 # collate; for dup ids, larger gid wins.
585 for info in j['info'].itervalues():
586 if info['rank'] == rank:
587 return info
588 return None
589
590 def get_mds_status_all(self):
591 """
592 Run cluster command to extract all the mds status.
593 """
594 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
595 j = json.loads(' '.join(out.splitlines()[1:]))
596 return j
597
598
599 class LocalCephCluster(CephCluster):
600 def __init__(self, ctx):
601 # Deliberately skip calling parent constructor
602 self._ctx = ctx
603 self.mon_manager = LocalCephManager()
604 self._conf = defaultdict(dict)
605
606 @property
607 def admin_remote(self):
608 return LocalRemote()
609
610 def get_config(self, key, service_type=None):
611 if service_type is None:
612 service_type = 'mon'
613
614 # FIXME hardcoded vstart service IDs
615 service_id = {
616 'mon': 'a',
617 'mds': 'a',
618 'osd': '0'
619 }[service_type]
620
621 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
622
623 def _write_conf(self):
624 # In teuthology, we have the honour of writing the entire ceph.conf, but
625 # in vstart land it has mostly already been written and we need to carefully
626 # append to it.
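# Illustrative outcome: after set_ceph_conf("mds", "mds log max segments", "10"),
# the tail of ceph.conf looks something like:
#   #LOCAL_TEST
#   [mds]
#   mds log max segments = 10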
627 conf_path = "./ceph.conf"
628 banner = "\n#LOCAL_TEST\n"
629 existing_str = open(conf_path).read()
630
631 if banner in existing_str:
632 existing_str = existing_str[0:existing_str.find(banner)]
633
634 existing_str += banner
635
636 for subsys, kvs in self._conf.items():
637 existing_str += "\n[{0}]\n".format(subsys)
638 for key, val in kvs.items():
639 # Comment out existing instance if it exists
640 log.info("Searching for existing instance {0}/{1}".format(
641 key, subsys
642 ))
643 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
644 subsys
645 ), existing_str, re.MULTILINE)
646
647 if existing_section:
648 section_str = existing_str[existing_section.start():existing_section.end()]
649 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
650 if existing_val:
651 start = existing_section.start() + existing_val.start(1)
652 log.info("Found string to replace at {0}".format(
653 start
654 ))
655 existing_str = existing_str[0:start] + "#" + existing_str[start:]
656
657 existing_str += "{0} = {1}\n".format(key, val)
658
659 open(conf_path, "w").write(existing_str)
660
661 def set_ceph_conf(self, subsys, key, value):
662 self._conf[subsys][key] = value
663 self._write_conf()
664
665 def clear_ceph_conf(self, subsys, key):
666 del self._conf[subsys][key]
667 self._write_conf()
668
669
670 class LocalMDSCluster(LocalCephCluster, MDSCluster):
671 def __init__(self, ctx):
672 super(LocalMDSCluster, self).__init__(ctx)
673
674 self.mds_ids = ctx.daemons.daemons['mds'].keys()
675 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
676
677 def clear_firewall(self):
678 # FIXME: unimplemented
679 pass
680
681 def newfs(self, name='cephfs', create=True):
682 return LocalFilesystem(self._ctx, name=name, create=create)
683
684
685 class LocalMgrCluster(LocalCephCluster, MgrCluster):
686 def __init__(self, ctx):
687 super(LocalMgrCluster, self).__init__(ctx)
688
689 self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
690 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
691
692
693 class LocalFilesystem(Filesystem, LocalMDSCluster):
694 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
695 # Deliberately skip calling parent constructor
696 self._ctx = ctx
697
698 self.id = None
699 self.name = None
700 self.ec_profile = None
701 self.metadata_pool_name = None
702 self.metadata_overlay = False
703 self.data_pool_name = None
704 self.data_pools = None
705
706 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
707 self.mds_ids = set()
708 for line in open("ceph.conf").readlines():
709 match = re.match("^\[mds\.(.+)\]$", line)
710 if match:
711 self.mds_ids.add(match.group(1))
712
713 if not self.mds_ids:
714 raise RuntimeError("No MDSs found in ceph.conf!")
715
716 self.mds_ids = list(self.mds_ids)
717
718 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
719
720 self.mon_manager = LocalCephManager()
721
722 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
723
724 self.client_remote = LocalRemote()
725
726 self._conf = defaultdict(dict)
727
728 if name is not None:
729 if fscid is not None:
730 raise RuntimeError("cannot specify fscid when creating fs")
731 if create and not self.legacy_configured():
732 self.create()
733 else:
734 if fscid is not None:
735 self.id = fscid
736 self.getinfo(refresh=True)
737
738 # Stash a reference to the first created filesystem on ctx, so
739 # that if someone drops to the interactive shell they can easily
740 # poke our methods.
741 if not hasattr(self._ctx, "filesystem"):
742 self._ctx.filesystem = self
743
744 @property
745 def _prefix(self):
746 return BIN_PREFIX
747
748 def set_clients_block(self, blocked, mds_id=None):
749 raise NotImplementedError()
750
751 def get_pgs_per_fs_pool(self):
752 # FIXME: assuming there are 3 OSDs
753 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
754
755
756 class InteractiveFailureResult(unittest.TextTestResult):
757 """
758 Specialization that implements interactive-on-error style
759 behavior.
760 """
761 def addFailure(self, test, err):
762 super(InteractiveFailureResult, self).addFailure(test, err)
763 log.error(self._exc_info_to_string(err, test))
764 log.error("Failure in test '{0}', going interactive".format(
765 self.getDescription(test)
766 ))
767 interactive.task(ctx=None, config=None)
768
769 def addError(self, test, err):
770 super(InteractiveFailureResult, self).addError(test, err)
771 log.error(self._exc_info_to_string(err, test))
772 log.error("Error in test '{0}', going interactive".format(
773 self.getDescription(test)
774 ))
775 interactive.task(ctx=None, config=None)
776
777
778 def enumerate_methods(s):
779 log.info("e: {0}".format(s))
780 for t in s._tests:
781 if isinstance(t, suite.BaseTestSuite):
782 for sub in enumerate_methods(t):
783 yield sub
784 else:
785 yield s, t
786
787
788 def load_tests(modules, loader):
789 if modules:
790 log.info("Executing modules: {0}".format(modules))
791 module_suites = []
792 for mod_name in modules:
793 # Test names like cephfs.test_auto_repair
794 module_suites.append(loader.loadTestsFromName(mod_name))
795 log.info("Loaded: {0}".format(list(module_suites)))
796 return suite.TestSuite(module_suites)
797 else:
798 log.info("Executing all cephfs tests")
799 return loader.discover(
800 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
801 )
802
803
804 def scan_tests(modules):
805 overall_suite = load_tests(modules, loader.TestLoader())
806
807 max_required_mds = 0
808 max_required_clients = 0
809 max_required_mgr = 0
810
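# These counts come from optional class attributes on the test cases, e.g.
# (illustrative):
#   class TestSomething(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 1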
811 for suite, case in enumerate_methods(overall_suite):
812 max_required_mds = max(max_required_mds,
813 getattr(case, "MDSS_REQUIRED", 0))
814 max_required_clients = max(max_required_clients,
815 getattr(case, "CLIENTS_REQUIRED", 0))
816 max_required_mgr = max(max_required_mgr,
817 getattr(case, "MGRS_REQUIRED", 0))
818
819 return max_required_mds, max_required_clients, max_required_mgr
820
821
822 class LocalCluster(object):
823 def __init__(self, rolename="placeholder"):
824 self.remotes = {
825 LocalRemote(): [rolename]
826 }
827
828 def only(self, requested):
829 return self.__class__(rolename=requested)
830
831
832 class LocalContext(object):
833 def __init__(self):
834 self.config = {}
835 self.teuthology_config = teuth_config
836 self.cluster = LocalCluster()
837 self.daemons = DaemonGroup()
838
839 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
840 # tests that want to look these up via ctx can do so.
841 # Inspect ceph.conf to see what roles exist
842 for conf_line in open("ceph.conf").readlines():
843 for svc_type in ["mon", "osd", "mds", "mgr"]:
844 if svc_type not in self.daemons.daemons:
845 self.daemons.daemons[svc_type] = {}
846 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
847 if match:
848 svc_id = match.group(1)
849 self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
850
851 def __del__(self):
852 shutil.rmtree(self.teuthology_config['test_path'])
853
854
855 def exec_test():
856 # Parse arguments
857 interactive_on_error = False
858 create_cluster = False
859
860 args = sys.argv[1:]
861 flags = [a for a in args if a.startswith("-")]
862 modules = [a for a in args if not a.startswith("-")]
863 for f in flags:
864 if f == "--interactive":
865 interactive_on_error = True
866 elif f == "--create":
867 create_cluster = True
868 else:
869 log.error("Unknown option '{0}'".format(f))
870 sys.exit(-1)
871
872 # Help developers by stopping up-front if their tree isn't built enough for all the
873 # tools that the tests might want to use (add more here if needed)
874 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
875 "cephfs-table-tool", "ceph-fuse", "rados"]
876 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
877 if missing_binaries:
878 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
879 sys.exit(-1)
880
881 max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
882
883 remote = LocalRemote()
884
885 # Tolerate no MDSs or clients running at start
886 ps_txt = remote.run(
887 args=["ps", "-u"+str(os.getuid())]
888 ).stdout.getvalue().strip()
889 lines = ps_txt.split("\n")[1:]
890 for line in lines:
891 if 'ceph-fuse' in line or 'ceph-mds' in line:
892 pid = int(line.split()[0])
893 log.warn("Killing stray process {0}".format(line))
894 os.kill(pid, signal.SIGKILL)
895
896 # Fire up the Ceph cluster if the user requested it
897 if create_cluster:
898 log.info("Creating cluster with {0} MDS daemons".format(
899 max_required_mds))
900 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
901 remote.run(["rm", "-rf", "./out"])
902 remote.run(["rm", "-rf", "./dev"])
903 vstart_env = os.environ.copy()
904 vstart_env["FS"] = "0"
905 vstart_env["MDS"] = str(max_required_mds)
906 vstart_env["OSD"] = "1"
907 vstart_env["MGR"] = str(max(max_required_mgr, 1))
908
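# Roughly equivalent to running, from the build dir (values illustrative):
#   FS=0 MDS=2 OSD=1 MGR=1 ../src/vstart.sh -n -d --nolockdep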
909 remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
910 env=vstart_env)
911
912 # Wait for OSD to come up so that subsequent injectargs etc will
913 # definitely succeed
914 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
915
916 # List of client mounts, sufficient to run the selected tests
917 clients = [str(i) for i in range(0, max_required_clients)]
918
919 test_dir = tempfile.mkdtemp()
920 teuth_config['test_path'] = test_dir
921
922 # Construct Mount classes
923 mounts = []
924 for client_id in clients:
925 # Populate client keyring (it sucks to use client.admin for test clients
926 # because it's awkward to find the logs later)
927 client_name = "client.{0}".format(client_id)
928
929 if client_name not in open("./keyring").read():
930 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
931 "osd", "allow rw",
932 "mds", "allow",
933 "mon", "allow r"])
934
935 open("./keyring", "a").write(p.stdout.getvalue())
936
937 mount = LocalFuseMount(test_dir, client_id)
938 mounts.append(mount)
939 if mount.is_mounted():
940 log.warn("unmounting {0}".format(mount.mountpoint))
941 mount.umount_wait()
942 else:
943 if os.path.exists(mount.mountpoint):
944 os.rmdir(mount.mountpoint)
945
946 ctx = LocalContext()
947 ceph_cluster = LocalCephCluster(ctx)
948 mds_cluster = LocalMDSCluster(ctx)
949 mgr_cluster = LocalMgrCluster(ctx)
950
951 from tasks.cephfs_test_runner import DecoratingLoader
952
953 class LogStream(object):
954 def __init__(self):
955 self.buffer = ""
956
957 def write(self, data):
958 self.buffer += data
959 if "\n" in self.buffer:
960 lines = self.buffer.split("\n")
961 for line in lines[:-1]:
962 pass
963 # sys.stderr.write(line + "\n")
964 log.info(line)
965 self.buffer = lines[-1]
966
967 def flush(self):
968 pass
969
970 decorating_loader = DecoratingLoader({
971 "ctx": ctx,
972 "mounts": mounts,
973 "ceph_cluster": ceph_cluster,
974 "mds_cluster": mds_cluster,
975 "mgr_cluster": mgr_cluster,
976 })
977
978 # For the benefit of polling tests like test_full -- in teuthology land we set this
979 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
980 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
981 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
982
983 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
984 # from normal IO latency. Increase it for running tests.
985 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
986
987 # Make sure the filesystem created in tests has uid/gid that will let us talk to
988 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
989 # so that cephfs-data-scan will pick it up too.
990 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
991 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
992
993 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
994 def _get_package_version(remote, pkg_name):
995 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
996 return "2.9"
997
998 import teuthology.packaging
999 teuthology.packaging.get_package_version = _get_package_version
1000
1001 overall_suite = load_tests(modules, decorating_loader)
1002
1003 # Filter out tests that don't lend themselves to interactive running.
1004 victims = []
1005 for case, method in enumerate_methods(overall_suite):
1006 fn = getattr(method, method._testMethodName)
1007
1008 drop_test = False
1009
1010 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1011 drop_test = True
1012 log.warn("Dropping test because long running: {0}".format(method.id()))
1013
1014 if getattr(fn, "needs_trimming", False) is True:
1015 drop_test = (os.getuid() != 0)
1016 log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))
1017
1018 if drop_test:
1019 # Don't drop the test if it was explicitly requested in arguments
1020 is_named = False
1021 for named in modules:
1022 if named.endswith(method.id()):
1023 is_named = True
1024 break
1025
1026 if not is_named:
1027 victims.append((case, method))
1028
1029 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1030 for s, method in victims:
1031 s._tests.remove(method)
1032
1033 if interactive_on_error:
1034 result_class = InteractiveFailureResult
1035 else:
1036 result_class = unittest.TextTestResult
1037 fail_on_skip = False
1038
1039 class LoggingResult(result_class):
1040 def startTest(self, test):
1041 log.info("Starting test: {0}".format(self.getDescription(test)))
1042 test.started_at = datetime.datetime.utcnow()
1043 return super(LoggingResult, self).startTest(test)
1044
1045 def stopTest(self, test):
1046 log.info("Stopped test: {0} in {1}s".format(
1047 self.getDescription(test),
1048 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1049 ))
1050
1051 def addSkip(self, test, reason):
1052 if fail_on_skip:
1053 # Don't just call addFailure because that requires a traceback
1054 self.failures.append((test, reason))
1055 else:
1056 super(LoggingResult, self).addSkip(test, reason)
1057
1058 # Execute!
1059 result = unittest.TextTestRunner(
1060 stream=LogStream(),
1061 resultclass=LoggingResult,
1062 verbosity=2,
1063 failfast=True).run(overall_suite)
1064
1065 if not result.wasSuccessful():
1066 result.printErrors() # duplicate output at end for convenience
1067
1068 bad_tests = []
1069 for test, error in result.errors:
1070 bad_tests.append(str(test))
1071 for test, failure in result.failures:
1072 bad_tests.append(str(test))
1073
1074 sys.exit(-1)
1075 else:
1076 sys.exit(0)
1077
1078
1079 if __name__ == "__main__":
1080 exec_test()