1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 """
27
28 from StringIO import StringIO
29 from collections import defaultdict
30 import getpass
31 import signal
32 import tempfile
33 import threading
34 import datetime
35 import shutil
36 import re
37 import os
38 import time
39 import json
40 import sys
41 import errno
42 from unittest import suite, loader
43 import unittest
44 import platform
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
48
49 import logging
50
51 log = logging.getLogger(__name__)
52
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
60
61
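# If the build-tree library path is not already on the dynamic linker path,
# re-exec this script under the same interpreter with LD_LIBRARY_PATH (or
# DYLD_LIBRARY_PATH on macOS) extended; otherwise fall through and prepend the
# guessed python paths to sys.path.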
62 def respawn_in_path(lib_path, python_paths):
63 execv_cmd = ['python']
64 if platform.system() == "Darwin":
65 lib_path_var = "DYLD_LIBRARY_PATH"
66 else:
67 lib_path_var = "LD_LIBRARY_PATH"
68
69 py_binary = os.environ.get("PYTHON", "python")
70
71 if lib_path_var in os.environ:
72 if lib_path not in os.environ[lib_path_var]:
73 os.environ[lib_path_var] += ':' + lib_path
74 os.execvp(py_binary, execv_cmd + sys.argv)
75 else:
76 os.environ[lib_path_var] = lib_path
77 os.execvp(py_binary, execv_cmd + sys.argv)
78
79 for p in python_paths:
80 sys.path.insert(0, p)
81
82
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
85
86 # A list of candidate paths for each package we need
87 guesses = [
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
90 ["../src/pybind"],
91 ]
92
93 python_paths = []
94
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths.append(os.path.abspath(
97 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
98 ))
99
100 for package_guesses in guesses:
101 for g in package_guesses:
102 g_exp = os.path.abspath(os.path.expanduser(g))
103 if os.path.exists(g_exp):
104 python_paths.append(g_exp)
105
106 ld_path = os.path.join(os.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108 respawn_in_path(ld_path, python_paths)
109
110
111 try:
112 from teuthology.exceptions import CommandFailedError
113 from tasks.ceph_manager import CephManager
114 from tasks.cephfs.fuse_mount import FuseMount
115 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116 from mgr.mgr_test_case import MgrCluster
117 from teuthology.contextutil import MaxWhileTries
118 from teuthology.task import interactive
119 except ImportError:
120 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
122 raise
123
124 # Must import after teuthology because of gevent monkey patching
125 import subprocess
126
127 if os.path.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX = "./bin/"
130 SRC_PREFIX = "../src"
131 else:
132 # Running in src/ of an autotools build
133 BIN_PREFIX = "./"
134 SRC_PREFIX = "./"
135
136
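# Minimal stand-in for teuthology's RemoteProcess: wraps a subprocess.Popen,
# captures stdout/stderr into StringIO objects when the caller did not supply
# streams, and raises CommandFailedError on non-zero exit when check_status
# is set.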
137 class LocalRemoteProcess(object):
138 def __init__(self, args, subproc, check_status, stdout, stderr):
139 self.args = args
140 self.subproc = subproc
141 if stdout is None:
142 self.stdout = StringIO()
143 else:
144 self.stdout = stdout
145
146 if stderr is None:
147 self.stderr = StringIO()
148 else:
149 self.stderr = stderr
150
151 self.check_status = check_status
152 self.exitstatus = self.returncode = None
153
154 def wait(self):
155 if self.finished:
156 # Avoid calling communicate() on a dead process because it'll
157 # give you stick about std* already being closed
158 if self.exitstatus != 0:
159 raise CommandFailedError(self.args, self.exitstatus)
160 else:
161 return
162
163 out, err = self.subproc.communicate()
164 self.stdout.write(out)
165 self.stderr.write(err)
166
167 self.exitstatus = self.returncode = self.subproc.returncode
168
169 if self.exitstatus != 0:
170 sys.stderr.write(out)
171 sys.stderr.write(err)
172
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175
176 @property
177 def finished(self):
178 if self.exitstatus is not None:
179 return True
180
181 if self.subproc.poll() is not None:
182 out, err = self.subproc.communicate()
183 self.stdout.write(out)
184 self.stderr.write(err)
185 self.exitstatus = self.returncode = self.subproc.returncode
186 return True
187 else:
188 return False
189
190 def kill(self):
191 log.info("kill ")
192 if self.subproc.pid and not self.finished:
193 log.info("kill: killing pid {0} ({1})".format(
194 self.subproc.pid, self.args))
195 safe_kill(self.subproc.pid)
196 else:
197 log.info("kill: already terminated ({0})".format(self.args))
198
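# Teuthology callers stop a daemonized process by closing its stdin; emulate
# that contract locally by mapping stdin.close() onto killing the subprocess.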
199 @property
200 def stdin(self):
201 class FakeStdIn(object):
202 def __init__(self, mount_daemon):
203 self.mount_daemon = mount_daemon
204
205 def close(self):
206 self.mount_daemon.kill()
207
208 return FakeStdIn(self)
209
210
211 class LocalRemote(object):
212 """
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
215
216 Run this inside your src/ dir!
217 """
218
219 def __init__(self):
220 self.name = "local"
221 self.hostname = "localhost"
222 self.user = getpass.getuser()
223
224 def get_file(self, path, sudo, dest_dir):
225 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226 shutil.copy(path, tmpfile)
227 return tmpfile
228
229 def put_file(self, src, dst, sudo=False):
230 shutil.copy(src, dst)
231
232 def run(self, args, check_status=True, wait=True,
233 stdout=None, stderr=None, cwd=None, stdin=None,
234 logger=None, label=None, env=None):
235 log.info("run args={0}".format(args))
236
237 # We don't need no stinkin' sudo
238 args = [a for a in args if a != "sudo"]
239
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell = any([a for a in args if isinstance(a, Raw)])
242
243 if shell:
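# Strip wrappers that only make sense on a teuthology node: adjust-ulimits
# takes no argument here, while ceph-coverage and timeout each consume one
# following token.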
244 filtered = []
245 i = 0
246 while i < len(args):
247 if args[i] == 'adjust-ulimits':
248 i += 1
249 elif args[i] == 'ceph-coverage':
250 i += 2
251 elif args[i] == 'timeout':
252 i += 2
253 else:
254 filtered.append(args[i])
255 i += 1
256
257 args = quote(filtered)
258 log.info("Running {0}".format(args))
259
260 subproc = subprocess.Popen(args,
261 stdout=subprocess.PIPE,
262 stderr=subprocess.PIPE,
263 stdin=subprocess.PIPE,
264 cwd=cwd,
265 shell=True)
266 else:
267 log.info("Running {0}".format(args))
268
269 for arg in args:
270 if not isinstance(arg, basestring):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
272 arg, arg.__class__
273 ))
274
275 subproc = subprocess.Popen(args,
276 stdout=subprocess.PIPE,
277 stderr=subprocess.PIPE,
278 stdin=subprocess.PIPE,
279 cwd=cwd,
280 env=env)
281
282 if stdin:
283 if not isinstance(stdin, basestring):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
285
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc.stdin.write(stdin)
289
290 proc = LocalRemoteProcess(
291 args, subproc, check_status,
292 stdout, stderr
293 )
294
295 if wait:
296 proc.wait()
297
298 return proc
299
300
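# Controls a single vstart daemon (ceph-mds, ceph-mgr, ...) using plain process
# management: the PID is found by scanning `ps` output for "ceph-<type> -i <id>".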
301 class LocalDaemon(object):
302 def __init__(self, daemon_type, daemon_id):
303 self.daemon_type = daemon_type
304 self.daemon_id = daemon_id
305 self.controller = LocalRemote()
306 self.proc = None
307
308 @property
309 def remote(self):
310 return LocalRemote()
311
312 def running(self):
313 return self._get_pid() is not None
314
315 def _get_pid(self):
316 """
317 Return PID as an integer or None if not found
318 """
319 ps_txt = self.controller.run(
320 args=["ps", "ww", "-u"+str(os.getuid())]
321 ).stdout.getvalue().strip()
322 lines = ps_txt.split("\n")[1:]
323
324 for line in lines:
325 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326 log.info("Found ps line for daemon: {0}".format(line))
327 return int(line.split()[0])
328 log.info("No match for {0} {1}: {2}".format(
329 self.daemon_type, self.daemon_id, ps_txt
330 ))
331 return None
332
333 def wait(self, timeout):
334 waited = 0
335 while self._get_pid() is not None:
336 if waited > timeout:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
338 time.sleep(1)
339 waited += 1
340
341 def stop(self, timeout=300):
342 if not self.running():
343 log.error('tried to stop a non-running daemon')
344 return
345
346 pid = self._get_pid()
347 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348 os.kill(pid, signal.SIGKILL)
349
350 waited = 0
351 while pid is not None:
352 new_pid = self._get_pid()
353 if new_pid is not None and new_pid != pid:
354 log.info("Killing new PID {0}".format(new_pid))
355 pid = new_pid
356 os.kill(pid, signal.SIGKILL)
357
358 if new_pid is None:
359 break
360 else:
361 if waited > timeout:
362 raise MaxWhileTries(
363 "Timed out waiting for daemon {0}.{1}".format(
364 self.daemon_type, self.daemon_id))
365 time.sleep(1)
366 waited += 1
367
368 self.wait(timeout=timeout)
369
370 def restart(self):
371 if self._get_pid() is not None:
372 self.stop()
373
374 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
375
376 def signal(self, sig, silent=False):
377 if not self.running():
378 raise RuntimeError("Can't send signal to non-running daemon")
379
380 os.kill(self._get_pid(), sig)
381 if not silent:
382 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
383
384
385 def safe_kill(pid):
386 """
387 os.kill annoyingly raises exception if process already dead. Ignore it.
388 """
389 try:
390 return os.kill(pid, signal.SIGKILL)
391 except OSError as e:
392 if e.errno == errno.ESRCH:
393 # Raced with process termination
394 pass
395 else:
396 raise
397
398
399 class LocalFuseMount(FuseMount):
400 def __init__(self, test_dir, client_id):
401 super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
402
403 @property
404 def config_path(self):
405 return "./ceph.conf"
406
407 def get_keyring_path(self):
408 # This is going to end up in a config file, so use an absolute path
409 # to avoid assumptions about daemons' pwd
410 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
411
412 def run_shell(self, args, wait=True):
413 # FIXME maybe should add a pwd arg to teuthology.orchestra so that
414 # the "cd foo && bar" shenanigans isn't needed to begin with and
415 # then we wouldn't have to special case this
416 return self.client_remote.run(
417 args, wait=wait, cwd=self.mountpoint
418 )
419
420 @property
421 def _prefix(self):
422 return BIN_PREFIX
423
424 def _asok_path(self):
425 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
426         # run in the foreground. When running it daemonized, however, the asok is named after
427 # the PID of the launching process, not the long running ceph-fuse process. Therefore
428 # we need to give an exact path here as the logic for checking /proc/ for which
429 # asok is alive does not work.
430 path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
431 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
432 return path
433
434 def umount(self):
435 if self.is_mounted():
436 super(LocalFuseMount, self).umount()
437
438 def mount(self, mount_path=None, mount_fs_name=None):
439 self.client_remote.run(
440 args=[
441 'mkdir',
442 '--',
443 self.mountpoint,
444 ],
445 )
446
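# Ensure the fusectl filesystem is mounted, then return the numeric fuse
# connection IDs under /sys/fs/fuse/connections; comparing these before and
# after mounting identifies our new connection.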
447 def list_connections():
448 self.client_remote.run(
449 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
450 check_status=False
451 )
452 p = self.client_remote.run(
453 args=["ls", "/sys/fs/fuse/connections"],
454 check_status=False
455 )
456 if p.exitstatus != 0:
457 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
458 return []
459
460 ls_str = p.stdout.getvalue().strip()
461 if ls_str:
462 return [int(n) for n in ls_str.split("\n")]
463 else:
464 return []
465
466 # Before starting ceph-fuse process, note the contents of
467 # /sys/fs/fuse/connections
468 pre_mount_conns = list_connections()
469 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
470
471 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
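# Unprivileged ceph-fuse may be unable to invalidate dentries through the
# kernel, so don't let the client treat a failed invalidation as fatal.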
472 if os.getuid() != 0:
473 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
474
475 if mount_path is not None:
476 prefix += ["--client_mountpoint={0}".format(mount_path)]
477
478 if mount_fs_name is not None:
479 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
480
481 self.fuse_daemon = self.client_remote.run(args=
482 prefix + [
483 "-f",
484 "--name",
485 "client.{0}".format(self.client_id),
486 self.mountpoint
487 ], wait=False)
488
489 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
490
491 # Wait for the connection reference to appear in /sys
492 waited = 0
493 post_mount_conns = list_connections()
494 while len(post_mount_conns) <= len(pre_mount_conns):
495 if self.fuse_daemon.finished:
496 # Did mount fail? Raise the CommandFailedError instead of
497 # hitting the "failed to populate /sys/" timeout
498 self.fuse_daemon.wait()
499 time.sleep(1)
500 waited += 1
501 if waited > 30:
502 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
503 waited
504 ))
505 post_mount_conns = list_connections()
506
507 log.info("Post-mount connections: {0}".format(post_mount_conns))
508
509 # Record our fuse connection number so that we can use it when
510 # forcing an unmount
511 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
512 if len(new_conns) == 0:
513 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
514 elif len(new_conns) > 1:
515 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
516 else:
517 self._fuse_conn = new_conns[0]
518
519 def _run_python(self, pyscript):
520 """
521 Override this to remove the daemon-helper prefix that is used otherwise
522 to make the process killable.
523 """
524 return self.client_remote.run(args=[
525 'python', '-c', pyscript
526 ], wait=False)
527
528
529 class LocalCephManager(CephManager):
530 def __init__(self):
531 # Deliberately skip parent init, only inheriting from it to get
532 # util methods like osd_dump that sit on top of raw_cluster_cmd
533 self.controller = LocalRemote()
534
535 # A minority of CephManager fns actually bother locking for when
536 # certain teuthology tests want to run tasks in parallel
537 self.lock = threading.RLock()
538
539 self.log = lambda x: log.info(x)
540
541 def find_remote(self, daemon_type, daemon_id):
542 """
543 daemon_type like 'mds', 'osd'
544 daemon_id like 'a', '0'
545 """
546 return LocalRemote()
547
548 def run_ceph_w(self):
549 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
550 return proc
551
552 def raw_cluster_cmd(self, *args):
553 """
554 args like ["osd", "dump"}
555 return stdout string
556 """
557 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
558 return proc.stdout.getvalue()
559
560 def raw_cluster_cmd_result(self, *args):
561 """
562 like raw_cluster_cmd but don't check status, just return rc
563 """
564 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
565 return proc.exitstatus
566
567 def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
568 return self.controller.run(
569 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
570 )
571
572 # FIXME: copypasta
573 def get_mds_status(self, mds):
574 """
575 Run cluster commands for the mds in order to get mds information
576 """
577 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
578 j = json.loads(' '.join(out.splitlines()[1:]))
579 # collate; for dup ids, larger gid wins.
580 for info in j['info'].itervalues():
581 if info['name'] == mds:
582 return info
583 return None
584
585 # FIXME: copypasta
586 def get_mds_status_by_rank(self, rank):
587 """
588         Run cluster commands for the mds in order to get mds information,
589         returning the entry that matches the given rank.
590 """
591 j = self.get_mds_status_all()
592 # collate; for dup ids, larger gid wins.
593 for info in j['info'].itervalues():
594 if info['rank'] == rank:
595 return info
596 return None
597
598 def get_mds_status_all(self):
599 """
600 Run cluster command to extract all the mds status.
601 """
602 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
603 j = json.loads(' '.join(out.splitlines()[1:]))
604 return j
605
606
607 class LocalCephCluster(CephCluster):
608 def __init__(self, ctx):
609 # Deliberately skip calling parent constructor
610 self._ctx = ctx
611 self.mon_manager = LocalCephManager()
612 self._conf = defaultdict(dict)
613
614 @property
615 def admin_remote(self):
616 return LocalRemote()
617
618 def get_config(self, key, service_type=None):
619 if service_type is None:
620 service_type = 'mon'
621
622 # FIXME hardcoded vstart service IDs
623 service_id = {
624 'mon': 'a',
625 'mds': 'a',
626 'osd': '0'
627 }[service_type]
628
629 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
630
631 def _write_conf(self):
632 # In teuthology, we have the honour of writing the entire ceph.conf, but
633 # in vstart land it has mostly already been written and we need to carefully
634 # append to it.
635 conf_path = "./ceph.conf"
636 banner = "\n#LOCAL_TEST\n"
637 existing_str = open(conf_path).read()
638
639 if banner in existing_str:
640 existing_str = existing_str[0:existing_str.find(banner)]
641
642 existing_str += banner
643
644 for subsys, kvs in self._conf.items():
645 existing_str += "\n[{0}]\n".format(subsys)
646 for key, val in kvs.items():
647 # Comment out existing instance if it exists
648 log.info("Searching for existing instance {0}/{1}".format(
649 key, subsys
650 ))
651 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
652 subsys
653 ), existing_str, re.MULTILINE)
654
655 if existing_section:
656 section_str = existing_str[existing_section.start():existing_section.end()]
657 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
658 if existing_val:
659 start = existing_section.start() + existing_val.start(1)
660 log.info("Found string to replace at {0}".format(
661 start
662 ))
663 existing_str = existing_str[0:start] + "#" + existing_str[start:]
664
665 existing_str += "{0} = {1}\n".format(key, val)
666
667 open(conf_path, "w").write(existing_str)
668
669 def set_ceph_conf(self, subsys, key, value):
670 self._conf[subsys][key] = value
671 self._write_conf()
672
673 def clear_ceph_conf(self, subsys, key):
674 del self._conf[subsys][key]
675 self._write_conf()
676
677
678 class LocalMDSCluster(LocalCephCluster, MDSCluster):
679 def __init__(self, ctx):
680 super(LocalMDSCluster, self).__init__(ctx)
681
682 self.mds_ids = ctx.daemons.daemons['mds'].keys()
683 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
684
685 def clear_firewall(self):
686 # FIXME: unimplemented
687 pass
688
689 def newfs(self, name='cephfs', create=True):
690 return LocalFilesystem(self._ctx, name=name, create=create)
691
692
693 class LocalMgrCluster(LocalCephCluster, MgrCluster):
694 def __init__(self, ctx):
695 super(LocalMgrCluster, self).__init__(ctx)
696
697 self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
698 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
699
700
701 class LocalFilesystem(Filesystem, LocalMDSCluster):
702 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
703 # Deliberately skip calling parent constructor
704 self._ctx = ctx
705
706 self.id = None
707 self.name = None
708 self.ec_profile = None
709 self.metadata_pool_name = None
710 self.metadata_overlay = False
711 self.data_pool_name = None
712 self.data_pools = None
713
714 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
715 self.mds_ids = set()
716 for line in open("ceph.conf").readlines():
717 match = re.match("^\[mds\.(.+)\]$", line)
718 if match:
719 self.mds_ids.add(match.group(1))
720
721 if not self.mds_ids:
722 raise RuntimeError("No MDSs found in ceph.conf!")
723
724 self.mds_ids = list(self.mds_ids)
725
726 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
727
728 self.mon_manager = LocalCephManager()
729
730 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
731
732 self.client_remote = LocalRemote()
733
734 self._conf = defaultdict(dict)
735
736 if name is not None:
737 if fscid is not None:
738 raise RuntimeError("cannot specify fscid when creating fs")
739 if create and not self.legacy_configured():
740 self.create()
741 else:
742 if fscid is not None:
743 self.id = fscid
744 self.getinfo(refresh=True)
745
746 # Stash a reference to the first created filesystem on ctx, so
747 # that if someone drops to the interactive shell they can easily
748 # poke our methods.
749 if not hasattr(self._ctx, "filesystem"):
750 self._ctx.filesystem = self
751
752 @property
753 def _prefix(self):
754 return BIN_PREFIX
755
756 def set_clients_block(self, blocked, mds_id=None):
757 raise NotImplementedError()
758
759 def get_pgs_per_fs_pool(self):
760 # FIXME: assuming there are 3 OSDs
761 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
762
763
764 class InteractiveFailureResult(unittest.TextTestResult):
765 """
766 Specialization that implements interactive-on-error style
767 behavior.
768 """
769 def addFailure(self, test, err):
770 super(InteractiveFailureResult, self).addFailure(test, err)
771 log.error(self._exc_info_to_string(err, test))
772 log.error("Failure in test '{0}', going interactive".format(
773 self.getDescription(test)
774 ))
775 interactive.task(ctx=None, config=None)
776
777 def addError(self, test, err):
778 super(InteractiveFailureResult, self).addError(test, err)
779 log.error(self._exc_info_to_string(err, test))
780 log.error("Error in test '{0}', going interactive".format(
781 self.getDescription(test)
782 ))
783 interactive.task(ctx=None, config=None)
784
785
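# Recursively yield (suite, test) pairs for every test method in a possibly
# nested unittest suite.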
786 def enumerate_methods(s):
787 log.info("e: {0}".format(s))
788 for t in s._tests:
789 if isinstance(t, suite.BaseTestSuite):
790 for sub in enumerate_methods(t):
791 yield sub
792 else:
793 yield s, t
794
795
796 def load_tests(modules, loader):
797 if modules:
798 log.info("Executing modules: {0}".format(modules))
799 module_suites = []
800 for mod_name in modules:
801 # Test names like cephfs.test_auto_repair
802 module_suites.append(loader.loadTestsFromName(mod_name))
803 log.info("Loaded: {0}".format(list(module_suites)))
804 return suite.TestSuite(module_suites)
805 else:
806 log.info("Executing all cephfs tests")
807 return loader.discover(
808 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
809 )
810
811
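# Inspect the loaded test cases and work out how many MDS, client and mgr roles
# they declare (MDSS_REQUIRED, CLIENTS_REQUIRED, MGRS_REQUIRED), so the vstart
# cluster can be sized accordingly.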
812 def scan_tests(modules):
813 overall_suite = load_tests(modules, loader.TestLoader())
814
815 max_required_mds = 0
816 max_required_clients = 0
817 max_required_mgr = 0
818
819 for suite, case in enumerate_methods(overall_suite):
820 max_required_mds = max(max_required_mds,
821 getattr(case, "MDSS_REQUIRED", 0))
822 max_required_clients = max(max_required_clients,
823 getattr(case, "CLIENTS_REQUIRED", 0))
824 max_required_mgr = max(max_required_mgr,
825 getattr(case, "MGRS_REQUIRED", 0))
826
827 return max_required_mds, max_required_clients, max_required_mgr
828
829
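# Minimal stand-ins for teuthology's cluster and ctx objects: a single
# LocalRemote plays every role, and the daemon inventory is discovered by
# parsing ./ceph.conf.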
830 class LocalCluster(object):
831 def __init__(self, rolename="placeholder"):
832 self.remotes = {
833 LocalRemote(): [rolename]
834 }
835
836 def only(self, requested):
837 return self.__class__(rolename=requested)
838
839
840 class LocalContext(object):
841 def __init__(self):
842 self.config = {}
843 self.teuthology_config = teuth_config
844 self.cluster = LocalCluster()
845 self.daemons = DaemonGroup()
846
847 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
848 # tests that want to look these up via ctx can do so.
849 # Inspect ceph.conf to see what roles exist
850 for conf_line in open("ceph.conf").readlines():
851 for svc_type in ["mon", "osd", "mds", "mgr"]:
852 if svc_type not in self.daemons.daemons:
853 self.daemons.daemons[svc_type] = {}
854 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
855 if match:
856 svc_id = match.group(1)
857 self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
858
859 def __del__(self):
860 shutil.rmtree(self.teuthology_config['test_path'])
861
862
863 def exec_test():
864 # Parse arguments
865 interactive_on_error = False
866 create_cluster = False
867
868 args = sys.argv[1:]
869 flags = [a for a in args if a.startswith("-")]
870 modules = [a for a in args if not a.startswith("-")]
871 for f in flags:
872 if f == "--interactive":
873 interactive_on_error = True
874 elif f == "--create":
875 create_cluster = True
876 else:
877 log.error("Unknown option '{0}'".format(f))
878 sys.exit(-1)
879
880 # Help developers by stopping up-front if their tree isn't built enough for all the
881 # tools that the tests might want to use (add more here if needed)
882 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
883 "cephfs-table-tool", "ceph-fuse", "rados"]
884 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
885 if missing_binaries:
886 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
887 sys.exit(-1)
888
889 max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
890
891 remote = LocalRemote()
892
893 # Tolerate no MDSs or clients running at start
894 ps_txt = remote.run(
895 args=["ps", "-u"+str(os.getuid())]
896 ).stdout.getvalue().strip()
897 lines = ps_txt.split("\n")[1:]
898 for line in lines:
899 if 'ceph-fuse' in line or 'ceph-mds' in line:
900 pid = int(line.split()[0])
901 log.warn("Killing stray process {0}".format(line))
902 os.kill(pid, signal.SIGKILL)
903
904 # Fire up the Ceph cluster if the user requested it
905 if create_cluster:
906 log.info("Creating cluster with {0} MDS daemons".format(
907 max_required_mds))
908 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
909 remote.run(["rm", "-rf", "./out"])
910 remote.run(["rm", "-rf", "./dev"])
911 vstart_env = os.environ.copy()
912 vstart_env["FS"] = "0"
913 vstart_env["MDS"] = max_required_mds.__str__()
914 vstart_env["OSD"] = "1"
915 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
916
917 remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
918 env=vstart_env)
919
920 # Wait for OSD to come up so that subsequent injectargs etc will
921 # definitely succeed
922 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
923
924 # List of client mounts, sufficient to run the selected tests
925 clients = [i.__str__() for i in range(0, max_required_clients)]
926
927 test_dir = tempfile.mkdtemp()
928 teuth_config['test_path'] = test_dir
929
930 # Construct Mount classes
931 mounts = []
932 for client_id in clients:
933 # Populate client keyring (it sucks to use client.admin for test clients
934 # because it's awkward to find the logs later)
935 client_name = "client.{0}".format(client_id)
936
937 if client_name not in open("./keyring").read():
938 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
939 "osd", "allow rw",
940 "mds", "allow",
941 "mon", "allow r"])
942
943 open("./keyring", "a").write(p.stdout.getvalue())
944
945 mount = LocalFuseMount(test_dir, client_id)
946 mounts.append(mount)
947 if mount.is_mounted():
948 log.warn("unmounting {0}".format(mount.mountpoint))
949 mount.umount_wait()
950 else:
951 if os.path.exists(mount.mountpoint):
952 os.rmdir(mount.mountpoint)
953
954 ctx = LocalContext()
955 ceph_cluster = LocalCephCluster(ctx)
956 mds_cluster = LocalMDSCluster(ctx)
957 mgr_cluster = LocalMgrCluster(ctx)
958
959 from tasks.cephfs_test_runner import DecoratingLoader
960
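# Feed the unittest runner's output into the module logger one line at a time,
# buffering partial lines between write() calls.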
961 class LogStream(object):
962 def __init__(self):
963 self.buffer = ""
964
965 def write(self, data):
966 self.buffer += data
967 if "\n" in self.buffer:
968 lines = self.buffer.split("\n")
969 for line in lines[:-1]:
970 pass
971 # sys.stderr.write(line + "\n")
972 log.info(line)
973 self.buffer = lines[-1]
974
975 def flush(self):
976 pass
977
978 decorating_loader = DecoratingLoader({
979 "ctx": ctx,
980 "mounts": mounts,
981 "ceph_cluster": ceph_cluster,
982 "mds_cluster": mds_cluster,
983 "mgr_cluster": mgr_cluster,
984 })
985
986 # For the benefit of polling tests like test_full -- in teuthology land we set this
987 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
988 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
989 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
990
991 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
992     # from normal IO latency.  Increase it for running tests.
993 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
994
995 # Make sure the filesystem created in tests has uid/gid that will let us talk to
996 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
997 # so that cephfs-data-scan will pick it up too.
998 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
999 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1000
1001 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1002 def _get_package_version(remote, pkg_name):
1003 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1004 return "2.9"
1005
1006 import teuthology.packaging
1007 teuthology.packaging.get_package_version = _get_package_version
1008
1009 overall_suite = load_tests(modules, decorating_loader)
1010
1011     # Filter out tests that don't lend themselves to interactive running.
1012 victims = []
1013 for case, method in enumerate_methods(overall_suite):
1014 fn = getattr(method, method._testMethodName)
1015
1016 drop_test = False
1017
1018 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1019 drop_test = True
1020 log.warn("Dropping test because long running: ".format(method.id()))
1021
1022 if getattr(fn, "needs_trimming", False) is True:
1023 drop_test = (os.getuid() != 0)
1024 log.warn("Dropping test because client trim unavailable: ".format(method.id()))
1025
1026 if drop_test:
1027 # Don't drop the test if it was explicitly requested in arguments
1028 is_named = False
1029 for named in modules:
1030 if named.endswith(method.id()):
1031 is_named = True
1032 break
1033
1034 if not is_named:
1035 victims.append((case, method))
1036
1037 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1038 for s, method in victims:
1039 s._tests.remove(method)
1040
1041 if interactive_on_error:
1042 result_class = InteractiveFailureResult
1043 else:
1044 result_class = unittest.TextTestResult
1045 fail_on_skip = False
1046
1047 class LoggingResult(result_class):
1048 def startTest(self, test):
1049 log.info("Starting test: {0}".format(self.getDescription(test)))
1050 test.started_at = datetime.datetime.utcnow()
1051 return super(LoggingResult, self).startTest(test)
1052
1053 def stopTest(self, test):
1054 log.info("Stopped test: {0} in {1}s".format(
1055 self.getDescription(test),
1056 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1057 ))
1058
1059 def addSkip(self, test, reason):
1060 if fail_on_skip:
1061 # Don't just call addFailure because that requires a traceback
1062 self.failures.append((test, reason))
1063 else:
1064 super(LoggingResult, self).addSkip(test, reason)
1065
1066 # Execute!
1067 result = unittest.TextTestRunner(
1068 stream=LogStream(),
1069 resultclass=LoggingResult,
1070 verbosity=2,
1071 failfast=True).run(overall_suite)
1072
1073 if not result.wasSuccessful():
1074 result.printErrors() # duplicate output at end for convenience
1075
1076 bad_tests = []
1077 for test, error in result.errors:
1078 bad_tests.append(str(test))
1079 for test, failure in result.failures:
1080 bad_tests.append(str(test))
1081
1082 sys.exit(-1)
1083 else:
1084 sys.exit(0)
1085
1086
1087 if __name__ == "__main__":
1088 exec_test()