1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
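# The test name is handed to unittest's loadTestsFromName, so a dotted path
# down to a specific test class or method should also work.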
25
26 """
27
28 from StringIO import StringIO
29 from collections import defaultdict
30 import getpass
31 import signal
32 import tempfile
33 import threading
34 import datetime
35 import shutil
36 import re
37 import os
38 import time
39 import json
40 import sys
41 import errno
42 from unittest import suite, loader
43 import unittest
44 import platform
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
48
49 import logging
50
51 log = logging.getLogger(__name__)
52
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
60
61
62 def respawn_in_path(lib_path, python_paths):
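    """
    Re-exec the current interpreter with LD_LIBRARY_PATH (DYLD_LIBRARY_PATH on
    Darwin) pointing at the built libraries, then prepend the given paths to
    sys.path so that teuthology and the ceph python bindings are importable.
    """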
63 execv_cmd = ['python']
64 if platform.system() == "Darwin":
65 lib_path_var = "DYLD_LIBRARY_PATH"
66 else:
67 lib_path_var = "LD_LIBRARY_PATH"
68
69 py_binary = os.environ.get("PYTHON", "python")
70
71 if lib_path_var in os.environ:
72 if lib_path not in os.environ[lib_path_var]:
73 os.environ[lib_path_var] += ':' + lib_path
74 os.execvp(py_binary, execv_cmd + sys.argv)
75 else:
76 os.environ[lib_path_var] = lib_path
77 os.execvp(py_binary, execv_cmd + sys.argv)
78
79 for p in python_paths:
80 sys.path.insert(0, p)
81
82
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
85
86 # A list of candidate paths for each package we need
87 guesses = [
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
90 ["../src/pybind"],
91 ]
92
93 python_paths = []
94
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths.append(os.path.abspath(
97 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
98 ))
99
100 for package_guesses in guesses:
101 for g in package_guesses:
102 g_exp = os.path.abspath(os.path.expanduser(g))
103 if os.path.exists(g_exp):
104 python_paths.append(g_exp)
105
106 ld_path = os.path.join(os.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108 respawn_in_path(ld_path, python_paths)
109
110
111 try:
112 from teuthology.exceptions import CommandFailedError
113 from tasks.ceph_manager import CephManager
114 from tasks.cephfs.fuse_mount import FuseMount
115 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116 from mgr.mgr_test_case import MgrCluster
117 from teuthology.contextutil import MaxWhileTries
118 from teuthology.task import interactive
119 except ImportError:
120 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
122 raise
123
124 # Must import after teuthology because of gevent monkey patching
125 import subprocess
126
127 if os.path.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX = "./bin/"
130 SRC_PREFIX = "../src"
131 else:
132 # Running in src/ of an autotools build
133 BIN_PREFIX = "./"
134 SRC_PREFIX = "./"
135
136
137 class LocalRemoteProcess(object):
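    """
    Minimal stand-in for teuthology's RemoteProcess: wraps a local
    subprocess.Popen and exposes wait()/finished/kill(), with stdout and
    stderr backed by StringIO objects by default.
    """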
138 def __init__(self, args, subproc, check_status, stdout, stderr):
139 self.args = args
140 self.subproc = subproc
141 if stdout is None:
142 self.stdout = StringIO()
143 else:
144 self.stdout = stdout
145
146 if stderr is None:
147 self.stderr = StringIO()
148 else:
149 self.stderr = stderr
150
151 self.check_status = check_status
152 self.exitstatus = self.returncode = None
153
154 def wait(self):
155 if self.finished:
156             # Avoid calling communicate() on a process that has already
157             # finished: it will complain about std* already being closed
158 if self.exitstatus != 0:
159 raise CommandFailedError(self.args, self.exitstatus)
160 else:
161 return
162
163 out, err = self.subproc.communicate()
164 self.stdout.write(out)
165 self.stderr.write(err)
166
167 self.exitstatus = self.returncode = self.subproc.returncode
168
169 if self.exitstatus != 0:
170 sys.stderr.write(out)
171 sys.stderr.write(err)
172
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175
176 @property
177 def finished(self):
178 if self.exitstatus is not None:
179 return True
180
181 if self.subproc.poll() is not None:
182 out, err = self.subproc.communicate()
183 self.stdout.write(out)
184 self.stderr.write(err)
185 self.exitstatus = self.returncode = self.subproc.returncode
186 return True
187 else:
188 return False
189
190 def kill(self):
191         log.info("kill")
192 if self.subproc.pid and not self.finished:
193 log.info("kill: killing pid {0} ({1})".format(
194 self.subproc.pid, self.args))
195 safe_kill(self.subproc.pid)
196 else:
197 log.info("kill: already terminated ({0})".format(self.args))
198
199 @property
200 def stdin(self):
201 class FakeStdIn(object):
202 def __init__(self, mount_daemon):
203 self.mount_daemon = mount_daemon
204
205 def close(self):
206 self.mount_daemon.kill()
207
208 return FakeStdIn(self)
209
210
211 class LocalRemote(object):
212 """
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
215
216 Run this inside your src/ dir!
217 """
218
219 def __init__(self):
220 self.name = "local"
221 self.hostname = "localhost"
222 self.user = getpass.getuser()
223
224 def get_file(self, path, sudo, dest_dir):
225 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226 shutil.copy(path, tmpfile)
227 return tmpfile
228
229 def put_file(self, src, dst, sudo=False):
230 shutil.copy(src, dst)
231
232 def run(self, args, check_status=True, wait=True,
233 stdout=None, stderr=None, cwd=None, stdin=None,
234 logger=None, label=None, env=None):
235 log.info("run args={0}".format(args))
236
237 # We don't need no stinkin' sudo
238 args = [a for a in args if a != "sudo"]
239
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell = any([a for a in args if isinstance(a, Raw)])
242
243 if shell:
244 filtered = []
245 i = 0
246 while i < len(args):
247 if args[i] == 'adjust-ulimits':
248 i += 1
249 elif args[i] == 'ceph-coverage':
250 i += 2
251 elif args[i] == 'timeout':
252 i += 2
253 else:
254 filtered.append(args[i])
255 i += 1
256
257 args = quote(filtered)
258 log.info("Running {0}".format(args))
259
260 subproc = subprocess.Popen(args,
261 stdout=subprocess.PIPE,
262 stderr=subprocess.PIPE,
263 stdin=subprocess.PIPE,
264 cwd=cwd,
265 shell=True)
266 else:
267 log.info("Running {0}".format(args))
268
269 for arg in args:
270 if not isinstance(arg, basestring):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
272 arg, arg.__class__
273 ))
274
275 subproc = subprocess.Popen(args,
276 stdout=subprocess.PIPE,
277 stderr=subprocess.PIPE,
278 stdin=subprocess.PIPE,
279 cwd=cwd,
280 env=env)
281
282 if stdin:
283 if not isinstance(stdin, basestring):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
285
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc.stdin.write(stdin)
289
290 proc = LocalRemoteProcess(
291 args, subproc, check_status,
292 stdout, stderr
293 )
294
295 if wait:
296 proc.wait()
297
298 return proc
299
300
301 class LocalDaemon(object):
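    """
    Stand-in for teuthology's DaemonState: tracks a single locally running
    ceph daemon (located by scanning `ps` output) and supports stop/restart.
    """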
302 def __init__(self, daemon_type, daemon_id):
303 self.daemon_type = daemon_type
304 self.daemon_id = daemon_id
305 self.controller = LocalRemote()
306 self.proc = None
307
308 @property
309 def remote(self):
310 return LocalRemote()
311
312 def running(self):
313 return self._get_pid() is not None
314
315 def _get_pid(self):
316 """
317 Return PID as an integer or None if not found
318 """
319 ps_txt = self.controller.run(
320 args=["ps", "ww", "-u"+str(os.getuid())]
321 ).stdout.getvalue().strip()
322 lines = ps_txt.split("\n")[1:]
323
324 for line in lines:
325 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326 log.info("Found ps line for daemon: {0}".format(line))
327 return int(line.split()[0])
328 log.info("No match for {0} {1}: {2}".format(
329 self.daemon_type, self.daemon_id, ps_txt
330 ))
331 return None
332
333 def wait(self, timeout):
334 waited = 0
335 while self._get_pid() is not None:
336 if waited > timeout:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
338 time.sleep(1)
339 waited += 1
340
341 def stop(self, timeout=300):
342 if not self.running():
343 log.error('tried to stop a non-running daemon')
344 return
345
346 pid = self._get_pid()
347 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348 os.kill(pid, signal.SIGKILL)
349
350 waited = 0
351 while pid is not None:
352 new_pid = self._get_pid()
353 if new_pid is not None and new_pid != pid:
354 log.info("Killing new PID {0}".format(new_pid))
355 pid = new_pid
356 os.kill(pid, signal.SIGKILL)
357
358 if new_pid is None:
359 break
360 else:
361 if waited > timeout:
362 raise MaxWhileTries(
363 "Timed out waiting for daemon {0}.{1}".format(
364 self.daemon_type, self.daemon_id))
365 time.sleep(1)
366 waited += 1
367
368 self.wait(timeout=timeout)
369
370 def restart(self):
371 if self._get_pid() is not None:
372 self.stop()
373
374 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
375
376
377 def safe_kill(pid):
378 """
379     os.kill annoyingly raises an exception if the process is already dead. Ignore it.
380 """
381 try:
382 return os.kill(pid, signal.SIGKILL)
383 except OSError as e:
384 if e.errno == errno.ESRCH:
385 # Raced with process termination
386 pass
387 else:
388 raise
389
390
391 class LocalFuseMount(FuseMount):
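    """
    FuseMount specialization that runs ceph-fuse from the local build tree
    against the vstart cluster, rather than through teuthology.
    """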
392 def __init__(self, test_dir, client_id):
393 super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
394
395 @property
396 def config_path(self):
397 return "./ceph.conf"
398
399 def get_keyring_path(self):
400 # This is going to end up in a config file, so use an absolute path
401 # to avoid assumptions about daemons' pwd
402 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
403
404 def run_shell(self, args, wait=True):
405 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
406         # the "cd foo && bar" shenanigans aren't needed to begin with and
407 # then we wouldn't have to special case this
408 return self.client_remote.run(
409 args, wait=wait, cwd=self.mountpoint
410 )
411
412 @property
413 def _prefix(self):
414 return BIN_PREFIX
415
416 def _asok_path(self):
417         # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
418         # run in the foreground. When running it daemonized, however, the asok is named after
419         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
420         # we need to give an exact path here, as the logic for checking /proc/ to see which
421         # asok is alive does not work.
422 path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
423 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
424 return path
425
426 def umount(self):
427 if self.is_mounted():
428 super(LocalFuseMount, self).umount()
429
430 def mount(self, mount_path=None, mount_fs_name=None):
431 self.client_remote.run(
432 args=[
433 'mkdir',
434 '--',
435 self.mountpoint,
436 ],
437 )
438
439 def list_connections():
440 self.client_remote.run(
441 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
442 check_status=False
443 )
444 p = self.client_remote.run(
445 args=["ls", "/sys/fs/fuse/connections"],
446 check_status=False
447 )
448 if p.exitstatus != 0:
449 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
450 return []
451
452 ls_str = p.stdout.getvalue().strip()
453 if ls_str:
454 return [int(n) for n in ls_str.split("\n")]
455 else:
456 return []
457
458 # Before starting ceph-fuse process, note the contents of
459 # /sys/fs/fuse/connections
460 pre_mount_conns = list_connections()
461 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
462
463 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
464 if os.getuid() != 0:
465 prefix += ["--client-die-on-failed-remount=false"]
466
467 if mount_path is not None:
468 prefix += ["--client_mountpoint={0}".format(mount_path)]
469
470 if mount_fs_name is not None:
471 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
472
473 self.fuse_daemon = self.client_remote.run(args=
474 prefix + [
475 "-f",
476 "--name",
477 "client.{0}".format(self.client_id),
478 self.mountpoint
479 ], wait=False)
480
481 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
482
483 # Wait for the connection reference to appear in /sys
484 waited = 0
485 post_mount_conns = list_connections()
486 while len(post_mount_conns) <= len(pre_mount_conns):
487 if self.fuse_daemon.finished:
488 # Did mount fail? Raise the CommandFailedError instead of
489 # hitting the "failed to populate /sys/" timeout
490 self.fuse_daemon.wait()
491 time.sleep(1)
492 waited += 1
493 if waited > 30:
494 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
495 waited
496 ))
497 post_mount_conns = list_connections()
498
499 log.info("Post-mount connections: {0}".format(post_mount_conns))
500
501 # Record our fuse connection number so that we can use it when
502 # forcing an unmount
503 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
504 if len(new_conns) == 0:
505 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
506 elif len(new_conns) > 1:
507 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
508 else:
509 self._fuse_conn = new_conns[0]
510
511 def _run_python(self, pyscript):
512 """
513 Override this to remove the daemon-helper prefix that is used otherwise
514 to make the process killable.
515 """
516 return self.client_remote.run(args=[
517 'python', '-c', pyscript
518 ], wait=False)
519
520
521 class LocalCephManager(CephManager):
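    """
    CephManager variant that issues cluster commands via the local `ceph`
    binary under BIN_PREFIX instead of over SSH.
    """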
522 def __init__(self):
523 # Deliberately skip parent init, only inheriting from it to get
524 # util methods like osd_dump that sit on top of raw_cluster_cmd
525 self.controller = LocalRemote()
526
527         # A minority of CephManager functions actually take this lock, for
528         # the cases where certain teuthology tests want to run tasks in parallel
529 self.lock = threading.RLock()
530
531 self.log = lambda x: log.info(x)
532
533 def find_remote(self, daemon_type, daemon_id):
534 """
535 daemon_type like 'mds', 'osd'
536 daemon_id like 'a', '0'
537 """
538 return LocalRemote()
539
540 def run_ceph_w(self):
541 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
542 return proc
543
544 def raw_cluster_cmd(self, *args):
545 """
546         args like ["osd", "dump"]
547         return stdout as a string
548 """
549 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
550 return proc.stdout.getvalue()
551
552 def raw_cluster_cmd_result(self, *args):
553 """
554 like raw_cluster_cmd but don't check status, just return rc
555 """
556 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
557 return proc.exitstatus
558
559 def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
560 return self.controller.run(
561 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
562 )
563
564 # FIXME: copypasta
565 def get_mds_status(self, mds):
566 """
567 Run cluster commands for the mds in order to get mds information
568 """
569 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
570 j = json.loads(' '.join(out.splitlines()[1:]))
571 # collate; for dup ids, larger gid wins.
572 for info in j['info'].itervalues():
573 if info['name'] == mds:
574 return info
575 return None
576
577 # FIXME: copypasta
578 def get_mds_status_by_rank(self, rank):
579 """
580         Run cluster commands for the mds in order to get mds information,
581         matching by rank.
582 """
583 j = self.get_mds_status_all()
584 # collate; for dup ids, larger gid wins.
585 for info in j['info'].itervalues():
586 if info['rank'] == rank:
587 return info
588 return None
589
590 def get_mds_status_all(self):
591 """
592 Run cluster command to extract all the mds status.
593 """
594 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
595 j = json.loads(' '.join(out.splitlines()[1:]))
596 return j
597
598
599 class LocalCephCluster(CephCluster):
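    """
    CephCluster backed by the vstart cluster in the current working directory;
    configuration changes are appended to the existing ./ceph.conf.
    """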
600 def __init__(self, ctx):
601 # Deliberately skip calling parent constructor
602 self._ctx = ctx
603 self.mon_manager = LocalCephManager()
604 self._conf = defaultdict(dict)
605
606 @property
607 def admin_remote(self):
608 return LocalRemote()
609
610 def get_config(self, key, service_type=None):
611 if service_type is None:
612 service_type = 'mon'
613
614 # FIXME hardcoded vstart service IDs
615 service_id = {
616 'mon': 'a',
617 'mds': 'a',
618 'osd': '0'
619 }[service_type]
620
621 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
622
623 def _write_conf(self):
624 # In teuthology, we have the honour of writing the entire ceph.conf, but
625 # in vstart land it has mostly already been written and we need to carefully
626 # append to it.
627 conf_path = "./ceph.conf"
628 banner = "\n#LOCAL_TEST\n"
629 existing_str = open(conf_path).read()
630
631 if banner in existing_str:
632 existing_str = existing_str[0:existing_str.find(banner)]
633
634 existing_str += banner
635
636 for subsys, kvs in self._conf.items():
637 existing_str += "\n[{0}]\n".format(subsys)
638 for key, val in kvs.items():
639 # Comment out existing instance if it exists
640 log.info("Searching for existing instance {0}/{1}".format(
641 key, subsys
642 ))
643 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
644 subsys
645 ), existing_str, re.MULTILINE)
646
647 if existing_section:
648 section_str = existing_str[existing_section.start():existing_section.end()]
649 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
650 if existing_val:
651 start = existing_section.start() + existing_val.start(1)
652 log.info("Found string to replace at {0}".format(
653 start
654 ))
655 existing_str = existing_str[0:start] + "#" + existing_str[start:]
656
657 existing_str += "{0} = {1}\n".format(key, val)
658
659 open(conf_path, "w").write(existing_str)
660
661 def set_ceph_conf(self, subsys, key, value):
662 self._conf[subsys][key] = value
663 self._write_conf()
664
665 def clear_ceph_conf(self, subsys, key):
666 del self._conf[subsys][key]
667 self._write_conf()
668
669
670 class LocalMDSCluster(LocalCephCluster, MDSCluster):
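    """
    MDSCluster counterpart that discovers its MDS daemons from the
    DaemonGroup on the local context instead of teuthology roles.
    """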
671 def __init__(self, ctx):
672 super(LocalMDSCluster, self).__init__(ctx)
673
674 self.mds_ids = ctx.daemons.daemons['mds'].keys()
675 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
676
677 def clear_firewall(self):
678 # FIXME: unimplemented
679 pass
680
681 def newfs(self, name='cephfs', create=True):
682 return LocalFilesystem(self._ctx, name=name, create=create)
683
684
685 class LocalMgrCluster(LocalCephCluster, MgrCluster):
686 def __init__(self, ctx):
687 super(LocalMgrCluster, self).__init__(ctx)
688
689 self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
690 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
691
692
693 class LocalFilesystem(Filesystem, LocalMDSCluster):
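    """
    Filesystem object for the vstart cluster; MDS daemons are discovered by
    inspecting ceph.conf rather than via teuthology roles.
    """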
694 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
695 # Deliberately skip calling parent constructor
696 self._ctx = ctx
697
698 self.id = None
699 self.name = None
700 self.metadata_pool_name = None
701 self.metadata_overlay = False
702 self.data_pool_name = None
703 self.data_pools = None
704
705 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
706 self.mds_ids = set()
707 for line in open("ceph.conf").readlines():
708 match = re.match("^\[mds\.(.+)\]$", line)
709 if match:
710 self.mds_ids.add(match.group(1))
711
712 if not self.mds_ids:
713 raise RuntimeError("No MDSs found in ceph.conf!")
714
715 self.mds_ids = list(self.mds_ids)
716
717 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
718
719 self.mon_manager = LocalCephManager()
720
721 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
722
723 self.client_remote = LocalRemote()
724
725 self._conf = defaultdict(dict)
726
727 if name is not None:
728 if fscid is not None:
729 raise RuntimeError("cannot specify fscid when creating fs")
730 if create and not self.legacy_configured():
731 self.create()
732 else:
733 if fscid is not None:
734 self.id = fscid
735 self.getinfo(refresh=True)
736
737 # Stash a reference to the first created filesystem on ctx, so
738 # that if someone drops to the interactive shell they can easily
739 # poke our methods.
740 if not hasattr(self._ctx, "filesystem"):
741 self._ctx.filesystem = self
742
743 @property
744 def _prefix(self):
745 return BIN_PREFIX
746
747 def set_clients_block(self, blocked, mds_id=None):
748 raise NotImplementedError()
749
750 def get_pgs_per_fs_pool(self):
751 # FIXME: assuming there are 3 OSDs
752 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
753
754
755 class InteractiveFailureResult(unittest.TextTestResult):
756 """
757 Specialization that implements interactive-on-error style
758 behavior.
759 """
760 def addFailure(self, test, err):
761 super(InteractiveFailureResult, self).addFailure(test, err)
762 log.error(self._exc_info_to_string(err, test))
763 log.error("Failure in test '{0}', going interactive".format(
764 self.getDescription(test)
765 ))
766 interactive.task(ctx=None, config=None)
767
768 def addError(self, test, err):
769 super(InteractiveFailureResult, self).addError(test, err)
770 log.error(self._exc_info_to_string(err, test))
771 log.error("Error in test '{0}', going interactive".format(
772 self.getDescription(test)
773 ))
774 interactive.task(ctx=None, config=None)
775
776
777 def enumerate_methods(s):
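    """
    Recursively walk a test suite and yield (suite, test) pairs for every
    leaf test it contains.
    """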
778 log.info("e: {0}".format(s))
779 for t in s._tests:
780 if isinstance(t, suite.BaseTestSuite):
781 for sub in enumerate_methods(t):
782 yield sub
783 else:
784 yield s, t
785
786
787 def load_tests(modules, loader):
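    """
    Load the named test modules with the given loader, or discover the whole
    cephfs suite if no modules were specified.
    """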
788 if modules:
789 log.info("Executing modules: {0}".format(modules))
790 module_suites = []
791 for mod_name in modules:
792 # Test names like cephfs.test_auto_repair
793 module_suites.append(loader.loadTestsFromName(mod_name))
794 log.info("Loaded: {0}".format(list(module_suites)))
795 return suite.TestSuite(module_suites)
796 else:
797 log.info("Executing all cephfs tests")
798 return loader.discover(
799 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
800 )
801
802
803 def scan_tests(modules):
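    """
    Inspect the selected test cases and return the maximum number of MDSs,
    clients and mgrs that any of them requires, so the vstart cluster can be
    sized accordingly.
    """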
804 overall_suite = load_tests(modules, loader.TestLoader())
805
806 max_required_mds = 0
807 max_required_clients = 0
808 max_required_mgr = 0
809
810 for suite, case in enumerate_methods(overall_suite):
811 max_required_mds = max(max_required_mds,
812 getattr(case, "MDSS_REQUIRED", 0))
813 max_required_clients = max(max_required_clients,
814 getattr(case, "CLIENTS_REQUIRED", 0))
815 max_required_mgr = max(max_required_mgr,
816 getattr(case, "MGRS_REQUIRED", 0))
817
818 return max_required_mds, max_required_clients, max_required_mgr
819
820
821 class LocalCluster(object):
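    """
    Minimal stand-in for teuthology's Cluster: a single LocalRemote carrying
    whichever role name the caller requests.
    """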
822 def __init__(self, rolename="placeholder"):
823 self.remotes = {
824 LocalRemote(): [rolename]
825 }
826
827 def only(self, requested):
828 return self.__class__(rolename=requested)
829
830
831 class LocalContext(object):
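    """
    Minimal stand-in for the teuthology run context: holds the config, the
    local cluster and a DaemonGroup populated by inspecting ceph.conf.
    """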
832 def __init__(self):
833 self.config = {}
834 self.teuthology_config = teuth_config
835 self.cluster = LocalCluster()
836 self.daemons = DaemonGroup()
837
838 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
839 # tests that want to look these up via ctx can do so.
840 # Inspect ceph.conf to see what roles exist
841 for conf_line in open("ceph.conf").readlines():
842 for svc_type in ["mon", "osd", "mds", "mgr"]:
843 if svc_type not in self.daemons.daemons:
844 self.daemons.daemons[svc_type] = {}
845 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
846 if match:
847 svc_id = match.group(1)
848 self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
849
850 def __del__(self):
851 shutil.rmtree(self.teuthology_config['test_path'])
852
853
854 def exec_test():
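    """
    Entry point: parse arguments, optionally (re)create the vstart cluster,
    construct the Local* fixtures and run the selected cephfs tests through
    unittest's TextTestRunner.
    """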
855 # Parse arguments
856 interactive_on_error = False
857 create_cluster = False
858
859 args = sys.argv[1:]
860 flags = [a for a in args if a.startswith("-")]
861 modules = [a for a in args if not a.startswith("-")]
862 for f in flags:
863 if f == "--interactive":
864 interactive_on_error = True
865 elif f == "--create":
866 create_cluster = True
867 else:
868 log.error("Unknown option '{0}'".format(f))
869 sys.exit(-1)
870
871 # Help developers by stopping up-front if their tree isn't built enough for all the
872 # tools that the tests might want to use (add more here if needed)
873 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
874 "cephfs-table-tool", "ceph-fuse", "rados"]
875 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
876 if missing_binaries:
877 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
878 sys.exit(-1)
879
880 max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
881
882 remote = LocalRemote()
883
884     # Make sure no stray MDSs or clients are left running before we start
885 ps_txt = remote.run(
886 args=["ps", "-u"+str(os.getuid())]
887 ).stdout.getvalue().strip()
888 lines = ps_txt.split("\n")[1:]
889 for line in lines:
890 if 'ceph-fuse' in line or 'ceph-mds' in line:
891 pid = int(line.split()[0])
892 log.warn("Killing stray process {0}".format(line))
893 os.kill(pid, signal.SIGKILL)
894
895 # Fire up the Ceph cluster if the user requested it
896 if create_cluster:
897 log.info("Creating cluster with {0} MDS daemons".format(
898 max_required_mds))
899 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
900 remote.run(["rm", "-rf", "./out"])
901 remote.run(["rm", "-rf", "./dev"])
902 vstart_env = os.environ.copy()
903 vstart_env["FS"] = "0"
904 vstart_env["MDS"] = max_required_mds.__str__()
905 vstart_env["OSD"] = "1"
906 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
907
908 remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
909 env=vstart_env)
910
911 # Wait for OSD to come up so that subsequent injectargs etc will
912 # definitely succeed
913 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
914
915 # List of client mounts, sufficient to run the selected tests
916 clients = [i.__str__() for i in range(0, max_required_clients)]
917
918 test_dir = tempfile.mkdtemp()
919 teuth_config['test_path'] = test_dir
920
921 # Construct Mount classes
922 mounts = []
923 for client_id in clients:
924 # Populate client keyring (it sucks to use client.admin for test clients
925 # because it's awkward to find the logs later)
926 client_name = "client.{0}".format(client_id)
927
928 if client_name not in open("./keyring").read():
929 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
930 "osd", "allow rw",
931 "mds", "allow",
932 "mon", "allow r"])
933
934 open("./keyring", "a").write(p.stdout.getvalue())
935
936 mount = LocalFuseMount(test_dir, client_id)
937 mounts.append(mount)
938 if mount.is_mounted():
939 log.warn("unmounting {0}".format(mount.mountpoint))
940 mount.umount_wait()
941 else:
942 if os.path.exists(mount.mountpoint):
943 os.rmdir(mount.mountpoint)
944
945 ctx = LocalContext()
946 ceph_cluster = LocalCephCluster(ctx)
947 mds_cluster = LocalMDSCluster(ctx)
948 mgr_cluster = LocalMgrCluster(ctx)
949
950 from tasks.cephfs_test_runner import DecoratingLoader
951
952 class LogStream(object):
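        """
        File-like object that forwards complete lines written by the test
        runner to the module logger.
        """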
953 def __init__(self):
954 self.buffer = ""
955
956 def write(self, data):
957 self.buffer += data
958 if "\n" in self.buffer:
959 lines = self.buffer.split("\n")
960 for line in lines[:-1]:
961 pass
962 # sys.stderr.write(line + "\n")
963 log.info(line)
964 self.buffer = lines[-1]
965
966 def flush(self):
967 pass
968
969 decorating_loader = DecoratingLoader({
970 "ctx": ctx,
971 "mounts": mounts,
972 "ceph_cluster": ceph_cluster,
973 "mds_cluster": mds_cluster,
974 "mgr_cluster": mgr_cluster,
975 })
976
977 # For the benefit of polling tests like test_full -- in teuthology land we set this
978 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
979 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
980 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
981
982 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
983     # from normal IO latency. Increase it for running tests.
984 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
985
986 # Make sure the filesystem created in tests has uid/gid that will let us talk to
987 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
988 # so that cephfs-data-scan will pick it up too.
989 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
990 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
991
992 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
993 def _get_package_version(remote, pkg_name):
994 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
995 return "2.9"
996
997 import teuthology.packaging
998 teuthology.packaging.get_package_version = _get_package_version
999
1000 overall_suite = load_tests(modules, decorating_loader)
1001
1002     # Filter out tests that don't lend themselves to interactive running.
1003 victims = []
1004 for case, method in enumerate_methods(overall_suite):
1005 fn = getattr(method, method._testMethodName)
1006
1007 drop_test = False
1008
1009 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1010 drop_test = True
1011             log.warn("Dropping test because long running: {0}".format(method.id()))
1012
1013 if getattr(fn, "needs_trimming", False) is True:
1014 drop_test = (os.getuid() != 0)
1015             log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))
1016
1017 if drop_test:
1018 # Don't drop the test if it was explicitly requested in arguments
1019 is_named = False
1020 for named in modules:
1021 if named.endswith(method.id()):
1022 is_named = True
1023 break
1024
1025 if not is_named:
1026 victims.append((case, method))
1027
1028 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1029 for s, method in victims:
1030 s._tests.remove(method)
1031
1032 if interactive_on_error:
1033 result_class = InteractiveFailureResult
1034 else:
1035 result_class = unittest.TextTestResult
1036 fail_on_skip = False
1037
1038 class LoggingResult(result_class):
1039 def startTest(self, test):
1040 log.info("Starting test: {0}".format(self.getDescription(test)))
1041 test.started_at = datetime.datetime.utcnow()
1042 return super(LoggingResult, self).startTest(test)
1043
1044 def stopTest(self, test):
1045 log.info("Stopped test: {0} in {1}s".format(
1046 self.getDescription(test),
1047 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1048 ))
1049
1050 def addSkip(self, test, reason):
1051 if fail_on_skip:
1052 # Don't just call addFailure because that requires a traceback
1053 self.failures.append((test, reason))
1054 else:
1055 super(LoggingResult, self).addSkip(test, reason)
1056
1057 # Execute!
1058 result = unittest.TextTestRunner(
1059 stream=LogStream(),
1060 resultclass=LoggingResult,
1061 verbosity=2,
1062 failfast=True).run(overall_suite)
1063
1064 if not result.wasSuccessful():
1065 result.printErrors() # duplicate output at end for convenience
1066
1067 bad_tests = []
1068 for test, error in result.errors:
1069 bad_tests.append(str(test))
1070 for test, failure in result.failures:
1071 bad_tests.append(str(test))
1072
1073 sys.exit(-1)
1074 else:
1075 sys.exit(0)
1076
1077
1078 if __name__ == "__main__":
1079 exec_test()