1 """
2 vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart
3 ceph instance instead of a packaged/installed cluster. Use this to turn around test cases
4 quickly during development.
5
6 Simple usage (assuming teuthology and ceph checked out in ~/git):
7
8 # Activate the teuthology virtualenv
9 source ~/git/teuthology/virtualenv/bin/activate
10 # Go into your ceph build directory
11 cd ~/git/ceph/build
12 # Invoke a test using this script
13 python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan
14
15 Alternative usage:
16
17 # Alternatively, if you use different paths, specify them as follows:
18 LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
19
20 # If you wish to drop to a python shell on failures, use --interactive:
21 python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
22
23 # If you wish to run a named test case, pass it as an argument:
24 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
25
26 # Also, you can create the cluster once and then run named test cases against it:
27 python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
28 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
29 python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
30
31 """
32
33 from StringIO import StringIO
34 from collections import defaultdict
35 import getpass
36 import signal
37 import tempfile
38 import threading
39 import datetime
40 import shutil
41 import re
42 import os
43 import time
44 import json
45 import sys
46 import errno
47 from unittest import suite, loader
48 import unittest
49 import platform
50 from teuthology.orchestra.run import Raw, quote
51 from teuthology.orchestra.daemon import DaemonGroup
52 from teuthology.config import config as teuth_config
53
54 import logging
55
56 log = logging.getLogger(__name__)
57
58 handler = logging.FileHandler("./vstart_runner.log")
59 formatter = logging.Formatter(
60 fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
61 datefmt='%Y-%m-%dT%H:%M:%S')
62 handler.setFormatter(formatter)
63 log.addHandler(handler)
64 log.setLevel(logging.INFO)
65
66
67 def respawn_in_path(lib_path, python_paths):
68 execv_cmd = ['python']
69 if platform.system() == "Darwin":
70 lib_path_var = "DYLD_LIBRARY_PATH"
71 else:
72 lib_path_var = "LD_LIBRARY_PATH"
73
74 py_binary = os.environ.get("PYTHON", "python")
75
76 if lib_path_var in os.environ:
77 if lib_path not in os.environ[lib_path_var]:
78 os.environ[lib_path_var] += ':' + lib_path
79 os.execvp(py_binary, execv_cmd + sys.argv)
80 else:
81 os.environ[lib_path_var] = lib_path
82 os.execvp(py_binary, execv_cmd + sys.argv)
83
84 for p in python_paths:
85 sys.path.insert(0, p)
86
87
88 # Let's use some sensible defaults
89 if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
90
91 # A list of candidate paths for each package we need
92 guesses = [
93 ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
94 ["lib/cython_modules/lib.2"],
95 ["../src/pybind"],
96 ]
97
98 python_paths = []
99
100 # Up one level so that "tasks.foo.bar" imports work
101 python_paths.append(os.path.abspath(
102 os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
103 ))
104
105 for package_guesses in guesses:
106 for g in package_guesses:
107 g_exp = os.path.abspath(os.path.expanduser(g))
108 if os.path.exists(g_exp):
109 python_paths.append(g_exp)
110
111 ld_path = os.path.join(os.getcwd(), "lib/")
112 print("Using guessed paths {0} {1}".format(ld_path, python_paths))
113 respawn_in_path(ld_path, python_paths)
114
115
116 try:
117 from teuthology.exceptions import CommandFailedError
118 from tasks.ceph_manager import CephManager
119 from tasks.cephfs.fuse_mount import FuseMount
120 from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
121 from mgr.mgr_test_case import MgrCluster
122 from teuthology.contextutil import MaxWhileTries
123 from teuthology.task import interactive
124 except ImportError:
125 sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
126 "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
127 raise
128
129 # Must import after teuthology because of gevent monkey patching
130 import subprocess
131
132 if os.path.exists("./CMakeCache.txt"):
133 # Running in build dir of a cmake build
134 BIN_PREFIX = "./bin/"
135 SRC_PREFIX = "../src"
136 else:
137 # Running in src/ of an autotools build
138 BIN_PREFIX = "./"
139 SRC_PREFIX = "./"
140
141
142 class LocalRemoteProcess(object):
143 def __init__(self, args, subproc, check_status, stdout, stderr):
144 self.args = args
145 self.subproc = subproc
146 if stdout is None:
147 self.stdout = StringIO()
148 else:
149 self.stdout = stdout
150
151 if stderr is None:
152 self.stderr = StringIO()
153 else:
154 self.stderr = stderr
155
156 self.check_status = check_status
157 self.exitstatus = self.returncode = None
158
159 def wait(self):
160 if self.finished:
161 # Avoid calling communicate() on a dead process because it'll
162 # give you stick about std* already being closed
163 if self.check_status and self.exitstatus != 0:
164 raise CommandFailedError(self.args, self.exitstatus)
165 else:
166 return
167
168 out, err = self.subproc.communicate()
169 self.stdout.write(out)
170 self.stderr.write(err)
171
172 self.exitstatus = self.returncode = self.subproc.returncode
173
174 if self.exitstatus != 0:
175 sys.stderr.write(out)
176 sys.stderr.write(err)
177
178 if self.check_status and self.exitstatus != 0:
179 raise CommandFailedError(self.args, self.exitstatus)
180
181 @property
182 def finished(self):
183 if self.exitstatus is not None:
184 return True
185
186 if self.subproc.poll() is not None:
187 out, err = self.subproc.communicate()
188 self.stdout.write(out)
189 self.stderr.write(err)
190 self.exitstatus = self.returncode = self.subproc.returncode
191 return True
192 else:
193 return False
194
195 def kill(self):
196 log.info("kill ")
197 if self.subproc.pid and not self.finished:
198 log.info("kill: killing pid {0} ({1})".format(
199 self.subproc.pid, self.args))
200 safe_kill(self.subproc.pid)
201 else:
202 log.info("kill: already terminated ({0})".format(self.args))
203
204 @property
205 def stdin(self):
206 class FakeStdIn(object):
207 def __init__(self, mount_daemon):
208 self.mount_daemon = mount_daemon
209
210 def close(self):
211 self.mount_daemon.kill()
212
213 return FakeStdIn(self)
214
215
216 class LocalRemote(object):
217 """
218 Amusingly named class to present the teuthology RemoteProcess interface when we are really
219 running things locally for vstart
220
221 Run this inside your src/ dir!
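
    Illustrative usage sketch (assumes a vstart cluster is running in the
    current build dir; this mirrors how the rest of this file drives the
    class):

        remote = LocalRemote()
        out = remote.run(args=["ceph", "status"]).stdout.getvalue()

    Bare binary names such as "ceph" are resolved against BIN_PREFIX by run().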
222 """
223
224 def __init__(self):
225 self.name = "local"
226 self.hostname = "localhost"
227 self.user = getpass.getuser()
228
229 def get_file(self, path, sudo, dest_dir):
230 tmpfile = tempfile.NamedTemporaryFile(delete=False).name
231 shutil.copy(path, tmpfile)
232 return tmpfile
233
234 def put_file(self, src, dst, sudo=False):
235 shutil.copy(src, dst)
236
237 def run(self, args, check_status=True, wait=True,
238 stdout=None, stderr=None, cwd=None, stdin=None,
239 logger=None, label=None, env=None, timeout=None):
240
241 # We don't need no stinkin' sudo
242 args = [a for a in args if a != "sudo"]
243
244 # We have to use shell=True if any run.Raw was present, e.g. &&
245 shell = any([a for a in args if isinstance(a, Raw)])
246
247 # Filter out helper tools that don't exist in a vstart environment
248 args = [a for a in args if a not in {
249 'adjust-ulimits', 'ceph-coverage', 'timeout'}]
250
251 # Adjust binary path prefix if given a bare program name
252 if "/" not in args[0]:
253 # If they asked for a bare binary name, and it exists
254 # in our built tree, use the one there.
255 local_bin = os.path.join(BIN_PREFIX, args[0])
256 if os.path.exists(local_bin):
257 args = [local_bin] + args[1:]
258 else:
259 log.debug("'{0}' is not a binary in the Ceph build dir".format(
260 args[0]
261 ))
262
263 log.info("Running {0}".format(args))
264
265 if shell:
266 subproc = subprocess.Popen(quote(args),
267 stdout=subprocess.PIPE,
268 stderr=subprocess.PIPE,
269 stdin=subprocess.PIPE,
270 cwd=cwd,
271 shell=True)
272 else:
273 # Sanity check that we've got a list of strings
274 for arg in args:
275 if not isinstance(arg, basestring):
276 raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
277 arg, arg.__class__
278 ))
279
280 subproc = subprocess.Popen(args,
281 stdout=subprocess.PIPE,
282 stderr=subprocess.PIPE,
283 stdin=subprocess.PIPE,
284 cwd=cwd,
285 env=env)
286
287 if stdin:
288 if not isinstance(stdin, basestring):
289 raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
290
291 # Hack: writing to stdin is not deadlock-safe, but it "always" works
292 # as long as the input buffer is "small"
293 subproc.stdin.write(stdin)
294
295 proc = LocalRemoteProcess(
296 args, subproc, check_status,
297 stdout, stderr
298 )
299
300 if wait:
301 proc.wait()
302
303 return proc
304
305
306 class LocalDaemon(object):
307 def __init__(self, daemon_type, daemon_id):
308 self.daemon_type = daemon_type
309 self.daemon_id = daemon_id
310 self.controller = LocalRemote()
311 self.proc = None
312
313 @property
314 def remote(self):
315 return LocalRemote()
316
317 def running(self):
318 return self._get_pid() is not None
319
320 def check_status(self):
321 if self.proc:
322 return self.proc.poll()
323
324 def _get_pid(self):
325 """
326 Return PID as an integer or None if not found
327 """
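        # Illustrative: for daemon_type "mds" and daemon_id "a" we look for a
        # ps line containing "ceph-mds -i a" and take the PID from the first
        # whitespace-separated column.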
328 ps_txt = self.controller.run(
329 args=["ps", "ww", "-u"+str(os.getuid())]
330 ).stdout.getvalue().strip()
331 lines = ps_txt.split("\n")[1:]
332
333 for line in lines:
334 if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
335 log.info("Found ps line for daemon: {0}".format(line))
336 return int(line.split()[0])
337 log.info("No match for {0} {1}: {2}".format(
338 self.daemon_type, self.daemon_id, ps_txt
339 ))
340 return None
341
342 def wait(self, timeout):
343 waited = 0
344 while self._get_pid() is not None:
345 if waited > timeout:
346 raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
347 time.sleep(1)
348 waited += 1
349
350 def stop(self, timeout=300):
351 if not self.running():
352 log.error('tried to stop a non-running daemon')
353 return
354
355 pid = self._get_pid()
356 log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
357 os.kill(pid, signal.SIGTERM)
358
359 waited = 0
360 while pid is not None:
361 new_pid = self._get_pid()
362 if new_pid is not None and new_pid != pid:
363 log.info("Killing new PID {0}".format(new_pid))
364 pid = new_pid
365 os.kill(pid, signal.SIGTERM)
366
367 if new_pid is None:
368 break
369 else:
370 if waited > timeout:
371 raise MaxWhileTries(
372 "Timed out waiting for daemon {0}.{1}".format(
373 self.daemon_type, self.daemon_id))
374 time.sleep(1)
375 waited += 1
376
377 self.wait(timeout=timeout)
378
379 def restart(self):
380 if self._get_pid() is not None:
381 self.stop()
382
383 self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
384
385 def signal(self, sig, silent=False):
386 if not self.running():
387 raise RuntimeError("Can't send signal to non-running daemon")
388
389 os.kill(self._get_pid(), sig)
390 if not silent:
391 log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
392
393
394 def safe_kill(pid):
395 """
396 os.kill annoyingly raises exception if process already dead. Ignore it.
397 """
398 try:
399 return os.kill(pid, signal.SIGKILL)
400 except OSError as e:
401 if e.errno == errno.ESRCH:
402 # Raced with process termination
403 pass
404 else:
405 raise
406
407
408 class LocalFuseMount(FuseMount):
409 def __init__(self, ctx, test_dir, client_id):
410 super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
411
412 @property
413 def config_path(self):
414 return "./ceph.conf"
415
416 def get_keyring_path(self):
417 # This is going to end up in a config file, so use an absolute path
418 # to avoid assumptions about daemons' pwd
419 return os.path.abspath("./client.{0}.keyring".format(self.client_id))
420
421 def run_shell(self, args, wait=True):
422         # FIXME: maybe we should add a pwd arg to teuthology.orchestra so that
423         # the "cd foo && bar" shenanigans aren't needed to begin with and
424         # we wouldn't have to special-case this
425 return self.client_remote.run(
426 args, wait=wait, cwd=self.mountpoint
427 )
428
429 def setupfs(self, name=None):
430 if name is None and self.fs is not None:
431 # Previous mount existed, reuse the old name
432 name = self.fs.name
433 self.fs = LocalFilesystem(self.ctx, name=name)
434 log.info('Wait for MDS to reach steady state...')
435 self.fs.wait_for_daemons()
436 log.info('Ready to start {}...'.format(type(self).__name__))
437
438 @property
439 def _prefix(self):
440 return BIN_PREFIX
441
442 def _asok_path(self):
443         # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
444         # run in the foreground. When it is daemonized, however, the asok is named after
445         # the PID of the launching process, not the long-running ceph-fuse process. Therefore
446         # we need to give an exact path here, as the logic that checks /proc/ for which
447         # asok is alive does not work.
448
449 # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
450 # in a tmpdir. All of the paths are the same, so no need to select
451 # based off of the service type.
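        # Illustrative (hypothetical values): with the default "./out" dir, a
        # client_id of "0" and a launching PID of 12345, the result is
        # "./out/client.0.12345.asok".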
452 d = "./out"
453 with open(self.config_path) as f:
454 for line in f:
455 asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
456 if asok_conf:
457 d = asok_conf.groups(1)[0]
458 break
459 path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
460 log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
461 return path
462
463 def umount(self):
464 if self.is_mounted():
465 super(LocalFuseMount, self).umount()
466
467 def mount(self, mount_path=None, mount_fs_name=None):
468 self.setupfs(name=mount_fs_name)
469
470 self.client_remote.run(
471 args=[
472 'mkdir',
473 '--',
474 self.mountpoint,
475 ],
476 )
477
478 def list_connections():
479 self.client_remote.run(
480 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
481 check_status=False
482 )
483 p = self.client_remote.run(
484 args=["ls", "/sys/fs/fuse/connections"],
485 check_status=False
486 )
487 if p.exitstatus != 0:
488 log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
489 return []
490
491 ls_str = p.stdout.getvalue().strip()
492 if ls_str:
493 return [int(n) for n in ls_str.split("\n")]
494 else:
495 return []
496
497 # Before starting ceph-fuse process, note the contents of
498 # /sys/fs/fuse/connections
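        # Illustrative (hypothetical numbers): if pre_mount_conns is [38] and
        # post_mount_conns becomes [38, 39], then 39 is the new connection and
        # is stashed in self._fuse_conn for use when forcing an unmount.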
499 pre_mount_conns = list_connections()
500 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
501
502 prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
503 if os.getuid() != 0:
504 prefix += ["--client_die_on_failed_dentry_invalidate=false"]
505
506 if mount_path is not None:
507 prefix += ["--client_mountpoint={0}".format(mount_path)]
508
509 if mount_fs_name is not None:
510 prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
511
512 self.fuse_daemon = self.client_remote.run(args=
513 prefix + [
514 "-f",
515 "--name",
516 "client.{0}".format(self.client_id),
517 self.mountpoint
518 ], wait=False)
519
520 log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
521
522 # Wait for the connection reference to appear in /sys
523 waited = 0
524 post_mount_conns = list_connections()
525 while len(post_mount_conns) <= len(pre_mount_conns):
526 if self.fuse_daemon.finished:
527 # Did mount fail? Raise the CommandFailedError instead of
528 # hitting the "failed to populate /sys/" timeout
529 self.fuse_daemon.wait()
530 time.sleep(1)
531 waited += 1
532 if waited > 30:
533 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
534 waited
535 ))
536 post_mount_conns = list_connections()
537
538 log.info("Post-mount connections: {0}".format(post_mount_conns))
539
540 # Record our fuse connection number so that we can use it when
541 # forcing an unmount
542 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
543 if len(new_conns) == 0:
544 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
545 elif len(new_conns) > 1:
546 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
547 else:
548 self._fuse_conn = new_conns[0]
549
550 self.gather_mount_info()
551
552 def _run_python(self, pyscript, py_version='python'):
553 """
554 Override this to remove the daemon-helper prefix that is used otherwise
555 to make the process killable.
556 """
557 return self.client_remote.run(args=[py_version, '-c', pyscript],
558 wait=False)
559
560 class LocalCephManager(CephManager):
561 def __init__(self):
562 # Deliberately skip parent init, only inheriting from it to get
563 # util methods like osd_dump that sit on top of raw_cluster_cmd
564 self.controller = LocalRemote()
565
566         # A minority of CephManager fns actually bother taking this lock, for the
567         # cases where certain teuthology tests want to run tasks in parallel
568 self.lock = threading.RLock()
569
570 self.log = lambda x: log.info(x)
571
572 # Don't bother constructing a map of pools: it should be empty
573 # at test cluster start, and in any case it would be out of date
574 # in no time. The attribute needs to exist for some of the CephManager
575 # methods to work though.
576 self.pools = {}
577
578 def find_remote(self, daemon_type, daemon_id):
579 """
580 daemon_type like 'mds', 'osd'
581 daemon_id like 'a', '0'
582 """
583 return LocalRemote()
584
585 def run_ceph_w(self, watch_channel=None):
586 """
587 :param watch_channel: Specifies the channel to be watched.
588 This can be 'cluster', 'audit', ...
589 :type watch_channel: str
590 """
591 args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
592 if watch_channel is not None:
593 args.append("--watch-channel")
594 args.append(watch_channel)
595 proc = self.controller.run(args, wait=False, stdout=StringIO())
596 return proc
597
598 def raw_cluster_cmd(self, *args, **kwargs):
599 """
600         args like ["osd", "dump"]
601 return stdout string
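
        Illustrative example (assumes a running vstart cluster; "ceph" is
        resolved under BIN_PREFIX by LocalRemote.run):
            raw_cluster_cmd("osd", "dump", "--format=json")
        returns the OSD map dump as a JSON string.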
602 """
603 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
604 return proc.stdout.getvalue()
605
606 def raw_cluster_cmd_result(self, *args, **kwargs):
607 """
608 like raw_cluster_cmd but don't check status, just return rc
609 """
610 kwargs['check_status'] = False
611 proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
612 return proc.exitstatus
613
614 def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
615 return self.controller.run(
616 args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
617 check_status=check_status,
618 timeout=timeout
619 )
620
621
622 class LocalCephCluster(CephCluster):
623 def __init__(self, ctx):
624 # Deliberately skip calling parent constructor
625 self._ctx = ctx
626 self.mon_manager = LocalCephManager()
627 self._conf = defaultdict(dict)
628
629 @property
630 def admin_remote(self):
631 return LocalRemote()
632
633 def get_config(self, key, service_type=None):
634 if service_type is None:
635 service_type = 'mon'
636
637 # FIXME hardcoded vstart service IDs
638 service_id = {
639 'mon': 'a',
640 'mds': 'a',
641 'osd': '0'
642 }[service_type]
643
644 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
645
646 def _write_conf(self):
647 # In teuthology, we have the honour of writing the entire ceph.conf, but
648 # in vstart land it has mostly already been written and we need to carefully
649 # append to it.
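        # Illustrative (using a setting applied later in this file): after
        # set_ceph_conf("mds", "mds log max segments", "10"), the appended text
        # looks roughly like:
        #   #LOCAL_TEST
        #   [mds]
        #   mds log max segments = 10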
650 conf_path = "./ceph.conf"
651 banner = "\n#LOCAL_TEST\n"
652 existing_str = open(conf_path).read()
653
654 if banner in existing_str:
655 existing_str = existing_str[0:existing_str.find(banner)]
656
657 existing_str += banner
658
659 for subsys, kvs in self._conf.items():
660 existing_str += "\n[{0}]\n".format(subsys)
661 for key, val in kvs.items():
662 # Comment out existing instance if it exists
663 log.info("Searching for existing instance {0}/{1}".format(
664 key, subsys
665 ))
666 existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
667 subsys
668 ), existing_str, re.MULTILINE)
669
670 if existing_section:
671 section_str = existing_str[existing_section.start():existing_section.end()]
672 existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
673 if existing_val:
674 start = existing_section.start() + existing_val.start(1)
675 log.info("Found string to replace at {0}".format(
676 start
677 ))
678 existing_str = existing_str[0:start] + "#" + existing_str[start:]
679
680 existing_str += "{0} = {1}\n".format(key, val)
681
682 open(conf_path, "w").write(existing_str)
683
684 def set_ceph_conf(self, subsys, key, value):
685 self._conf[subsys][key] = value
686 self._write_conf()
687
688 def clear_ceph_conf(self, subsys, key):
689 del self._conf[subsys][key]
690 self._write_conf()
691
692
693 class LocalMDSCluster(LocalCephCluster, MDSCluster):
694 def __init__(self, ctx):
695 super(LocalMDSCluster, self).__init__(ctx)
696
697 self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
698 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
699
700 def clear_firewall(self):
701 # FIXME: unimplemented
702 pass
703
704 def newfs(self, name='cephfs', create=True):
705 return LocalFilesystem(self._ctx, name=name, create=create)
706
707
708 class LocalMgrCluster(LocalCephCluster, MgrCluster):
709 def __init__(self, ctx):
710 super(LocalMgrCluster, self).__init__(ctx)
711
712 self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
713 self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
714
715
716 class LocalFilesystem(Filesystem, LocalMDSCluster):
717 def __init__(self, ctx, fscid=None, name='cephfs', create=False):
718 # Deliberately skip calling parent constructor
719 self._ctx = ctx
720
721 self.id = None
722 self.name = None
723 self.ec_profile = None
724 self.metadata_pool_name = None
725 self.metadata_overlay = False
726 self.data_pool_name = None
727 self.data_pools = None
728
729 # Hack: cheeky inspection of ceph.conf to see what MDSs exist
730 self.mds_ids = set()
731 for line in open("ceph.conf").readlines():
732 match = re.match("^\[mds\.(.+)\]$", line)
733 if match:
734 self.mds_ids.add(match.group(1))
735
736 if not self.mds_ids:
737 raise RuntimeError("No MDSs found in ceph.conf!")
738
739 self.mds_ids = list(self.mds_ids)
740
741 log.info("Discovered MDS IDs: {0}".format(self.mds_ids))
742
743 self.mon_manager = LocalCephManager()
744
745 self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
746
747 self.client_remote = LocalRemote()
748
749 self._conf = defaultdict(dict)
750
751 if name is not None:
752 if fscid is not None:
753 raise RuntimeError("cannot specify fscid when creating fs")
754 if create and not self.legacy_configured():
755 self.create()
756 else:
757 if fscid is not None:
758 self.id = fscid
759 self.getinfo(refresh=True)
760
761 # Stash a reference to the first created filesystem on ctx, so
762 # that if someone drops to the interactive shell they can easily
763 # poke our methods.
764 if not hasattr(self._ctx, "filesystem"):
765 self._ctx.filesystem = self
766
767 @property
768 def _prefix(self):
769 return BIN_PREFIX
770
771 def set_clients_block(self, blocked, mds_id=None):
772 raise NotImplementedError()
773
774 def get_pgs_per_fs_pool(self):
775 # FIXME: assuming there are 3 OSDs
776 return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
777
778
779 class InteractiveFailureResult(unittest.TextTestResult):
780 """
781 Specialization that implements interactive-on-error style
782 behavior.
783 """
784 def addFailure(self, test, err):
785 super(InteractiveFailureResult, self).addFailure(test, err)
786 log.error(self._exc_info_to_string(err, test))
787 log.error("Failure in test '{0}', going interactive".format(
788 self.getDescription(test)
789 ))
790 interactive.task(ctx=None, config=None)
791
792 def addError(self, test, err):
793 super(InteractiveFailureResult, self).addError(test, err)
794 log.error(self._exc_info_to_string(err, test))
795 log.error("Error in test '{0}', going interactive".format(
796 self.getDescription(test)
797 ))
798 interactive.task(ctx=None, config=None)
799
800
801 def enumerate_methods(s):
802 log.info("e: {0}".format(s))
803 for t in s._tests:
804 if isinstance(t, suite.BaseTestSuite):
805 for sub in enumerate_methods(t):
806 yield sub
807 else:
808 yield s, t
809
810
811 def load_tests(modules, loader):
812 if modules:
813 log.info("Executing modules: {0}".format(modules))
814 module_suites = []
815 for mod_name in modules:
816 # Test names like cephfs.test_auto_repair
817 module_suites.append(loader.loadTestsFromName(mod_name))
818 log.info("Loaded: {0}".format(list(module_suites)))
819 return suite.TestSuite(module_suites)
820 else:
821 log.info("Executing all cephfs tests")
822 return loader.discover(
823 os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
824 )
825
826
827 def scan_tests(modules):
828 overall_suite = load_tests(modules, loader.TestLoader())
829
830 max_required_mds = 0
831 max_required_clients = 0
832 max_required_mgr = 0
833 require_memstore = False
834
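    # Illustrative sketch (hypothetical test case) of the class attributes
    # that are read via getattr() below:
    #   class TestSomething(CephFSTestCase):
    #       MDSS_REQUIRED = 2
    #       CLIENTS_REQUIRED = 1
    #       MGRS_REQUIRED = 1
    #       REQUIRE_MEMSTORE = False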
835 for suite, case in enumerate_methods(overall_suite):
836 max_required_mds = max(max_required_mds,
837 getattr(case, "MDSS_REQUIRED", 0))
838 max_required_clients = max(max_required_clients,
839 getattr(case, "CLIENTS_REQUIRED", 0))
840 max_required_mgr = max(max_required_mgr,
841 getattr(case, "MGRS_REQUIRED", 0))
842 require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
843 or require_memstore
844
845 return max_required_mds, max_required_clients, \
846 max_required_mgr, require_memstore
847
848
849 class LocalCluster(object):
850 def __init__(self, rolename="placeholder"):
851 self.remotes = {
852 LocalRemote(): [rolename]
853 }
854
855 def only(self, requested):
856 return self.__class__(rolename=requested)
857
858
859 class LocalContext(object):
860 def __init__(self):
861 self.config = {}
862 self.teuthology_config = teuth_config
863 self.cluster = LocalCluster()
864 self.daemons = DaemonGroup()
865
866 # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
867 # tests that want to look these up via ctx can do so.
868 # Inspect ceph.conf to see what roles exist
869 for conf_line in open("ceph.conf").readlines():
870 for svc_type in ["mon", "osd", "mds", "mgr"]:
871 prefixed_type = "ceph." + svc_type
872 if prefixed_type not in self.daemons.daemons:
873 self.daemons.daemons[prefixed_type] = {}
874 match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
875 if match:
876 svc_id = match.group(1)
877 self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
878
879 def __del__(self):
880 shutil.rmtree(self.teuthology_config['test_path'])
881
882 def exec_test():
883 # Parse arguments
884 interactive_on_error = False
885 create_cluster = False
886 create_cluster_only = False
887 ignore_missing_binaries = False
888
889 args = sys.argv[1:]
890 flags = [a for a in args if a.startswith("-")]
891 modules = [a for a in args if not a.startswith("-")]
892 for f in flags:
893 if f == "--interactive":
894 interactive_on_error = True
895 elif f == "--create":
896 create_cluster = True
897 elif f == "--create-cluster-only":
898 create_cluster_only = True
899 elif f == "--ignore-missing-binaries":
900 ignore_missing_binaries = True
901 else:
902 log.error("Unknown option '{0}'".format(f))
903 sys.exit(-1)
904
905 # Help developers by stopping up-front if their tree isn't built enough for all the
906 # tools that the tests might want to use (add more here if needed)
907 require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
908 "cephfs-table-tool", "ceph-fuse", "rados"]
909 missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
910 if missing_binaries and not ignore_missing_binaries:
911 log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
912 sys.exit(-1)
913
914 max_required_mds, max_required_clients, \
915 max_required_mgr, require_memstore = scan_tests(modules)
916
917 remote = LocalRemote()
918
919 # Tolerate no MDSs or clients running at start
920 ps_txt = remote.run(
921 args=["ps", "-u"+str(os.getuid())]
922 ).stdout.getvalue().strip()
923 lines = ps_txt.split("\n")[1:]
924 for line in lines:
925 if 'ceph-fuse' in line or 'ceph-mds' in line:
926 pid = int(line.split()[0])
927 log.warn("Killing stray process {0}".format(line))
928 os.kill(pid, signal.SIGKILL)
929
930 # Fire up the Ceph cluster if the user requested it
931 if create_cluster or create_cluster_only:
932 log.info("Creating cluster with {0} MDS daemons".format(
933 max_required_mds))
934 remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
935 remote.run(["rm", "-rf", "./out"])
936 remote.run(["rm", "-rf", "./dev"])
937 vstart_env = os.environ.copy()
938 vstart_env["FS"] = "0"
939 vstart_env["MDS"] = max_required_mds.__str__()
940 vstart_env["OSD"] = "4"
941 vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
942
943 args = [os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d",
944 "--nolockdep"]
945 if require_memstore:
946 args.append("--memstore")
947
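        # Illustrative resulting invocation (counts depend on the selected
        # tests), roughly:
        #   FS=0 MDS=<n> OSD=4 MGR=<n> ../src/vstart.sh -n -d --nolockdep [--memstore]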
948 remote.run(args, env=vstart_env)
949
950 # Wait for OSD to come up so that subsequent injectargs etc will
951 # definitely succeed
952 LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
953
954 if create_cluster_only:
955 return
956
957 # List of client mounts, sufficient to run the selected tests
958 clients = [i.__str__() for i in range(0, max_required_clients)]
959
960 test_dir = tempfile.mkdtemp()
961 teuth_config['test_path'] = test_dir
962
963 ctx = LocalContext()
964 ceph_cluster = LocalCephCluster(ctx)
965 mds_cluster = LocalMDSCluster(ctx)
966 mgr_cluster = LocalMgrCluster(ctx)
967
968 # Construct Mount classes
969 mounts = []
970 for client_id in clients:
971 # Populate client keyring (it sucks to use client.admin for test clients
972 # because it's awkward to find the logs later)
973 client_name = "client.{0}".format(client_id)
974
975 if client_name not in open("./keyring").read():
976 p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
977 "osd", "allow rw",
978 "mds", "allow",
979 "mon", "allow r"])
980
981 open("./keyring", "a").write(p.stdout.getvalue())
982
983 mount = LocalFuseMount(ctx, test_dir, client_id)
984 mounts.append(mount)
985 if mount.is_mounted():
986 log.warn("unmounting {0}".format(mount.mountpoint))
987 mount.umount_wait()
988 else:
989 if os.path.exists(mount.mountpoint):
990 os.rmdir(mount.mountpoint)
991
992 from tasks.cephfs_test_runner import DecoratingLoader
993
994 class LogStream(object):
995 def __init__(self):
996 self.buffer = ""
997
998 def write(self, data):
999 self.buffer += data
1000 if "\n" in self.buffer:
1001 lines = self.buffer.split("\n")
1002 for line in lines[:-1]:
1003 pass
1004 # sys.stderr.write(line + "\n")
1005 log.info(line)
1006 self.buffer = lines[-1]
1007
1008 def flush(self):
1009 pass
1010
1011 decorating_loader = DecoratingLoader({
1012 "ctx": ctx,
1013 "mounts": mounts,
1014 "ceph_cluster": ceph_cluster,
1015 "mds_cluster": mds_cluster,
1016 "mgr_cluster": mgr_cluster,
1017 })
1018
1019 # For the benefit of polling tests like test_full -- in teuthology land we set this
1020 # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
1021 remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
1022 ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
1023
1024 # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
1025     # from normal IO latency. Increase it for running tests.
1026 ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")
1027
1028 # Make sure the filesystem created in tests has uid/gid that will let us talk to
1029 # it after mounting it (without having to go root). Set in 'global' not just 'mds'
1030 # so that cephfs-data-scan will pick it up too.
1031 ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
1032 ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())
1033
1034 # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
1035 def _get_package_version(remote, pkg_name):
1036 # Used in cephfs tests to find fuse version. Your development workstation *does* have >=2.9, right?
1037 return "2.9"
1038
1039 import teuthology.packaging
1040 teuthology.packaging.get_package_version = _get_package_version
1041
1042 overall_suite = load_tests(modules, decorating_loader)
1043
1044     # Filter out tests that don't lend themselves to interactive running.
1045 victims = []
1046 for case, method in enumerate_methods(overall_suite):
1047 fn = getattr(method, method._testMethodName)
1048
1049 drop_test = False
1050
1051 if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
1052 drop_test = True
1053             log.warn("Dropping test because long running: {0}".format(method.id()))
1054
1055 if getattr(fn, "needs_trimming", False) is True:
1056 drop_test = (os.getuid() != 0)
1057             log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))
1058
1059 if drop_test:
1060 # Don't drop the test if it was explicitly requested in arguments
1061 is_named = False
1062 for named in modules:
1063 if named.endswith(method.id()):
1064 is_named = True
1065 break
1066
1067 if not is_named:
1068 victims.append((case, method))
1069
1070 log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
1071 for s, method in victims:
1072 s._tests.remove(method)
1073
1074 if interactive_on_error:
1075 result_class = InteractiveFailureResult
1076 else:
1077 result_class = unittest.TextTestResult
1078 fail_on_skip = False
1079
1080 class LoggingResult(result_class):
1081 def startTest(self, test):
1082 log.info("Starting test: {0}".format(self.getDescription(test)))
1083 test.started_at = datetime.datetime.utcnow()
1084 return super(LoggingResult, self).startTest(test)
1085
1086 def stopTest(self, test):
1087 log.info("Stopped test: {0} in {1}s".format(
1088 self.getDescription(test),
1089 (datetime.datetime.utcnow() - test.started_at).total_seconds()
1090 ))
1091
1092 def addSkip(self, test, reason):
1093 if fail_on_skip:
1094 # Don't just call addFailure because that requires a traceback
1095 self.failures.append((test, reason))
1096 else:
1097 super(LoggingResult, self).addSkip(test, reason)
1098
1099 # Execute!
1100 result = unittest.TextTestRunner(
1101 stream=LogStream(),
1102 resultclass=LoggingResult,
1103 verbosity=2,
1104 failfast=True).run(overall_suite)
1105
1106 if not result.wasSuccessful():
1107 result.printErrors() # duplicate output at end for convenience
1108
1109 bad_tests = []
1110 for test, error in result.errors:
1111 bad_tests.append(str(test))
1112 for test, failure in result.failures:
1113 bad_tests.append(str(test))
1114
1115 sys.exit(-1)
1116 else:
1117 sys.exit(0)
1118
1119
1120 if __name__ == "__main__":
1121 exec_test()