1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
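# A specific test class or method can also be named directly; for example
# (illustrative only -- substitute any test that exists under qa/tasks/cephfs):
python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan.TestDataScan.test_rebuild_simple
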
26 """
27
28 from StringIO import StringIO
29 from collections import defaultdict
30 import getpass
31 import signal
32 import tempfile
33 import threading
34 import datetime
35 import shutil
36 import re
37 import os
38 import time
39 import json
40 import sys
41 import errno
42 from unittest import suite, loader
43 import unittest
44 import platform
45 from teuthology.orchestra.run import Raw, quote
46 from teuthology.orchestra.daemon import DaemonGroup
47 from teuthology.config import config as teuth_config
48
49 import logging
50
51 log = logging.getLogger(__name__)
52
53 handler = logging.FileHandler("./vstart_runner.log")
54 formatter = logging.Formatter(
55 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
56 datefmt='%Y-%m-%dT%H:%M:%S')
57 handler.setFormatter(formatter)
58 log.addHandler(handler)
59 log.setLevel(logging.INFO)
60
61
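# Re-exec the interpreter with the dynamic linker search path pointing at the
# locally built libraries (LD_LIBRARY_PATH, or DYLD_LIBRARY_PATH on macOS),
# presumably because the linker only reads that variable at process start, and
# prepend the guessed python_paths to sys.path so teuthology/qa/pybind imports
# resolve.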
62 def respawn_in_path(lib_path, python_paths):
63 execv_cmd = ['python']
64 if platform.system() == "Darwin":
65 lib_path_var = "DYLD_LIBRARY_PATH"
66 else:
67 lib_path_var = "LD_LIBRARY_PATH"
68
69 py_binary = os.environ.get("PYTHON", "python")
70
71 if lib_path_var in os.environ:
72 if lib_path not in os.environ[lib_path_var]:
73 os.environ[lib_path_var] += ':' + lib_path
74 os.execvp(py_binary, execv_cmd + sys.argv)
75 else:
76 os.environ[lib_path_var] = lib_path
77 os.execvp(py_binary, execv_cmd + sys.argv)
78
79 for p in python_paths:
80 sys.path.insert(0, p)
81
82
83 # Let's use some sensible defaults
84 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
85
86 # A list of candidate paths for each package we need
87 guesses = [
88 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
89 ["lib/cython_modules/lib.2"],
90 ["../src/pybind"],
91 ]
92
93 python_paths = []
94
95 # Up one level so that "tasks.foo.bar" imports work
96 python_paths.append(os.path.abspath(
97 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
98 ))
99
100 for package_guesses in guesses:
101 for g in package_guesses:
102 g_exp = os.path.abspath(os.path.expanduser(g))
103 if os.path.exists(g_exp):
104 python_paths.append(g_exp)
105
106 ld_path = os.path.join(os.getcwd(), "lib/")
107 print "Using guessed paths {0} {1}".format(ld_path, python_paths)
108 respawn_in_path(ld_path, python_paths)
109
110
111 try:
112 from teuthology.exceptions import CommandFailedError
113 from tasks.ceph_manager import CephManager
114 from tasks.cephfs.fuse_mount import FuseMount
115 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
116 from mgr.mgr_test_case import MgrCluster
117 from teuthology.contextutil import MaxWhileTries
118 from teuthology.task import interactive
119 except ImportError:
120 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
121 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
122 raise
123
124 # Must import after teuthology because of gevent monkey patching
125 import subprocess
126
127 if os.path.exists("./CMakeCache.txt"):
128 # Running in build dir of a cmake build
129 BIN_PREFIX = "./bin/"
130 SRC_PREFIX = "../src"
131 else:
132 # Running in src/ of an autotools build
133 BIN_PREFIX = "./"
134 SRC_PREFIX = "./"
135
136
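# Stand-in for teuthology's RemoteProcess: wraps a local subprocess.Popen,
# mirrors the exitstatus/returncode/wait()/finished interface, and buffers
# stdout/stderr into StringIO objects when the caller doesn't supply its own.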
137 class LocalRemoteProcess(object):
138 def __init__(self, args, subproc, check_status, stdout, stderr):
139 self.args = args
140 self.subproc = subproc
141 if stdout is None:
142 self.stdout = StringIO()
143 else:
144 self.stdout = stdout
145
146 if stderr is None:
147 self.stderr = StringIO()
148 else:
149 self.stderr = stderr
150
151 self.check_status = check_status
152 self.exitstatus = self.returncode = None
153
154 def wait(self):
155 if self.finished:
156 # Avoid calling communicate() on a dead process because it'll
157 # give you stick about std* already being closed
158 if self.exitstatus != 0:
159 raise CommandFailedError(self.args, self.exitstatus)
160 else:
161 return
162
163 out, err = self.subproc.communicate()
164 self.stdout.write(out)
165 self.stderr.write(err)
166
167 self.exitstatus = self.returncode = self.subproc.returncode
168
169 if self.exitstatus != 0:
170 sys.stderr.write(out)
171 sys.stderr.write(err)
172
173 if self.check_status and self.exitstatus != 0:
174 raise CommandFailedError(self.args, self.exitstatus)
175
176 @property
177 def finished(self):
178 if self.exitstatus is not None:
179 return True
180
181 if self.subproc.poll() is not None:
182 out, err = self.subproc.communicate()
183 self.stdout.write(out)
184 self.stderr.write(err)
185 self.exitstatus = self.returncode = self.subproc.returncode
186 return True
187 else:
188 return False
189
190 def kill(self):
191 log.info("kill ")
192 if self.subproc.pid and not self.finished:
193 log.info("kill: killing pid {0} ({1})".format(
194 self.subproc.pid, self.args))
195 safe_kill(self.subproc.pid)
196 else:
197 log.info("kill: already terminated ({0})".format(self.args))
198
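# Teuthology stops daemonized processes by closing proc.stdin (its daemon-helper
# wrapper kills the child on stdin EOF). There is no daemon-helper here, so the
# fake stdin below simply kills the local subprocess when closed.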
199 @property
200 def stdin(self):
201 class FakeStdIn(object):
202 def __init__(self, mount_daemon):
203 self.mount_daemon = mount_daemon
204
205 def close(self):
206 self.mount_daemon.kill()
207
208 return FakeStdIn(self)
209
210
211 class LocalRemote(object):
212 """
213 Amusingly named class to present the teuthology RemoteProcess interface when we are really
214 running things locally for vstart
215
216 Run this inside your src/ dir!
217 """
218
219 def __init__(self):
220 self.name = "local"
221 self.hostname = "localhost"
222 self.user = getpass.getuser()
223
224 def get_file(self, path, sudo, dest_dir):
225 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
226 shutil.copy(path, tmpfile)
227 return tmpfile
228
229 def put_file(self, src, dst, sudo=False):
230 shutil.copy(src, dst)
231
232 def run(self, args, check_status=True, wait=True,
233 stdout=None, stderr=None, cwd=None, stdin=None,
234 logger=None, label=None, env=None):
235 log.info("run args={0}".format(args))
236
237 # We don't need no stinkin' sudo
238 args = [a for a in args if a != "sudo"]
239
240 # We have to use shell=True if any run.Raw was present, e.g. &&
241 shell = any([a for a in args if isinstance(a, Raw)])
242
243 if shell:
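# Strip teuthology wrapper commands that don't exist in a vstart environment:
# 'adjust-ulimits' is a bare token, while 'ceph-coverage' and 'timeout' each
# consume one following argument, hence the skips of one and two tokens below.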
244 filtered = []
245 i = 0
246 while i < len(args):
247 if args[i] == 'adjust-ulimits':
248 i += 1
249 elif args[i] == 'ceph-coverage':
250 i += 2
251 elif args[i] == 'timeout':
252 i += 2
253 else:
254 filtered.append(args[i])
255 i += 1
256
257 args = quote(filtered)
258 log.info("Running {0}".format(args))
259
260 subproc = subprocess.Popen(args,
261 stdout=subprocess.PIPE,
262 stderr=subprocess.PIPE,
263 stdin=subprocess.PIPE,
264 cwd=cwd,
265 shell=True)
266 else:
267 log.info("Running {0}".format(args))
268
269 for arg in args:
270 if not isinstance(arg, basestring):
271 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
272 arg, arg.__class__
273 ))
274
275 subproc = subprocess.Popen(args,
276 stdout=subprocess.PIPE,
277 stderr=subprocess.PIPE,
278 stdin=subprocess.PIPE,
279 cwd=cwd,
280 env=env)
281
282 if stdin:
283 if not isinstance(stdin, basestring):
284 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
285
286 # Hack: writing to stdin is not deadlock-safe, but it "always" works
287 # as long as the input buffer is "small"
288 subproc.stdin.write(stdin)
289
290 proc = LocalRemoteProcess(
291 args, subproc, check_status,
292 stdout, stderr
293 )
294
295 if wait:
296 proc.wait()
297
298 return proc
299
300
301 class LocalDaemon(object):
302 def __init__(self, daemon_type, daemon_id):
303 self.daemon_type = daemon_type
304 self.daemon_id = daemon_id
305 self.controller = LocalRemote()
306 self.proc = None
307
308 @property
309 def remote(self):
310 return LocalRemote()
311
312 def running(self):
313 return self._get_pid() is not None
314
315 def _get_pid(self):
316 """
317 Return PID as an integer or None if not found
318 """
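# Scan this user's processes for the "ceph-<type> -i <id>" command line used by
# vstart.sh to launch daemons; the first field of a matching ps line is the PID.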
319 ps_txt = self.controller.run(
320 args=["ps", "ww", "-u"+str(os.getuid())]
321 ).stdout.getvalue().strip()
322 lines = ps_txt.split("\n")[1:]
323
324 for line in lines:
325 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
326 log.info("Found ps line for daemon: {0}".format(line))
327 return int(line.split()[0])
328 log.info("No match for {0} {1}: {2}".format(
329 self.daemon_type, self.daemon_id, ps_txt
330 ))
331 return None
332
333 def wait(self, timeout):
334 waited = 0
335 while self._get_pid() is not None:
336 if waited > timeout:
337 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
338 time.sleep(1)
339 waited += 1
340
341 def stop(self, timeout=300):
342 if not self.running():
343 log.error('tried to stop a non-running daemon')
344 return
345
346 pid = self._get_pid()
347 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
348 os.kill(pid, signal.SIGKILL)
349
350 waited = 0
351 while pid is not None:
352 new_pid = self._get_pid()
353 if new_pid is not None and new_pid != pid:
354 log.info("Killing new PID {0}".format(new_pid))
355 pid = new_pid
356 os.kill(pid, signal.SIGKILL)
357
358 if new_pid is None:
359 break
360 else:
361 if waited > timeout:
362 raise MaxWhileTries(
363 "Timed out waiting for daemon {0}.{1}".format(
364 self.daemon_type, self.daemon_id))
365 time.sleep(1)
366 waited += 1
367
368 self.wait(timeout=timeout)
369
370 def restart(self):
371 if self._get_pid() is not None:
372 self.stop()
373
374 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
375
376 def signal(self, sig, silent=False):
377 if not self.running():
378 raise RuntimeError("Can't send signal to non-running daemon")
379
380 os.kill(self._get_pid(), sig)
381 if not silent:
382 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
383
384
385 def safe_kill(pid):
386 """
387 os.kill annoyingly raises exception if process already dead. Ignore it.
388 """
389 try:
390 return os.kill(pid, signal.SIGKILL)
391 except OSError as e:
392 if e.errno == errno.ESRCH:
393 # Raced with process termination
394 pass
395 else:
396 raise
397
398
399 class LocalFuseMount(FuseMount):
400 def __init__(self, test_dir, client_id):
401 super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
402
403 @property
404 def config_path(self):
405 return "./ceph.conf"
406
407 def get_keyring_path(self):
408 # This is going to end up in a config file, so use an absolute path
409 # to avoid assumptions about daemons' pwd
410 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
411
412 def run_shell(self, args, wait=True):
413 # FIXME: maybe we should add a pwd arg to teuthology.orchestra so that
414 # the "cd foo && bar" shenanigans aren't needed to begin with, and
415 # then we wouldn't have to special-case this
416 return self.client_remote.run(
417 args, wait=wait, cwd=self.mountpoint
418 )
419
420 @property
421 def _prefix(self):
422 return BIN_PREFIX
423
424 def _asok_path(self):
425 # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
426 # run in the foreground. When running daemonized, however, the asok is named after
427 # the PID of the launching process, not the long-running ceph-fuse process. Therefore
428 # we need to give an exact path here, as the logic that checks /proc/ to see which
429 # asok is alive does not work.
430 path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
431 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
432 return path
433
434 def umount(self):
435 if self.is_mounted():
436 super(LocalFuseMount, self).umount()
437
438 def mount(self, mount_path=None, mount_fs_name=None):
439 self.client_remote.run(
440 args=[
441 'mkdir',
442 '--',
443 self.mountpoint,
444 ],
445 )
446
447 def list_connections():
448 self.client_remote.run(
449 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
450 check_status=False
451 )
452 p = self.client_remote.run(
453 args=["ls", "/sys/fs/fuse/connections"],
454 check_status=False
455 )
456 if p.exitstatus != 0:
457 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
458 return []
459
460 ls_str = p.stdout.getvalue().strip()
461 if ls_str:
462 return [int(n) for n in ls_str.split("\n")]
463 else:
464 return []
465
466 # Before starting ceph-fuse process, note the contents of
467 # /sys/fs/fuse/connections
468 pre_mount_conns = list_connections()
469 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
470
471 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
472 if os.getuid() != 0:
473 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
474
475 if mount_path is not None:
476 prefix += ["--client_mountpoint={0}".format(mount_path)]
477
478 if mount_fs_name is not None:
479 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
480
481 self.fuse_daemon = self.client_remote.run(args=
482 prefix + [
483 "-f",
484 "--name",
485 "client.{0}".format(self.client_id),
486 self.mountpoint
487 ], wait=False)
488
489 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
490
491 # Wait for the connection reference to appear in /sys
492 waited = 0
493 post_mount_conns = list_connections()
494 while len(post_mount_conns) <= len(pre_mount_conns):
495 if self.fuse_daemon.finished:
496 # Did mount fail? Raise the CommandFailedError instead of
497 # hitting the "failed to populate /sys/" timeout
498 self.fuse_daemon.wait()
499 time.sleep(1)
500 waited += 1
501 if waited > 30:
502 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
503 waited
504 ))
505 post_mount_conns = list_connections()
506
507 log.info("Post-mount connections: {0}".format(post_mount_conns))
508
509 # Record our fuse connection number so that we can use it when
510 # forcing an unmount
511 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
512 if len(new_conns) == 0:
513 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
514 elif len(new_conns) > 1:
515 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
516 else:
517 self._fuse_conn = new_conns[0]
518
519 def _run_python(self, pyscript, py_version='python'):
520 """
521 Override this to remove the daemon-helper prefix that is used otherwise
522 to make the process killable.
523 """
524 return self.client_remote.run(args=[py_version, '-c', pyscript],
525 wait=False)
526
527 class LocalCephManager(CephManager):
528 def __init__(self):
529 # Deliberately skip parent init, only inheriting from it to get
530 # util methods like osd_dump that sit on top of raw_cluster_cmd
531 self.controller = LocalRemote()
532
533 # A minority of CephManager functions actually bother to take this lock, for
534 # when certain teuthology tests want to run tasks in parallel
535 self.lock = threading.RLock()
536
537 self.log = lambda x: log.info(x)
538
539 def find_remote(self, daemon_type, daemon_id):
540 """
541 daemon_type like 'mds', 'osd'
542 daemon_id like 'a', '0'
543 """
544 return LocalRemote()
545
546 def run_ceph_w(self):
547 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
548 return proc
549
550 def raw_cluster_cmd(self, *args):
551 """
552 args like ["osd", "dump"]
553 return stdout string
554 """
555 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
556 return proc.stdout.getvalue()
557
558 def raw_cluster_cmd_result(self, *args):
559 """
560 like raw_cluster_cmd but don't check status, just return rc
561 """
562 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
563 return proc.exitstatus
564
565 def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
566 return self.controller.run(
567 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
568 )
569
570 # FIXME: copypasta
571 def get_mds_status(self, mds):
572 """
573 Run cluster commands for the mds in order to get mds information
574 """
575 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
576 j = json.loads(' '.join(out.splitlines()[1:]))
577 # collate; for dup ids, larger gid wins.
578 for info in j['info'].itervalues():
579 if info['name'] == mds:
580 return info
581 return None
582
583 # FIXME: copypasta
584 def get_mds_status_by_rank(self, rank):
585 """
586 Run cluster commands for the mds in order to get mds information
587 check rank.
588 """
589 j = self.get_mds_status_all()
590 # collate; for dup ids, larger gid wins.
591 for info in j['info'].itervalues():
592 if info['rank'] == rank:
593 return info
594 return None
595
596 def get_mds_status_all(self):
597 """
598 Run cluster command to extract all the mds status.
599 """
600 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
601 j = json.loads(' '.join(out.splitlines()[1:]))
602 return j
603
604
605 class LocalCephCluster(CephCluster):
606 def __init__(self, ctx):
607 # Deliberately skip calling parent constructor
608 self._ctx = ctx
609 self.mon_manager = LocalCephManager()
610 self._conf = defaultdict(dict)
611
612 @property
613 def admin_remote(self):
614 return LocalRemote()
615
616 def get_config(self, key, service_type=None):
617 if service_type is None:
618 service_type = 'mon'
619
620 # FIXME hardcoded vstart service IDs
621 service_id = {
622 'mon': 'a',
623 'mds': 'a',
624 'osd': '0'
625 }[service_type]
626
627 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
628
629 def _write_conf(self):
630 # In teuthology, we have the honour of writing the entire ceph.conf, but
631 # in vstart land it has mostly already been written and we need to carefully
632 # append to it.
633 conf_path = "./ceph.conf"
634 banner = "\n#LOCAL_TEST\n"
635 existing_str = open(conf_path).read()
636
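# Everything after the #LOCAL_TEST banner is owned by this test run: truncate
# at the banner (if present) and regenerate that tail from self._conf, so
# repeated writes don't pile up duplicate sections.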
637 if banner in existing_str:
638 existing_str = existing_str[0:existing_str.find(banner)]
639
640 existing_str += banner
641
642 for subsys, kvs in self._conf.items():
643 existing_str += "\n[{0}]\n".format(subsys)
644 for key, val in kvs.items():
645 # Comment out existing instance if it exists
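# e.g. an existing "mds log max segments = 2" line under [mds] gets a '#'
# inserted in front of the key, and the new value is appended in the
# regenerated section below the banner. (Illustrative key; any key held in
# self._conf is handled the same way.)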
646 log.info("Searching for existing instance {0}/{1}".format(
647 key, subsys
648 ))
649 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
650 subsys
651 ), existing_str, re.MULTILINE)
652
653 if existing_section:
654 section_str = existing_str[existing_section.start():existing_section.end()]
655 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
656 if existing_val:
657 start = existing_section.start() + existing_val.start(1)
658 log.info("Found string to replace at {0}".format(
659 start
660 ))
661 existing_str = existing_str[0:start] + "#" + existing_str[start:]
662
663 existing_str += "{0} = {1}\n".format(key, val)
664
665 open(conf_path, "w").write(existing_str)
666
667 def set_ceph_conf(self, subsys, key, value):
668 self._conf[subsys][key] = value
669 self._write_conf()
670
671 def clear_ceph_conf(self, subsys, key):
672 del self._conf[subsys][key]
673 self._write_conf()
674
675
676 class LocalMDSCluster(LocalCephCluster, MDSCluster):
677 def __init__(self, ctx):
678 super(LocalMDSCluster, self).__init__(ctx)
679
680 self.mds_ids = ctx.daemons.daemons['mds'].keys()
681 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
682
683 def clear_firewall(self):
684 # FIXME: unimplemented
685 pass
686
687 def newfs(self, name='cephfs', create=True):
688 return LocalFilesystem(self._ctx, name=name, create=create)
689
690
691 class LocalMgrCluster(LocalCephCluster, MgrCluster):
692 def __init__(self, ctx):
693 super(LocalMgrCluster, self).__init__(ctx)
694
695 self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
696 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
697
698
699 class LocalFilesystem(Filesystem, LocalMDSCluster):
700 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
701 # Deliberately skip calling parent constructor
702 self._ctx = ctx
703
704 self.id = None
705 self.name = None
706 self.ec_profile = None
707 self.metadata_pool_name = None
708 self.metadata_overlay = False
709 self.data_pool_name = None
710 self.data_pools = None
711
712 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
713 self.mds_ids = set()
714 for line in open("ceph.conf").readlines():
715 match = re.match("^\[mds\.(.+)\]$", line)
716 if match:
717 self.mds_ids.add(match.group(1))
718
719 if not self.mds_ids:
720 raise RuntimeError("No MDSs found in ceph.conf!")
721
722 self.mds_ids = list(self.mds_ids)
723
724 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
725
726 self.mon_manager = LocalCephManager()
727
728 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
729
730 self.client_remote = LocalRemote()
731
732 self._conf = defaultdict(dict)
733
734 if name is not None:
735 if fscid is not None:
736 raise RuntimeError("cannot specify fscid when creating fs")
737 if create and not self.legacy_configured():
738 self.create()
739 else:
740 if fscid is not None:
741 self.id = fscid
742 self.getinfo(refresh=True)
743
744 # Stash a reference to the first created filesystem on ctx, so
745 # that if someone drops to the interactive shell they can easily
746 # poke our methods.
747 if not hasattr(self._ctx, "filesystem"):
748 self._ctx.filesystem = self
749
750 @property
751 def _prefix(self):
752 return BIN_PREFIX
753
754 def set_clients_block(self, blocked, mds_id=None):
755 raise NotImplementedError()
756
757 def get_pgs_per_fs_pool(self):
758 # FIXME: assuming there are 3 OSDs
759 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
760
761
762 class InteractiveFailureResult(unittest.TextTestResult):
763 """
764 Specialization that implements interactive-on-error style
765 behavior.
766 """
767 def addFailure(self, test, err):
768 super(InteractiveFailureResult, self).addFailure(test, err)
769 log.error(self._exc_info_to_string(err, test))
770 log.error("Failure in test '{0}', going interactive".format(
771 self.getDescription(test)
772 ))
773 interactive.task(ctx=None, config=None)
774
775 def addError(self, test, err):
776 super(InteractiveFailureResult, self).addError(test, err)
777 log.error(self._exc_info_to_string(err, test))
778 log.error("Error in test '{0}', going interactive".format(
779 self.getDescription(test)
780 ))
781 interactive.task(ctx=None, config=None)
782
783
784 def enumerate_methods(s):
785 log.info("e: {0}".format(s))
786 for t in s._tests:
787 if isinstance(t, suite.BaseTestSuite):
788 for sub in enumerate_methods(t):
789 yield sub
790 else:
791 yield s, t
792
793
794 def load_tests(modules, loader):
795 if modules:
796 log.info("Executing modules: {0}".format(modules))
797 module_suites = []
798 for mod_name in modules:
799 # Test names like cephfs.test_auto_repair
800 module_suites.append(loader.loadTestsFromName(mod_name))
801 log.info("Loaded: {0}".format(list(module_suites)))
802 return suite.TestSuite(module_suites)
803 else:
804 log.info("Executing all cephfs tests")
805 return loader.discover(
806 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
807 )
808
809
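# Walk the loaded suite and take the maximum MDSS_REQUIRED / CLIENTS_REQUIRED /
# MGRS_REQUIRED declared by any test case; exec_test() uses these to size the
# vstart cluster and the number of client mounts it creates.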
810 def scan_tests(modules):
811 overall_suite = load_tests(modules, loader.TestLoader())
812
813 max_required_mds = 0
814 max_required_clients = 0
815 max_required_mgr = 0
816
817 for suite, case in enumerate_methods(overall_suite):
818 max_required_mds = max(max_required_mds,
819 getattr(case, "MDSS_REQUIRED", 0))
820 max_required_clients = max(max_required_clients,
821 getattr(case, "CLIENTS_REQUIRED", 0))
822 max_required_mgr = max(max_required_mgr,
823 getattr(case, "MGRS_REQUIRED", 0))
824
825 return max_required_mds, max_required_clients, max_required_mgr
826
827
828 class LocalCluster(object):
829 def __init__(self, rolename="placeholder"):
830 self.remotes = {
831 LocalRemote(): [rolename]
832 }
833
834 def only(self, requested):
835 return self.__class__(rolename=requested)
836
837
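# Minimal stand-in for the teuthology ctx object: carries the config, a fake
# single-remote cluster, and a DaemonGroup populated by inspecting ceph.conf,
# so tests that reach into ctx.daemons or ctx.cluster keep working.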
838 class LocalContext(object):
839 def __init__(self):
840 self.config = {}
841 self.teuthology_config = teuth_config
842 self.cluster = LocalCluster()
843 self.daemons = DaemonGroup()
844
845 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
846 # tests that want to look these up via ctx can do so.
847 # Inspect ceph.conf to see what roles exist
848 for conf_line in open("ceph.conf").readlines():
849 for svc_type in ["mon", "osd", "mds", "mgr"]:
850 if svc_type not in self.daemons.daemons:
851 self.daemons.daemons[svc_type] = {}
852 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
853 if match:
854 svc_id = match.group(1)
855 self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
856
857 def __del__(self):
858 shutil.rmtree(self.teuthology_config['test_path'])
859
860
861 def exec_test():
862 # Parse arguments
863 interactive_on_error = False
864 create_cluster = False
865
866 args = sys.argv[1:]
867 flags = [a for a in args if a.startswith("-")]
868 modules = [a for a in args if not a.startswith("-")]
869 for f in flags:
870 if f == "--interactive":
871 interactive_on_error = True
872 elif f == "--create":
873 create_cluster = True
874 else:
875 log.error("Unknown option '{0}'".format(f))
876 sys.exit(-1)
877
878 # Help developers by stopping up-front if their tree isn't built enough for all the
879 # tools that the tests might want to use (add more here if needed)
880 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
881 "cephfs-table-tool", "ceph-fuse", "rados"]
882 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
883 if missing_binaries:
884 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
885 sys.exit(-1)
886
887 max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
888
889 remote = LocalRemote()
890
891 # Tests assume no MDS or client processes are running at start, so kill any strays
892 ps_txt = remote.run(
893 args=["ps", "-u"+str(os.getuid())]
894 ).stdout.getvalue().strip()
895 lines = ps_txt.split("\n")[1:]
896 for line in lines:
897 if 'ceph-fuse' in line or 'ceph-mds' in line:
898 pid = int(line.split()[0])
899 log.warn("Killing stray process {0}".format(line))
900 os.kill(pid, signal.SIGKILL)
901
902 # Fire up the Ceph cluster if the user requested it
903 if create_cluster:
904 log.info("Creating cluster with {0} MDS daemons".format(
905 max_required_mds))
906 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
907 remote.run(["rm", "-rf", "./out"])
908 remote.run(["rm", "-rf", "./dev"])
909 vstart_env = os.environ.copy()
910 vstart_env["FS"] = "0"
911 vstart_env["MDS"] = max_required_mds.__str__()
912 vstart_env["OSD"] = "1"
913 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
914
915 remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
916 env=vstart_env)
917
918 # Wait for OSD to come up so that subsequent injectargs etc will
919 # definitely succeed
920 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
921
922 # List of client mounts, sufficient to run the selected tests
923 clients = [i.__str__() for i in range(0, max_required_clients)]
924
925 test_dir = tempfile.mkdtemp()
926 teuth_config['test_path'] = test_dir
927
928 # Construct Mount classes
929 mounts = []
930 for client_id in clients:
931 # Populate client keyring (it sucks to use client.admin for test clients
932 # because it's awkward to find the logs later)
933 client_name = "client.{0}".format(client_id)
934
935 if client_name not in open("./keyring").read():
936 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
937 "osd", "allow rw",
938 "mds", "allow",
939 "mon", "allow r"])
940
941 open("./keyring", "a").write(p.stdout.getvalue())
942
943 mount = LocalFuseMount(test_dir, client_id)
944 mounts.append(mount)
945 if mount.is_mounted():
946 log.warn("unmounting {0}".format(mount.mountpoint))
947 mount.umount_wait()
948 else:
949 if os.path.exists(mount.mountpoint):
950 os.rmdir(mount.mountpoint)
951
952 ctx = LocalContext()
953 ceph_cluster = LocalCephCluster(ctx)
954 mds_cluster = LocalMDSCluster(ctx)
955 mgr_cluster = LocalMgrCluster(ctx)
956
957 from tasks.cephfs_test_runner import DecoratingLoader
958
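# File-like object handed to TextTestRunner as its output stream: buffers
# partial writes and emits complete lines to the vstart_runner log instead of
# the console.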
959 class LogStream(object):
960 def __init__(self):
961 self.buffer = ""
962
963 def write(self, data):
964 self.buffer += data
965 if "\n" in self.buffer:
966 lines = self.buffer.split("\n")
967 for line in lines[:-1]:
968 pass
969 # sys.stderr.write(line + "\n")
970 log.info(line)
971 self.buffer = lines[-1]
972
973 def flush(self):
974 pass
975
976 decorating_loader = DecoratingLoader({
977 "ctx": ctx,
978 "mounts": mounts,
979 "ceph_cluster": ceph_cluster,
980 "mds_cluster": mds_cluster,
981 "mgr_cluster": mgr_cluster,
982 })
983
984 # For the benefit of polling tests like test_full -- in teuthology land we set this
985 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
986 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
987 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
988
989 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
990 # from normal IO latency. Increase it for running tests.
991 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
992
993 # Make sure the filesystem created in tests has uid/gid that will let us talk to
994 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
995 # so that cephfs-data-scan will pick it up too.
996 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
997 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
998
999 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1000 def _get_package_version(remote, pkg_name):
1001 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1002 return "2.9"
1003
1004 import teuthology.packaging
1005 teuthology.packaging.get_package_version = _get_package_version
1006
1007 overall_suite = load_tests(modules, decorating_loader)
1008
1009 # Filter out tests that don't lend themselves to interactive running.
1010 victims = []
1011 for case, method in enumerate_methods(overall_suite):
1012 fn = getattr(method, method._testMethodName)
1013
1014 drop_test = False
1015
1016 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1017 drop_test = True
1018 log.warn("Dropping test because long running: {0}".format(method.id()))
1019
1020 if getattr(fn, "needs_trimming", False) is True:
1021 drop_test = (os.getuid() != 0)
1022 log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))
1023
1024 if drop_test:
1025 # Don't drop the test if it was explicitly requested in arguments
1026 is_named = False
1027 for named in modules:
1028 if named.endswith(method.id()):
1029 is_named = True
1030 break
1031
1032 if not is_named:
1033 victims.append((case, method))
1034
1035 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1036 for s, method in victims:
1037 s._tests.remove(method)
1038
1039 if interactive_on_error:
1040 result_class = InteractiveFailureResult
1041 else:
1042 result_class = unittest.TextTestResult
1043 fail_on_skip = False
1044
1045 class LoggingResult(result_class):
1046 def startTest(self, test):
1047 log.info("Starting test: {0}".format(self.getDescription(test)))
1048 test.started_at = datetime.datetime.utcnow()
1049 return super(LoggingResult, self).startTest(test)
1050
1051 def stopTest(self, test):
1052 log.info("Stopped test: {0} in {1}s".format(
1053 self.getDescription(test),
1054 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1055 ))
1056
1057 def addSkip(self, test, reason):
1058 if fail_on_skip:
1059 # Don't just call addFailure because that requires a traceback
1060 self.failures.append((test, reason))
1061 else:
1062 super(LoggingResult, self).addSkip(test, reason)
1063
1064 # Execute!
1065 result = unittest.TextTestRunner(
1066 stream=LogStream(),
1067 resultclass=LoggingResult,
1068 verbosity=2,
1069 failfast=True).run(overall_suite)
1070
1071 if not result.wasSuccessful():
1072 result.printErrors() # duplicate output at end for convenience
1073
1074 bad_tests = []
1075 for test, error in result.errors:
1076 bad_tests.append(str(test))
1077 for test, failure in result.failures:
1078 bad_tests.append(str(test))
1079
1080 sys.exit(-1)
1081 else:
1082 sys.exit(0)
1083
1084
1085 if __name__ == "__main__":
1086 exec_test()