1 | """ |
2 | vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart | |
3 | ceph instance instead of a packaged/installed cluster. Use this to turn around test cases | |
4 | quickly during development. | |
5 | ||
6 | Simple usage (assuming teuthology and ceph checked out in ~/git): | |
7 | ||
8 | # Activate the teuthology virtualenv | |
9 | source ~/git/teuthology/virtualenv/bin/activate | |
10 | # Go into your ceph build directory | |
11 | cd ~/git/ceph/build | |
12 | # Invoke a test using this script | |
13 | python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan | |
14 | ||
15 | Alternative usage: | |
16 | ||
17 | # Alternatively, if you use different paths, specify them as follows: | |
18 | LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py | |
19 | ||
20 | # If you wish to drop to a python shell on failures, use --interactive: | |
21 | python ~/git/ceph/qa/tasks/vstart_runner.py --interactive | |
22 | ||
23 | # If you wish to run a named test case, pass it as an argument: | |
24 | python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan | |
25 | ||
26 | """ | |
27 | ||
28 | from StringIO import StringIO | |
29 | from collections import defaultdict | |
30 | import getpass | |
31 | import signal | |
32 | import tempfile | |
33 | import threading | |
34 | import datetime | |
35 | import shutil | |
36 | import re | |
37 | import os | |
38 | import time | |
39 | import json | |
40 | import sys | |
41 | import errno | |
42 | from unittest import suite, loader | |
43 | import unittest | |
44 | import platform | |
45 | from teuthology.orchestra.run import Raw, quote | |
46 | from teuthology.orchestra.daemon import DaemonGroup | |
47 | from teuthology.config import config as teuth_config | |
48 | ||
49 | import logging | |
50 | ||
51 | log = logging.getLogger(__name__) | |
52 | ||
53 | handler = logging.FileHandler("./vstart_runner.log") | |
54 | formatter = logging.Formatter( | |
55 | fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s', | |
56 | datefmt='%Y-%m-%dT%H:%M:%S') | |
57 | handler.setFormatter(formatter) | |
58 | log.addHandler(handler) | |
59 | log.setLevel(logging.INFO) | |
60 | ||
61 | ||
62 | def respawn_in_path(lib_path, python_paths): | |
63 | execv_cmd = ['python'] | |
64 | if platform.system() == "Darwin": | |
65 | lib_path_var = "DYLD_LIBRARY_PATH" | |
66 | else: | |
67 | lib_path_var = "LD_LIBRARY_PATH" | |
68 | ||
69 | py_binary = os.environ.get("PYTHON", "python") | |
70 | ||
71 | if lib_path_var in os.environ: | |
72 | if lib_path not in os.environ[lib_path_var]: | |
73 | os.environ[lib_path_var] += ':' + lib_path | |
74 | os.execvp(py_binary, execv_cmd + sys.argv) | |
75 | else: | |
76 | os.environ[lib_path_var] = lib_path | |
77 | os.execvp(py_binary, execv_cmd + sys.argv) | |
78 | ||
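    # If we reach this point, the library path was already present in the
    # environment, i.e. we are the respawned interpreter: just extend
    # sys.path in-process instead of exec'ing again.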
    for p in python_paths:
        sys.path.insert(0, p)


# Let's use some sensible defaults
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):

    # A list of candidate paths for each package we need
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.2"],
        ["../src/pybind"],
    ]

    python_paths = []

    # Up one level so that "tasks.foo.bar" imports work
    python_paths.append(os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
    ))

    for package_guesses in guesses:
        for g in package_guesses:
            g_exp = os.path.abspath(os.path.expanduser(g))
            if os.path.exists(g_exp):
                python_paths.append(g_exp)

    ld_path = os.path.join(os.getcwd(), "lib/")
    print "Using guessed paths {0} {1}".format(ld_path, python_paths)
    respawn_in_path(ld_path, python_paths)


try:
    from teuthology.exceptions import CommandFailedError
    from tasks.ceph_manager import CephManager
    from tasks.cephfs.fuse_mount import FuseMount
    from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
    from mgr.mgr_test_case import MgrCluster
    from teuthology.contextutil import MaxWhileTries
    from teuthology.task import interactive
except ImportError:
    sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
                     "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n")
    raise

# Must import after teuthology because of gevent monkey patching
import subprocess

if os.path.exists("./CMakeCache.txt"):
    # Running in build dir of a cmake build
    BIN_PREFIX = "./bin/"
    SRC_PREFIX = "../src"
else:
    # Running in src/ of an autotools build
    BIN_PREFIX = "./"
    SRC_PREFIX = "./"


class LocalRemoteProcess(object):
    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        if stdout is None:
            self.stdout = StringIO()
        else:
            self.stdout = stdout

        if stderr is None:
            self.stderr = StringIO()
        else:
            self.stderr = stderr

        self.check_status = check_status
        self.exitstatus = self.returncode = None

    def wait(self):
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        self.stdout.write(out)
        self.stderr.write(err)

        self.exitstatus = self.returncode = self.subproc.returncode

        if self.exitstatus != 0:
            sys.stderr.write(out)
            sys.stderr.write(err)

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            out, err = self.subproc.communicate()
            self.stdout.write(out)
            self.stderr.write(err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True
        else:
            return False

    def kill(self):
        log.info("kill")
        if self.subproc.pid and not self.finished:
            log.info("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            safe_kill(self.subproc.pid)
        else:
            log.info("kill: already terminated ({0})".format(self.args))

    @property
    def stdin(self):
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)


class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.name = "local"
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        tmpfile = tempfile.NamedTemporaryFile(delete=False).name
        shutil.copy(path, tmpfile)
        return tmpfile

    def put_file(self, src, dst, sudo=False):
        shutil.copy(src, dst)

    def run(self, args, check_status=True, wait=True,
            stdout=None, stderr=None, cwd=None, stdin=None,
            logger=None, label=None, env=None):
        log.info("run args={0}".format(args))

        # We don't need no stinkin' sudo
        args = [a for a in args if a != "sudo"]

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any([a for a in args if isinstance(a, Raw)])

        if shell:
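            # Strip the teuthology wrapper commands that have no local
            # equivalent; e.g. an arg list like
            #   ['adjust-ulimits', 'ceph-coverage', '<archive>', 'timeout', '120', 'ceph', '-s']
            # is reduced to ['ceph', '-s'] before being quoted for the shell.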
            filtered = []
            i = 0
            while i < len(args):
                if args[i] == 'adjust-ulimits':
                    i += 1
                elif args[i] == 'ceph-coverage':
                    i += 2
                elif args[i] == 'timeout':
                    i += 2
                else:
                    filtered.append(args[i])
                    i += 1

            args = quote(filtered)
            log.info("Running {0}".format(args))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       shell=True)
        else:
            log.info("Running {0}".format(args))

            for arg in args:
                if not isinstance(arg, basestring):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__
                    ))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            if not isinstance(stdin, basestring):
                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")

            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc


class LocalDaemon(object):
    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        self.proc = None

    @property
    def remote(self):
        return LocalRemote()

    def running(self):
        return self._get_pid() is not None

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = self.controller.run(
            args=["ps", "ww", "-u"+str(os.getuid())]
        ).stdout.getvalue().strip()
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                log.info("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])
        log.info("No match for {0} {1}: {2}".format(
            self.daemon_type, self.daemon_id, ps_txt
        ))
        return None

    def wait(self, timeout):
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

    def stop(self, timeout=300):
        if not self.running():
            log.error('tried to stop a non-running daemon')
            return

        pid = self._get_pid()
        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGKILL)

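        # The PID we killed may be replaced by a new process with a matching
        # command line (e.g. if something respawns the daemon), so keep
        # re-checking and killing until no PID is found.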
        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                log.info("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGKILL)

            if new_pid is None:
                break
            else:
                if waited > timeout:
                    raise MaxWhileTries(
                        "Timed out waiting for daemon {0}.{1}".format(
                            self.daemon_type, self.daemon_id))
                time.sleep(1)
                waited += 1

        self.wait(timeout=timeout)

    def restart(self):
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])


def safe_kill(pid):
    """
    os.kill annoyingly raises exception if process already dead.  Ignore it.
    """
    try:
        return os.kill(pid, signal.SIGKILL)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # Raced with process termination
            pass
        else:
            raise


class LocalFuseMount(FuseMount):
    def __init__(self, test_dir, client_id):
        super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        return os.path.abspath("./client.{0}.keyring".format(self.client_id))

    def run_shell(self, args, wait=True):
        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
        # the "cd foo && bar" shenanigans isn't needed to begin with and
        # then we wouldn't have to special case this
        return self.client_remote.run(
            args, wait=wait, cwd=self.mountpoint
        )

    @property
    def _prefix(self):
        return BIN_PREFIX

    def _asok_path(self):
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run foreground.  When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.
        path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
        return path

    def umount(self):
        if self.is_mounted():
            super(LocalFuseMount, self).umount()

    def mount(self, mount_path=None, mount_fs_name=None):
        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        def list_connections():
            self.client_remote.run(
                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                check_status=False
            )
            if p.exitstatus != 0:
                log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
        if os.getuid() != 0:
            prefix += ["--client-die-on-failed-remount=false"]

        if mount_path is not None:
            prefix += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]

        self.fuse_daemon = self.client_remote.run(args=
            prefix + [
                "-f",
                "--name",
                "client.{0}".format(self.client_id),
                self.mountpoint
            ], wait=False)

        log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))

        # Wait for the connection reference to appear in /sys
        waited = 0
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > 30:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def _run_python(self, pyscript):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[
            'python', '-c', pyscript
        ], wait=False)


class LocalCephManager(CephManager):
    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.info(x)

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'
        """
        return LocalRemote()

    def run_ceph_w(self):
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
        return proc

    def raw_cluster_cmd(self, *args):
        """
        args like ["osd", "dump"]
        return stdout string
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
        return proc.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
        return proc.exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
        )

    # FIXME: copypasta
    def get_mds_status(self, mds):
        """
        Run cluster commands for the mds in order to get mds information
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None

    # FIXME: copypasta
    def get_mds_status_by_rank(self, rank):
        """
        Run cluster commands for the mds in order to get mds information
        check rank.
        """
        j = self.get_mds_status_all()
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['rank'] == rank:
                return info
        return None

    def get_mds_status_all(self):
        """
        Run cluster command to extract all the mds status.
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        return j


class LocalCephCluster(CephCluster):
    def __init__(self, ctx):
        # Deliberately skip calling parent constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        return LocalRemote()

    def get_config(self, key, service_type=None):
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        service_id = {
            'mon': 'a',
            'mds': 'a',
            'osd': '0'
        }[service_type]

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
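        # Everything after the banner belongs to us: each call truncates the
        # file back to the banner and regenerates our sections from
        # self._conf, so repeated writes don't pile up duplicates.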
        existing_str = open(conf_path).read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.info("Searching for existing instance {0}/{1}".format(
                    key, subsys
                ))
                existing_section = re.search("^\[{0}\]$([\n]|[^\[])+".format(
                    subsys
                ), existing_str, re.MULTILINE)

                if existing_section:
                    section_str = existing_str[existing_section.start():existing_section.end()]
                    existing_val = re.search("^\s*[^#]({0}) =".format(key), section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.info("Found string to replace at {0}".format(
                            start
                        ))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        open(conf_path, "w").write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        del self._conf[subsys][key]
        self._write_conf()


class LocalMDSCluster(LocalCephCluster, MDSCluster):
    def __init__(self, ctx):
        super(LocalMDSCluster, self).__init__(ctx)

        self.mds_ids = ctx.daemons.daemons['mds'].keys()
        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name):
        return LocalFilesystem(self._ctx, create=name)


class LocalMgrCluster(LocalCephCluster, MgrCluster):
    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
        self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])


class LocalFilesystem(Filesystem, LocalMDSCluster):
    def __init__(self, ctx, fscid=None, create=None):
        # Deliberately skip calling parent constructor
        self._ctx = ctx

        self.id = None
        self.name = None
        self.metadata_pool_name = None
        self.data_pools = None

        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
        self.mds_ids = set()
        for line in open("ceph.conf").readlines():
            match = re.match("^\[mds\.(.+)\]$", line)
            if match:
                self.mds_ids.add(match.group(1))

        if not self.mds_ids:
            raise RuntimeError("No MDSs found in ceph.conf!")

        self.mds_ids = list(self.mds_ids)

        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))

        self.mon_manager = LocalCephManager()

        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if create is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create is True:
                self.name = 'cephfs'
            else:
                self.name = create
            self.create()
        elif fscid is not None:
            self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    @property
    def _prefix(self):
        return BIN_PREFIX

    def set_clients_block(self, blocked, mds_id=None):
        raise NotImplementedError()

    def get_pgs_per_fs_pool(self):
        # FIXME: assuming there are 3 OSDs
        return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))


class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behavior.
    """
    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Failure in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        log.error(self._exc_info_to_string(err, test))
        log.error("Error in test '{0}', going interactive".format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)


def enumerate_methods(s):
    log.info("e: {0}".format(s))
    for t in s._tests:
        if isinstance(t, suite.BaseTestSuite):
            for sub in enumerate_methods(t):
                yield sub
        else:
            yield s, t


def load_tests(modules, loader):
    if modules:
        log.info("Executing modules: {0}".format(modules))
        module_suites = []
        for mod_name in modules:
            # Test names like cephfs.test_auto_repair
            module_suites.append(loader.loadTestsFromName(mod_name))
        log.info("Loaded: {0}".format(list(module_suites)))
        return suite.TestSuite(module_suites)
    else:
        log.info("Executing all cephfs tests")
        return loader.discover(
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "cephfs")
        )


def scan_tests(modules):
    overall_suite = load_tests(modules, loader.TestLoader())

    max_required_mds = 0
    max_required_clients = 0
    max_required_mgr = 0

    for suite_, case in enumerate_methods(overall_suite):
        max_required_mds = max(max_required_mds,
                               getattr(case, "MDSS_REQUIRED", 0))
        max_required_clients = max(max_required_clients,
                                   getattr(case, "CLIENTS_REQUIRED", 0))
        max_required_mgr = max(max_required_mgr,
                               getattr(case, "MGRS_REQUIRED", 0))

    return max_required_mds, max_required_clients, max_required_mgr


class LocalCluster(object):
    def __init__(self, rolename="placeholder"):
        self.remotes = {
            LocalRemote(): [rolename]
        }

    def only(self, requested):
        return self.__class__(rolename=requested)


class LocalContext(object):
    def __init__(self):
        self.config = {}
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()

        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        for conf_line in open("ceph.conf").readlines():
            for svc_type in ["mon", "osd", "mds", "mgr"]:
                if svc_type not in self.daemons.daemons:
                    self.daemons.daemons[svc_type] = {}
                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
                if match:
                    svc_id = match.group(1)
                    self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)

    def __del__(self):
        shutil.rmtree(self.teuthology_config['test_path'])


def exec_test():
    # Parse arguments
    interactive_on_error = False
    create_cluster = False

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            interactive_on_error = True
        elif f == "--create":
            create_cluster = True
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)

    # Help developers by stopping up-front if their tree isn't built enough for all the
    # tools that the tests might want to use (add more here if needed)
    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                        "cephfs-table-tool", "ceph-fuse", "rados"]
    missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
    if missing_binaries:
        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
        sys.exit(-1)

    max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)

    remote = LocalRemote()

    # Tolerate no MDSs or clients running at start
    ps_txt = remote.run(
        args=["ps", "-u"+str(os.getuid())]
    ).stdout.getvalue().strip()
    lines = ps_txt.split("\n")[1:]
    for line in lines:
        if 'ceph-fuse' in line or 'ceph-mds' in line:
            pid = int(line.split()[0])
            log.warn("Killing stray process {0}".format(line))
            os.kill(pid, signal.SIGKILL)

    # Fire up the Ceph cluster if the user requested it
    if create_cluster:
        log.info("Creating cluster with {0} MDS daemons".format(
            max_required_mds))
        remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
        remote.run(["rm", "-rf", "./out"])
        remote.run(["rm", "-rf", "./dev"])
        vstart_env = os.environ.copy()
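        # vstart.sh sizes the cluster from these environment variables;
        # FS=0 skips creating a filesystem up-front, leaving that to the
        # tests themselves.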
        vstart_env["FS"] = "0"
        vstart_env["MDS"] = max_required_mds.__str__()
        vstart_env["OSD"] = "1"
        vstart_env["MGR"] = max(max_required_mgr, 1).__str__()

        remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
                   env=vstart_env)

        # Wait for OSD to come up so that subsequent injectargs etc will
        # definitely succeed
        LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)

    # List of client mounts, sufficient to run the selected tests
    clients = [i.__str__() for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    # Construct Mount classes
    mounts = []
    for client_id in clients:
        # Populate client keyring (it sucks to use client.admin for test clients
        # because it's awkward to find the logs later)
        client_name = "client.{0}".format(client_id)

        if client_name not in open("./keyring").read():
            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                 "osd", "allow rw",
                                 "mds", "allow",
                                 "mon", "allow r"])

            open("./keyring", "a").write(p.stdout.getvalue())

        mount = LocalFuseMount(test_dir, client_id)
        mounts.append(mount)
        if mount.is_mounted():
            log.warn("unmounting {0}".format(mount.mountpoint))
            mount.umount_wait()
        else:
            if os.path.exists(mount.mountpoint):
                os.rmdir(mount.mountpoint)

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)

    from tasks.cephfs_test_runner import DecoratingLoader

    class LogStream(object):
        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    # sys.stderr.write(line + "\n")
                    log.info(line)
                self.buffer = lines[-1]

        def flush(self):
            pass

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "ceph_cluster": ceph_cluster,
        "mds_cluster": mds_cluster,
        "mgr_cluster": mgr_cluster,
    })

    # For the benefit of polling tests like test_full -- in teuthology land we set this
    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
    ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")

    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
    # from normal IO latency.  Increase it for running tests.
    ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")

    # Make sure the filesystem created in tests has uid/gid that will let us talk to
    # it after mounting it (without having to go root).  Set in 'global' not just 'mds'
    # so that cephfs-data-scan will pick it up too.
    ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
    ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())

    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version

    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running.
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
            drop_test = True
            log.warn("Dropping test because long running: {0}".format(method.id()))

        if getattr(fn, "needs_trimming", False) is True:
            drop_test = (os.getuid() != 0)
            log.warn("Dropping test because client trim unavailable: {0}".format(method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = False
            for named in modules:
                if named.endswith(method.id()):
                    is_named = True
                    break

            if not is_named:
                victims.append((case, method))

    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    if interactive_on_error:
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult
    fail_on_skip = False

    class LoggingResult(result_class):
        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            test.started_at = datetime.datetime.utcnow()
            return super(LoggingResult, self).startTest(test)

        def stopTest(self, test):
            log.info("Stopped test: {0} in {1}s".format(
                self.getDescription(test),
                (datetime.datetime.utcnow() - test.started_at).total_seconds()
            ))

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    # Execute!
    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience

        bad_tests = []
        for test, error in result.errors:
            bad_tests.append(str(test))
        for test, failure in result.failures:
            bad_tests.append(str(test))

        sys.exit(-1)
    else:
        sys.exit(0)


if __name__ == "__main__":
    exec_test()