1 | """ |
2 | vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart | |
3 | ceph instance instead of a packaged/installed cluster. Use this to turn around test cases | |
4 | quickly during development. | |
5 | ||
6 | Simple usage (assuming teuthology and ceph checked out in ~/git): | |
7 | ||
8 | # Activate the teuthology virtualenv | |
9 | source ~/git/teuthology/virtualenv/bin/activate | |
10 | # Go into your ceph build directory | |
11 | cd ~/git/ceph/build | |
12 | # Invoke a test using this script | |
13 | python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan | |
14 | ||
15 | Alternative usage: | |
16 | ||
17 | # Alternatively, if you use different paths, specify them as follows: | |
18 | LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py | |
19 | ||
20 | # If you wish to drop to a python shell on failures, use --interactive: | |
21 | python ~/git/ceph/qa/tasks/vstart_runner.py --interactive | |
22 | ||
23 | # If you wish to run a named test case, pass it as an argument: | |
24 | python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan | |
25 | ||
26 | """ | |
27 | ||
from StringIO import StringIO
from collections import defaultdict
import getpass
import signal
import tempfile
import threading
import datetime
import shutil
import re
import os
import time
import json
import sys
import errno
from unittest import suite, loader
import unittest
import platform
from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.config import config as teuth_config

import logging

log = logging.getLogger(__name__)

handler = logging.FileHandler("./vstart_runner.log")
formatter = logging.Formatter(
    fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)


def respawn_in_path(lib_path, python_paths):
    execv_cmd = ['python']
    if platform.system() == "Darwin":
        lib_path_var = "DYLD_LIBRARY_PATH"
    else:
        lib_path_var = "LD_LIBRARY_PATH"

    py_binary = os.environ.get("PYTHON", "python")

    if lib_path_var in os.environ:
        if lib_path not in os.environ[lib_path_var]:
            os.environ[lib_path_var] += ':' + lib_path
            os.execvp(py_binary, execv_cmd + sys.argv)
    else:
        os.environ[lib_path_var] = lib_path
        os.execvp(py_binary, execv_cmd + sys.argv)

    for p in python_paths:
        sys.path.insert(0, p)


# Let's use some sensible defaults
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):

    # A list of candidate paths for each package we need
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.2"],
        ["../src/pybind"],
    ]

    python_paths = []

    # Up one level so that "tasks.foo.bar" imports work
    python_paths.append(os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
    ))

    for package_guesses in guesses:
        for g in package_guesses:
            g_exp = os.path.abspath(os.path.expanduser(g))
            if os.path.exists(g_exp):
                python_paths.append(g_exp)

    ld_path = os.path.join(os.getcwd(), "lib/")
    print "Using guessed paths {0} {1}".format(ld_path, python_paths)
    respawn_in_path(ld_path, python_paths)


try:
    from teuthology.exceptions import CommandFailedError
    from tasks.ceph_manager import CephManager
    from tasks.cephfs.fuse_mount import FuseMount
    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
    from mgr.mgr_test_case import MgrCluster
    from teuthology.contextutil import MaxWhileTries
    from teuthology.task import interactive
except ImportError:
    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
    raise

# Must import after teuthology because of gevent monkey patching
import subprocess

if os.path.exists("./CMakeCache.txt"):
    # Running in build dir of a cmake build
    BIN_PREFIX = "./bin/"
    SRC_PREFIX = "../src"
else:
    # Running in src/ of an autotools build
    BIN_PREFIX = "./"
    SRC_PREFIX = "./"


class LocalRemoteProcess(object):
    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        if stdout is None:
            self.stdout = StringIO()
        else:
            self.stdout = stdout

        if stderr is None:
            self.stderr = StringIO()
        else:
            self.stderr = stderr

        self.check_status = check_status
        self.exitstatus = self.returncode = None

    def wait(self):
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.check_status and self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        self.stdout.write(out)
        self.stderr.write(err)

        self.exitstatus = self.returncode = self.subproc.returncode

        if self.exitstatus != 0:
            sys.stderr.write(out)
            sys.stderr.write(err)

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            out, err = self.subproc.communicate()
            self.stdout.write(out)
            self.stderr.write(err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True
        else:
            return False

    def kill(self):
        log.info("kill")
        if self.subproc.pid and not self.finished:
            log.info("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            safe_kill(self.subproc.pid)
        else:
            log.info("kill: already terminated ({0})".format(self.args))

    @property
    def stdin(self):
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)


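# A rough sketch of how LocalRemoteProcess is consumed (the process object
# comes back from LocalRemote.run below; the argv here is illustrative only):
#
#   proc = LocalRemote().run(args=["ls", "/tmp"], wait=False)
#   while not proc.finished:        # polls the subprocess without blocking
#       time.sleep(1)
#   proc.wait()                     # raises CommandFailedError on rc != 0 when check_status
#   print proc.stdout.getvalue()    # StringIO capture when stdout=None

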
class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart.

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.name = "local"
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
        shutil.copy(path, tmpfile)
        return tmpfile

    def put_file(self, src, dst, sudo=False):
        shutil.copy(src, dst)

    def run(self, args, check_status=True, wait=True,
            stdout=None, stderr=None, cwd=None, stdin=None,
            logger=None, label=None, env=None):
        log.info("run args={0}".format(args))

        # We don't need no stinkin' sudo
        args = [a for a in args if a != "sudo"]

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any([a for a in args if isinstance(a, Raw)])

        if shell:
            filtered = []
            i = 0
            while i < len(args):
                if args[i] == 'adjust-ulimits':
                    i += 1
                elif args[i] == 'ceph-coverage':
                    i += 2
                elif args[i] == 'timeout':
                    i += 2
                else:
                    filtered.append(args[i])
                    i += 1

            args = quote(filtered)
            log.info("Running {0}".format(args))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       shell=True)
        else:
            log.info("Running {0}".format(args))

            for arg in args:
                if not isinstance(arg, basestring):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__
                    ))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            if not isinstance(stdin, basestring):
                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")

            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc

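# Sketch of LocalRemote.run's two dispatch paths: a plain argv list is exec'd
# directly, while any run.Raw token (e.g. "&&") forces shell=True after the
# teuthology-only wrappers (adjust-ulimits etc.) are filtered out.  The
# commands here are illustrative:
#
#   remote = LocalRemote()
#   remote.run(args=["mkdir", "-p", "/tmp/foo"])                    # plain exec
#   remote.run(args=["cd", "/tmp/foo", Raw("&&"), "touch", "bar"])  # shell

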
class LocalDaemon(object):
    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        self.proc = None

    @property
    def remote(self):
        return LocalRemote()

    def running(self):
        return self._get_pid() is not None

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = self.controller.run(
            args=["ps", "ww", "-u"+str(os.getuid())]
        ).stdout.getvalue().strip()
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                log.info("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])
        log.info("No match for {0} {1}: {2}".format(
            self.daemon_type, self.daemon_id, ps_txt
        ))
        return None

    def wait(self, timeout):
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

    def stop(self, timeout=300):
        if not self.running():
            log.error('tried to stop a non-running daemon')
            return

        pid = self._get_pid()
        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGKILL)

        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                log.info("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGKILL)

            if new_pid is None:
                break
            else:
                if waited > timeout:
                    raise MaxWhileTries(
                        "Timed out waiting for daemon {0}.{1}".format(
                            self.daemon_type, self.daemon_id))
                time.sleep(1)
                waited += 1

        self.wait(timeout=timeout)

    def restart(self):
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])

    def signal(self, sig, silent=False):
        if not self.running():
            raise RuntimeError("Can't send signal to non-running daemon")

        os.kill(self._get_pid(), sig)
        if not silent:
            log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))


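# Hypothetical illustration of poking a vstart daemon through LocalDaemon
# (the id must match a section in ceph.conf; "a" is a typical vstart MDS id):
#
#   mds_a = LocalDaemon("mds", "a")
#   if mds_a.running():
#       mds_a.signal(signal.SIGSTOP)   # e.g. freeze it to simulate a hang
#       mds_a.signal(signal.SIGCONT)
#   mds_a.restart()                    # SIGKILLs any survivor, then respawns

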
def safe_kill(pid):
    """
    os.kill annoyingly raises exception if process already dead.  Ignore it.
    """
    try:
        return os.kill(pid, signal.SIGKILL)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # Raced with process termination
            pass
        else:
            raise


class LocalFuseMount(FuseMount):
    def __init__(self, test_dir, client_id):
        super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        return os.path.abspath("./client.{0}.keyring".format(self.client_id))

    def run_shell(self, args, wait=True):
        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
        # the "cd foo && bar" shenanigans isn't needed to begin with and
        # then we wouldn't have to special case this
        return self.client_remote.run(
            args, wait=wait, cwd=self.mountpoint
        )

    @property
    def _prefix(self):
        return BIN_PREFIX

    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run in the foreground.  When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.
        path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
        return path

    def umount(self):
        if self.is_mounted():
            super(LocalFuseMount, self).umount()

    def mount(self, mount_path=None, mount_fs_name=None):
        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        def list_connections():
            self.client_remote.run(
                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                check_status=False
            )
            if p.exitstatus != 0:
                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
        if os.getuid() != 0:
            prefix += ["--client_die_on_failed_dentry_invalidate=false"]

        if mount_path is not None:
            prefix += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]

        self.fuse_daemon = self.client_remote.run(
            args=prefix + [
                "-f",
                "--name",
                "client.{0}".format(self.client_id),
                self.mountpoint
            ], wait=False)

        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))

        # Wait for the connection reference to appear in /sys
        waited = 0
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > 30:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def _run_python(self, pyscript, py_version='python'):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[py_version, '-c', pyscript],
                                      wait=False)


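# How a harness might drive LocalFuseMount end to end (a sketch; the client
# id needs a keyring entry, which exec_test() below arranges):
#
#   mount = LocalFuseMount(test_dir, client_id="0")
#   mount.mount()                         # spawns ceph-fuse, waits on /sys/fs/fuse
#   mount.run_shell(["touch", "hello"])   # runs with cwd=self.mountpoint
#   mount.umount()

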
class LocalCephManager(CephManager):
    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.info(x)

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'
        """
        return LocalRemote()

    def run_ceph_w(self):
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
        return proc

    def raw_cluster_cmd(self, *args):
        """
        args like ["osd", "dump"]
        return stdout string
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
        return proc.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
        return proc.exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
        )

    # FIXME: copypasta
    def get_mds_status(self, mds):
        """
        Run cluster commands for the mds in order to get mds information
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None

    # FIXME: copypasta
    def get_mds_status_by_rank(self, rank):
        """
        Run cluster commands for the mds in order to get mds information
        and check its rank.
        """
        j = self.get_mds_status_all()
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['rank'] == rank:
                return info
        return None

    def get_mds_status_all(self):
        """
        Run cluster command to extract all the mds status.
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        return j


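# Typical LocalCephManager usage, mirroring the teuthology CephManager
# interface (the commands themselves are illustrative):
#
#   manager = LocalCephManager()
#   dump_json = manager.raw_cluster_cmd("osd", "dump", "--format=json")  # stdout as str
#   rc = manager.raw_cluster_cmd_result("fs", "get", "cephfs")           # rc only
#   manager.admin_socket("mds", "a", ["session", "ls"])

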
class LocalCephCluster(CephCluster):
    def __init__(self, ctx):
        # Deliberately skip calling parent constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        return LocalRemote()

    def get_config(self, key, service_type=None):
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        service_id = {
            'mon': 'a',
            'mds': 'a',
            'osd': '0'
        }[service_type]

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
        existing_str = open(conf_path).read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.info("Searching for existing instance {0}/{1}".format(
                    key, subsys
                ))
                existing_section = re.search(r"^\[{0}\]$([\n]|[^\[])+".format(
                    subsys
                ), existing_str, re.MULTILINE)

                if existing_section:
                    section_str = existing_str[existing_section.start():existing_section.end()]
                    existing_val = re.search(r"^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.info("Found string to replace at {0}".format(
                            start
                        ))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        open(conf_path, "w").write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        del self._conf[subsys][key]
        self._write_conf()


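# Sketch of the conf round-trip: everything after the #LOCAL_TEST banner in
# ./ceph.conf is regenerated on each call, and any pre-existing value of the
# same key above the banner gets commented out.  For example,
#
#   LocalCephCluster(LocalContext()).set_ceph_conf("mds", "mds log max segments", "10")
#
# leaves ceph.conf ending with something like:
#
#   #LOCAL_TEST
#   [mds]
#   mds log max segments = 10

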
class LocalMDSCluster(LocalCephCluster, MDSCluster):
    def __init__(self, ctx):
        super(LocalMDSCluster, self).__init__(ctx)

        self.mds_ids = ctx.daemons.daemons['mds'].keys()
        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name='cephfs', create=True):
        return LocalFilesystem(self._ctx, name=name, create=create)


class LocalMgrCluster(LocalCephCluster, MgrCluster):
    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])


class LocalFilesystem(Filesystem, LocalMDSCluster):
    def __init__(self, ctx, fscid=None, name='cephfs', create=False):
        # Deliberately skip calling parent constructor
        self._ctx = ctx

        self.id = None
        self.name = None
        self.ec_profile = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
        self.mds_ids = set()
        for line in open("ceph.conf").readlines():
            match = re.match(r"^\[mds\.(.+)\]$", line)
            if match:
                self.mds_ids.add(match.group(1))

        if not self.mds_ids:
            raise RuntimeError("No MDSs found in ceph.conf!")

        self.mds_ids = list(self.mds_ids)

        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))

        self.mon_manager = LocalCephManager()

        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    @property
    def _prefix(self):
        return BIN_PREFIX

    def set_clients_block(self, blocked, mds_id=None):
        raise NotImplementedError()

    def get_pgs_per_fs_pool(self):
        # FIXME: assuming there are 3 OSDs
        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))


class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behavior.
    """
    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Failure in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Error in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)


def enumerate_methods(s):
    log.info("e: {0}".format(s))
    for t in s._tests:
        if isinstance(t, suite.BaseTestSuite):
            for sub in enumerate_methods(t):
                yield sub
        else:
            yield s, t


def load_tests(modules, loader):
    if modules:
        log.info("Executing modules: {0}".format(modules))
        module_suites = []
        for mod_name in modules:
            # Test names like cephfs.test_auto_repair
            module_suites.append(loader.loadTestsFromName(mod_name))
        log.info("Loaded: {0}".format(list(module_suites)))
        return suite.TestSuite(module_suites)
    else:
        log.info("Executing all cephfs tests")
        return loader.discover(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
        )


def scan_tests(modules):
    overall_suite = load_tests(modules, loader.TestLoader())

    max_required_mds = 0
    max_required_clients = 0
    max_required_mgr = 0

    for suite_, case in enumerate_methods(overall_suite):
        max_required_mds = max(max_required_mds,
                               getattr(case, "MDSS_REQUIRED", 0))
        max_required_clients = max(max_required_clients,
                                   getattr(case, "CLIENTS_REQUIRED", 0))
        max_required_mgr = max(max_required_mgr,
                               getattr(case, "MGRS_REQUIRED", 0))

    return max_required_mds, max_required_clients, max_required_mgr


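# scan_tests() sizes the vstart cluster from class attributes on the test
# cases themselves; a hypothetical case would declare its requirements like:
#
#   class TestExample(CephFSTestCase):
#       MDSS_REQUIRED = 2
#       CLIENTS_REQUIRED = 1
#       MGRS_REQUIRED = 0

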
class LocalCluster(object):
    def __init__(self, rolename="placeholder"):
        self.remotes = {
            LocalRemote(): [rolename]
        }

    def only(self, requested):
        return self.__class__(rolename=requested)


class LocalContext(object):
    def __init__(self):
        self.config = {}
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()

        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        for conf_line in open("ceph.conf").readlines():
            for svc_type in ["mon", "osd", "mds", "mgr"]:
                if svc_type not in self.daemons.daemons:
                    self.daemons.daemons[svc_type] = {}
                match = re.match(r"^\[{0}\.(.+)\]$".format(svc_type), conf_line)
                if match:
                    svc_id = match.group(1)
                    self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)

    def __del__(self):
        shutil.rmtree(self.teuthology_config['test_path'])


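# LocalContext mimics just enough of the teuthology ctx object; for example a
# test (or an --interactive session) can reach a daemon through the same dict
# structure populated above (service ids depend on your ceph.conf):
#
#   ctx = LocalContext()
#   mds_a = ctx.daemons.daemons["mds"]["a"]   # a LocalDaemon instance

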
def exec_test():
    # Parse arguments
    interactive_on_error = False
    create_cluster = False

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            interactive_on_error = True
        elif f == "--create":
            create_cluster = True
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)

    # Help developers by stopping up-front if their tree isn't built enough for all the
    # tools that the tests might want to use (add more here if needed)
    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                        "cephfs-table-tool", "ceph-fuse", "rados"]
    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
    if missing_binaries:
        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
        sys.exit(-1)

    max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)

    remote = LocalRemote()

    # Tolerate no MDSs or clients running at start
    ps_txt = remote.run(
        args=["ps", "-u"+str(os.getuid())]
    ).stdout.getvalue().strip()
    lines = ps_txt.split("\n")[1:]
    for line in lines:
        if 'ceph-fuse' in line or 'ceph-mds' in line:
            pid = int(line.split()[0])
            log.warn("Killing stray process {0}".format(line))
            os.kill(pid, signal.SIGKILL)

    # Fire up the Ceph cluster if the user requested it
    if create_cluster:
        log.info("Creating cluster with {0} MDS daemons".format(
            max_required_mds))
        remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
        remote.run(["rm", "-rf", "./out"])
        remote.run(["rm", "-rf", "./dev"])
        vstart_env = os.environ.copy()
        vstart_env["FS"] = "0"
        vstart_env["MDS"] = str(max_required_mds)
        vstart_env["OSD"] = "1"
        vstart_env["MGR"] = str(max(max_required_mgr, 1))

        remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
                   env=vstart_env)

        # Wait for OSD to come up so that subsequent injectargs etc will
        # definitely succeed
        LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)

    # List of client mounts, sufficient to run the selected tests
    clients = [str(i) for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    # Construct Mount classes
    mounts = []
    for client_id in clients:
        # Populate client keyring (it sucks to use client.admin for test clients
        # because it's awkward to find the logs later)
        client_name = "client.{0}".format(client_id)

        if client_name not in open("./keyring").read():
            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                 "osd", "allow rw",
                                 "mds", "allow",
                                 "mon", "allow r"])

            open("./keyring", "a").write(p.stdout.getvalue())

        mount = LocalFuseMount(test_dir, client_id)
        mounts.append(mount)
        if mount.is_mounted():
            log.warn("unmounting {0}".format(mount.mountpoint))
            mount.umount_wait()
        else:
            if os.path.exists(mount.mountpoint):
                os.rmdir(mount.mountpoint)

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)

    from tasks.cephfs_test_runner import DecoratingLoader

    class LogStream(object):
        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    # sys.stderr.write(line + "\n")
                    log.info(line)
                self.buffer = lines[-1]

        def flush(self):
            pass

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "ceph_cluster": ceph_cluster,
        "mds_cluster": mds_cluster,
        "mgr_cluster": mgr_cluster,
    })

    # For the benefit of polling tests like test_full -- in teuthology land we set this
    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
    ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")

    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
    # from normal IO latency.  Increase it for running tests.
    ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")

    # Make sure the filesystem created in tests has uid/gid that will let us talk to
    # it after mounting it (without having to go root).  Set in 'global' not just 'mds'
    # so that cephfs-data-scan will pick it up too.
    ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
    ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())

    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version

    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running.
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
            drop_test = True
            log.warn("Dropping test because long running: {0}".format(method.id()))

        if getattr(fn, "needs_trimming", False) is True:
            drop_test = (os.getuid() != 0)
            log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = False
            for named in modules:
                if named.endswith(method.id()):
                    is_named = True
                    break

            if not is_named:
                victims.append((case, method))

    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    if interactive_on_error:
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult
    fail_on_skip = False

    class LoggingResult(result_class):
        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            test.started_at = datetime.datetime.utcnow()
            return super(LoggingResult, self).startTest(test)

        def stopTest(self, test):
            log.info("Stopped test: {0} in {1}s".format(
                self.getDescription(test),
                (datetime.datetime.utcnow() - test.started_at).total_seconds()
            ))

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    # Execute!
    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience

        bad_tests = []
        for test, error in result.errors:
            bad_tests.append(str(test))
        for test, failure in result.failures:
            bad_tests.append(str(test))

        sys.exit(-1)
    else:
        sys.exit(0)


if __name__ == "__main__":
    exec_test()