]>
Commit | Line | Data |
---|---|---|
1 | """ | |
2 | vstart_runner: override Filesystem and Mount interfaces to run a CephFSTestCase against a vstart | |
3 | ceph instance instead of a packaged/installed cluster. Use this to turn around test cases | |
4 | quickly during development. | |
5 | ||
6 | Simple usage (assuming teuthology and ceph checked out in ~/git): | |
7 | ||
8 | # Activate the teuthology virtualenv | |
9 | source ~/git/teuthology/virtualenv/bin/activate | |
10 | # Go into your ceph build directory | |
11 | cd ~/git/ceph/build | |
12 | # Invoke a test using this script | |
13 | python ~/git/ceph/qa/tasks/vstart_runner.py --create tasks.cephfs.test_data_scan | |
14 | ||
15 | Alternative usage: | |
16 | ||
17 | # Alternatively, if you use different paths, specify them as follows: | |
18 | LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py | |
19 | ||
20 | # If you wish to drop to a python shell on failures, use --interactive: | |
21 | python ~/git/ceph/qa/tasks/vstart_runner.py --interactive | |
22 | ||
23 | # If you wish to run a named test case, pass it as an argument: | |
24 | python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan | |
25 | ||
26 | # Also, you can create the cluster once and then run named test cases against it: | |
27 | python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only | |
28 | python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health | |
29 | python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw | |
30 | ||
31 | """ | |
32 | ||
33 | from six import StringIO | |
34 | from io import BytesIO | |
35 | from collections import defaultdict | |
36 | import getpass | |
37 | import signal | |
38 | import tempfile | |
39 | import threading | |
40 | import datetime | |
41 | import shutil | |
42 | import re | |
43 | import os | |
44 | import time | |
45 | import sys | |
46 | import errno | |
47 | from unittest import suite, loader | |
48 | import unittest | |
49 | import platform | |
50 | from teuthology import misc | |
51 | from teuthology.orchestra.run import Raw, quote | |
52 | from teuthology.orchestra.daemon import DaemonGroup | |
53 | from teuthology.config import config as teuth_config | |
54 | import six | |
55 | import logging | |
56 | ||
def init_log(log_level=logging.INFO):
    """
    (Re)create the module-level logger and attach a file handler to it.

    Sets two globals: ``log``, the logger named after this module, and
    ``logpath``, the path of the log file (relative to the current working
    directory).

    The function may be called repeatedly. ``logging.getLogger()`` returns the
    same logger object for the same name, so a handler added by an earlier
    call is still attached; it is removed and closed here before a fresh one
    is added. Without that, every call would add one more handler and each
    message would be emitted once per handler.

    :param log_level: level applied to the logger; defaults to logging.INFO.
    """
    global log
    global logpath

    log = logging.getLogger(__name__)
    logpath = './vstart_runner.log'

    # FileHandler stores the absolute path in ``baseFilename``; only handlers
    # pointing at our own log file are dropped, anything else attached to the
    # logger is left alone.
    target = os.path.abspath(logpath)
    for old_handler in list(log.handlers):
        if getattr(old_handler, 'baseFilename', None) == target:
            log.removeHandler(old_handler)
            old_handler.close()

    handler = logging.FileHandler(logpath)
    formatter = logging.Formatter(
        fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
        datefmt='%Y-%m-%dT%H:%M:%S')
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(log_level)


# The logger must exist before anything below tries to use it.
log = None
init_log()
76 | ||
77 | ||
def respawn_in_path(lib_path, python_paths):
    """
    Make sure the dynamic-library search path contains *lib_path*, re-executing
    the interpreter if the environment had to be changed.

    The environment variable used is DYLD_LIBRARY_PATH on Darwin and
    LD_LIBRARY_PATH everywhere else. If the variable is missing, or does not
    contain *lib_path*, it is updated and the current process is replaced via
    os.execvp() (which does not return). Otherwise every entry of
    *python_paths* is pushed onto the front of sys.path.

    :param lib_path: directory to add to the library search path.
    :param python_paths: iterable of directories to prepend to sys.path.
    """
    on_darwin = platform.system() == "Darwin"
    env_key = "DYLD_LIBRARY_PATH" if on_darwin else "LD_LIBRARY_PATH"

    # $PYTHON overrides the interpreter used for the respawn
    interpreter = os.environ.get("PYTHON", sys.executable)

    current_value = os.environ.get(env_key)
    if current_value is None:
        os.environ[env_key] = lib_path
        os.execvp(interpreter, ['python'] + sys.argv)
    elif lib_path not in current_value:
        os.environ[env_key] = current_value + ':' + lib_path
        os.execvp(interpreter, ['python'] + sys.argv)

    # Only reached when no respawn was necessary
    for entry in python_paths:
        sys.path.insert(0, entry)
97 | ||
98 | ||
99 | # Let's use some sensible defaults | |
# When launched from what looks like a cmake build directory (it has both a
# CMakeCache.txt and a bin/ subdirectory), guess the library and python paths
# instead of requiring the caller to export them.
if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):

    # One inner list per package we need; each holds candidate locations
    guesses = [
        ["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
        ["lib/cython_modules/lib.3"],
        ["../src/pybind"],
    ]

    # Start with the parent of this script's directory so that
    # "tasks.foo.bar" imports work
    python_paths = [
        os.path.abspath(
            os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
        )
    ]

    # Keep every candidate that actually exists on disk, in listed order
    for package_guesses in guesses:
        expanded = [os.path.abspath(os.path.expanduser(g))
                    for g in package_guesses]
        python_paths.extend(g_exp for g_exp in expanded
                            if os.path.exists(g_exp))

    ld_path = os.path.join(os.getcwd(), "lib/")
    print("Using guessed paths {0} {1}".format(ld_path, python_paths))
    respawn_in_path(ld_path, python_paths)
125 | ||
126 | ||
127 | try: | |
128 | from teuthology.exceptions import CommandFailedError | |
129 | from tasks.ceph_manager import CephManager | |
130 | from tasks.cephfs.fuse_mount import FuseMount | |
131 | from tasks.cephfs.kernel_mount import KernelMount | |
132 | from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster | |
133 | from tasks.mgr.mgr_test_case import MgrCluster | |
134 | from teuthology.contextutil import MaxWhileTries | |
135 | from teuthology.task import interactive | |
136 | except ImportError: | |
137 | sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv " | |
138 | "and set PYTHONPATH to point to teuthology and ceph-qa-suite?\n***\n\n") | |
139 | raise | |
140 | ||
141 | # Must import after teuthology because of gevent monkey patching | |
142 | import subprocess | |
143 | ||
# Where to find binaries and sources depends on the tree we were started in:
# a CMakeCache.txt in the current directory marks the build dir of a cmake
# build, otherwise we assume src/ of an autotools build.
BIN_PREFIX, SRC_PREFIX = (
    ("./bin/", "../src") if os.path.exists("./CMakeCache.txt")
    else ("./", "./")
)
152 | ||
153 | ||
def rm_nonascii_chars(var):
    """
    Replace typographic single quotes in *var* with a plain ASCII apostrophe.

    Works on both byte strings and text. On Python 3 the ``bytes.replace``
    method only accepts bytes arguments, so byte input (what
    ``Popen.communicate()`` returns for binary pipes) needs bytes patterns;
    passing str patterns there raises TypeError.

    :param var: bytes or text to clean up.
    :returns: an object of the same type as *var* with the quotes replaced.
    """
    if isinstance(var, bytes):
        # UTF-8 encodings of U+2018 and U+2019
        for fancy_quote in (b'\xe2\x80\x98', b'\xe2\x80\x99'):
            var = var.replace(fancy_quote, b"'")
        return var

    # Text input: handle the real characters as well as the three-character
    # sequences that appear when the UTF-8 bytes were decoded one byte per
    # character.
    for fancy_quote in (u'\xe2\x80\x98', u'\xe2\x80\x99',
                        u'\u2018', u'\u2019'):
        var = var.replace(fancy_quote, u"'")
    return var
158 | ||
class LocalRemoteProcess(object):
    """
    Wrapper around a local ``subprocess.Popen`` object that collects the
    child's output into file-like buffers and records its exit status.

    :param args: the command that was started (kept for messages/errors).
    :param subproc: the ``subprocess.Popen`` instance to wrap.
    :param check_status: when true, wait() raises CommandFailedError on a
                         non-zero exit status.
    :param stdout: file-like object receiving the child's stdout; a BytesIO
                   is created when nothing (or a falsy value) is passed.
    :param stderr: same as *stdout*, for the child's stderr.
    """

    def __init__(self, args, subproc, check_status, stdout, stderr):
        self.args = args
        self.subproc = subproc
        self.stdout = stdout or BytesIO()
        self.stderr = stderr or BytesIO()

        self.check_status = check_status
        # Both stay None until the child has been reaped
        self.exitstatus = self.returncode = None

    @staticmethod
    def _write_output(stream, data):
        """
        Write *data* to *stream*, converting between bytes and text when the
        stream rejects the type it is given.

        The pipes hand back bytes while callers may supply a text buffer (or
        the other way round); io.StringIO and io.BytesIO raise TypeError on
        the wrong type, before anything has been written.
        """
        if not data:
            return
        try:
            stream.write(data)
        except TypeError:
            if isinstance(data, bytes):
                stream.write(data.decode('utf-8', 'replace'))
            else:
                stream.write(data.encode('utf-8'))

    def wait(self):
        """
        Block until the child has exited and its output has been collected.

        :raises CommandFailedError: if check_status is set and the exit
                                    status is non-zero.
        """
        if self.finished:
            # Avoid calling communicate() on a dead process because it'll
            # give you stick about std* already being closed
            if self.check_status and self.exitstatus != 0:
                raise CommandFailedError(self.args, self.exitstatus)
            else:
                return

        out, err = self.subproc.communicate()
        out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
        self._write_output(self.stdout, out)
        self._write_output(self.stderr, err)

        self.exitstatus = self.returncode = self.subproc.returncode

        # Echo the output of failed commands so the failure is visible
        if self.exitstatus != 0:
            sys.stderr.write(six.ensure_str(out))
            sys.stderr.write(six.ensure_str(err))

        if self.check_status and self.exitstatus != 0:
            raise CommandFailedError(self.args, self.exitstatus)

    @property
    def finished(self):
        """
        True once the child has exited. The first time that is noticed here,
        the remaining output is collected and the exit status recorded.
        """
        if self.exitstatus is not None:
            return True

        if self.subproc.poll() is not None:
            # NOTE(review): unlike wait(), output collected here is not
            # passed through rm_nonascii_chars() - confirm whether that
            # difference is intended.
            out, err = self.subproc.communicate()
            self._write_output(self.stdout, out)
            self._write_output(self.stderr, err)
            self.exitstatus = self.returncode = self.subproc.returncode
            return True
        else:
            return False

    def kill(self):
        """
        Kill the child via safe_kill() unless it has already terminated.
        """
        log.info("kill ")
        if self.subproc.pid and not self.finished:
            log.info("kill: killing pid {0} ({1})".format(
                self.subproc.pid, self.args))
            safe_kill(self.subproc.pid)
        else:
            log.info("kill: already terminated ({0})".format(self.args))

    @property
    def stdin(self):
        """
        Object whose close() kills this process, for callers that stop a
        command by closing its stdin.
        """
        class FakeStdIn(object):
            def __init__(self, mount_daemon):
                self.mount_daemon = mount_daemon

            def close(self):
                self.mount_daemon.kill()

        return FakeStdIn(self)
225 | ||
226 | ||
class LocalRemote(object):
    """
    Amusingly named class to present the teuthology RemoteProcess interface when we are really
    running things locally for vstart

    Run this inside your src/ dir!
    """

    def __init__(self):
        self.name = "local"
        self.hostname = "localhost"
        self.user = getpass.getuser()

    def get_file(self, path, sudo, dest_dir):
        """
        Copy *path* to a freshly created temporary file and return the path
        of that copy. *sudo* and *dest_dir* are accepted but not used.
        """
        # Close the handle right away; only the name is needed
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmpfile = tmp.name
        shutil.copy(path, tmpfile)
        return tmpfile

    # XXX: This method ignores the error raised when src and dst are
    # holding same path. For teuthology, same path still represents
    # different locations as they lie on different machines.
    def put_file(self, src, dst, sudo=False):
        """
        Copy *src* to *dst*; copying a file onto itself is silently accepted.
        *sudo* is accepted but not used.
        """
        try:
            shutil.copy(src, dst)
        except shutil.Error as e:
            same_file_error = getattr(shutil, 'SameFileError', None)
            if same_file_error is not None:
                # Python 3: dedicated exception type
                if isinstance(e, same_file_error):
                    return
            elif 'are the same file' in str(e):
                # Python 2: only the message tells the cases apart
                return
            raise

    # XXX: accepts only two arguments to maintain compatibility with
    # teuthology's mkdtemp.
    def mkdtemp(self, suffix='', parentdir=None):
        """
        Create a temporary directory and return its path.
        """
        # XXX: prefix had to be set without that this method failed against
        # Python2.7 -
        # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
        # -> file = _os.path.join(dir, prefix + name + suffix)
        # (Pdb) p prefix
        # None
        return tempfile.mkdtemp(suffix=suffix, prefix='', dir=parentdir)

    def mktemp(self, suffix=None, parentdir=None):
        """
        Make a remote temporary file

        Returns: the path of the temp file created.
        """
        # mkstemp() really creates the file; a None suffix is mapped to ''
        # because the tempfile module concatenates it into the file name.
        fd, path = tempfile.mkstemp(suffix=suffix or '', dir=parentdir)
        os.close(fd)
        return path

    def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
        """
        Normalise *args* to a list, reject argument shapes that the local
        execution cannot handle, and strip 'sudo' when *omit_sudo* is set.

        :raises RuntimeError: for a quoted argument containing spaces, or
                              for a 'sudo -u ... -c' command whose command
                              is not passed as one single argument.
        """
        # Since Python's shell simulation can only work when commands are
        # provided as a list of arguments...
        if isinstance(args, str) or isinstance(args, six.text_type):
            args = args.split()

        # We'll let sudo be a part of command even omit flag says otherwise in
        # cases of commands which can normally be ran only by root.
        try:
            if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
                omit_sudo = False
        except (ValueError, IndexError):
            # No 'sudo' at all, or 'sudo' is the very last word
            pass

        # Quotes wrapping a command argument don't work fine in Python's shell
        # simulation if the arguments contains spaces too. E.g. '"ls"' is OK
        # but "ls /" isn't.
        errmsg = "Don't surround arguments commands by quotes if it " + \
                 "contains spaces.\nargs - %s" % (args)
        for arg in args:
            if isinstance(arg, Raw):
                continue

            if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
               (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
                raise RuntimeError(errmsg)

        # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
        # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
        # treated differently by Python's shell simulation. Only latter has
        # the desired effect.
        errmsg = 'The entire command to executed as other user should be a ' +\
                 'single argument.\nargs - %s' % (args)
        if 'sudo' in args and '-u' in args and '-c' in args and \
           args.count('-c') == 1:
            # Only look two slots past '-c' when such a slot exists
            after_cmd = args.index('-c') + 2
            if after_cmd < len(args) and args[after_cmd].find('-') == -1:
                raise RuntimeError(errmsg)

        if omit_sudo:
            args = [a for a in args if a != "sudo"]

        return args

    # Wrapper to keep the interface exactly same as that of
    # teuthology.remote.run.
    def run(self, **kwargs):
        return self._do_run(**kwargs)

    def _do_run(self, args, check_status=True, wait=True, stdout=None,
                stderr=None, cwd=None, stdin=None, logger=None, label=None,
                env=None, timeout=None, omit_sudo=False):
        """
        Start *args* as a local subprocess and return a LocalRemoteProcess.

        *logger*, *label* and *timeout* are accepted but not used here.

        :param args: command as a list (or whitespace-separated string).
        :param check_status: passed to LocalRemoteProcess.
        :param wait: wait for the command to finish before returning.
        :param stdout: buffer for the command's stdout.
        :param stderr: buffer for the command's stderr.
        :param cwd: working directory for the command.
        :param stdin: string written to the command's stdin.
        :param env: environment for the command.
        :param omit_sudo: drop 'sudo' from the command where permitted.
        """
        args = self._perform_checks_and_return_list_of_args(args, omit_sudo)

        # We have to use shell=True if any run.Raw was present, e.g. &&
        shell = any(isinstance(a, Raw) for a in args)

        # Filter out helper tools that don't exist in a vstart environment
        args = [a for a in args if a not in ('adjust-ulimits',
                                             'ceph-coverage')]

        # Adjust binary path prefix if given a bare program name
        if "/" not in args[0]:
            # If they asked for a bare binary name, and it exists
            # in our built tree, use the one there.
            local_bin = os.path.join(BIN_PREFIX, args[0])
            if os.path.exists(local_bin):
                args = [local_bin] + args[1:]
            else:
                log.debug("'{0}' is not a binary in the Ceph build dir".format(
                    args[0]
                ))

        log.info("Running {0}".format(args))

        if shell:
            # env is forwarded here too, so both branches honour it
            subproc = subprocess.Popen(quote(args),
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env,
                                       shell=True)
        else:
            # Sanity check that we've got a list of strings
            for arg in args:
                if not isinstance(arg, six.string_types):
                    raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                        arg, arg.__class__
                    ))

            subproc = subprocess.Popen(args,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       stdin=subprocess.PIPE,
                                       cwd=cwd,
                                       env=env)

        if stdin:
            if not isinstance(stdin, six.string_types):
                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")

            # The pipe is binary, so text has to be encoded first
            if isinstance(stdin, six.text_type):
                stdin = stdin.encode('utf-8')

            # Hack: writing to stdin is not deadlock-safe, but it "always" works
            # as long as the input buffer is "small"
            subproc.stdin.write(stdin)

        proc = LocalRemoteProcess(
            args, subproc, check_status,
            stdout, stderr
        )

        if wait:
            proc.wait()

        return proc

    # XXX: for compatibility keep this method same teuthology.orchestra.remote.sh
    def sh(self, script, **kwargs):
        """
        Shortcut for run method.

        Usage:
            my_name = remote.sh('whoami')
            remote_date = remote.sh('date')
        """
        if 'stdout' not in kwargs:
            kwargs['stdout'] = StringIO()
        if 'args' not in kwargs:
            kwargs['args'] = script
        proc = self.run(**kwargs)
        return proc.stdout.getvalue()
414 | ||
415 | ||
class LocalDaemon(object):
    """
    Handle on one locally running ceph daemon, identified by its type
    (used to build the binary name "ceph-<type>") and its id (passed to the
    binary with "-i").

    The daemon's pid is never stored; it is looked up from ``ps`` output
    each time it is needed.
    """

    def __init__(self, daemon_type, daemon_id):
        self.daemon_type = daemon_type
        self.daemon_id = daemon_id
        self.controller = LocalRemote()
        # Process object returned by the last restart(), if any
        self.proc = None

    @property
    def remote(self):
        return LocalRemote()

    def running(self):
        """
        Return True if a matching process shows up in the ps listing.
        """
        return self._get_pid() is not None

    def check_status(self):
        """
        Return the exit status of the process started by restart(), or None
        if nothing was started or it has not finished.
        """
        if self.proc:
            # Use poll() when the process object offers one; otherwise fall
            # back to the finished/exitstatus pair.
            poll = getattr(self.proc, 'poll', None)
            if poll is not None:
                return poll()
            if self.proc.finished:
                return self.proc.exitstatus
        return None

    def _get_pid(self):
        """
        Return PID as an integer or None if not found
        """
        ps_txt = six.ensure_str(self.controller.run(
            args=["ps", "ww", "-u"+str(os.getuid())]
        ).stdout.getvalue()).strip()
        # First line of the listing is the column header
        lines = ps_txt.split("\n")[1:]

        # The id must be followed by whitespace or the end of the line so
        # that e.g. id "1" does not match a daemon started with "-i 10".
        pattern = re.compile(r"ceph-{0} -i {1}(?:\s|$)".format(
            re.escape(str(self.daemon_type)), re.escape(str(self.daemon_id))))

        for line in lines:
            if pattern.search(line):
                log.info("Found ps line for daemon: {0}".format(line))
                return int(line.split()[0])
        # opt_log_ps_output is a module-level flag defined outside this class
        if opt_log_ps_output:
            log.info("No match for {0} {1}: {2}".format(
                self.daemon_type, self.daemon_id, ps_txt))
        else:
            log.info("No match for {0} {1}".format(self.daemon_type,
                                                   self.daemon_id))
        return None

    def wait(self, timeout):
        """
        Poll once per second until the daemon is gone.

        :raises MaxWhileTries: when more than *timeout* polls were needed.
        """
        waited = 0
        while self._get_pid() is not None:
            if waited > timeout:
                raise MaxWhileTries("Timed out waiting for daemon {0}.{1}".format(self.daemon_type, self.daemon_id))
            time.sleep(1)
            waited += 1

    def stop(self, timeout=300):
        """
        Send SIGTERM to the daemon and wait for it to disappear.

        :raises MaxWhileTries: when the daemon outlives *timeout*.
        """
        # Look the pid up once: a second lookup could return None if the
        # daemon exits in between, and os.kill() cannot take None.
        pid = self._get_pid()
        if pid is None:
            log.error('tried to stop a non-running daemon')
            return

        log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
        os.kill(pid, signal.SIGTERM)

        waited = 0
        while pid is not None:
            new_pid = self._get_pid()
            if new_pid is not None and new_pid != pid:
                # A different pid now matches; terminate that one as well
                log.info("Killing new PID {0}".format(new_pid))
                pid = new_pid
                os.kill(pid, signal.SIGTERM)

            if new_pid is None:
                break
            else:
                if waited > timeout:
                    raise MaxWhileTries(
                        "Timed out waiting for daemon {0}.{1}".format(
                            self.daemon_type, self.daemon_id))
                time.sleep(1)
                waited += 1

        self.wait(timeout=timeout)

    def restart(self):
        """
        Stop the daemon if it is running, then start it again.
        """
        if self._get_pid() is not None:
            self.stop()

        self.proc = self.controller.run(args=[
            os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
            "-i", self.daemon_id])

    def signal(self, sig, silent=False):
        """
        Send signal *sig* to the daemon.

        :param silent: suppress the log message about the signal.
        :raises RuntimeError: if the daemon is not running.
        """
        # Single lookup, for the same reason as in stop()
        pid = self._get_pid()
        if pid is None:
            raise RuntimeError("Can't send signal to non-running daemon")

        os.kill(pid, sig)
        if not silent:
            log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
507 | ||
508 | ||
def safe_kill(pid):
    """
    Send SIGKILL to *pid*, tolerating the case where the process is already
    gone (os.kill reports that as OSError with errno ESRCH). Any other
    OSError is passed on to the caller.
    """
    try:
        os.kill(pid, signal.SIGKILL)
    except OSError as err:
        # ESRCH means we raced with process termination; nothing left to do
        if err.errno != errno.ESRCH:
            raise
521 | ||
522 | ||
class LocalKernelMount(KernelMount):
    """
    KernelMount variant that runs every command through a LocalRemote, i.e.
    on the local machine.
    """

    def __init__(self, ctx, test_dir, client_id):
        super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)

    @property
    def config_path(self):
        return "./ceph.conf"

    def get_keyring_path(self):
        """
        Return the absolute path of this client's keyring if it exists in
        the current directory, else the absolute path of ./keyring.
        """
        # This is going to end up in a config file, so use an absolute path
        # to avoid assumptions about daemons' pwd
        keyring_path = "./client.{0}.keyring".format(self.client_id)
        try:
            os.stat(keyring_path)
        except OSError:
            return os.path.join(os.getcwd(), 'keyring')
        else:
            return os.path.abspath(keyring_path)

    def run_shell(self, args, wait=True, stdin=None, check_status=True,
                  omit_sudo=False):
        """
        Run *args* with the mountpoint as working directory.
        """
        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
        # the "cd foo && bar" shenanigans isn't needed to begin with and
        # then we wouldn't have to special case this
        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
                                      stdin=stdin, check_status=check_status,
                                      omit_sudo=omit_sudo)

    def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
        """
        Run *args* as *user* via "sudo -u <user> -s /bin/bash -c <command>",
        with the mountpoint as working directory.

        :param args: command as a string or as a list of words.
        """
        # The whole command has to be ONE argument after '-c'. A string is
        # therefore not spliced into a longer string (which would later be
        # split on whitespace), and a list is joined. Each word is followed
        # by a space, so the command ends with a trailing space in both
        # cases.
        sudo_prefix = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
        if isinstance(args, str):
            args = sudo_prefix + [args + ' ']
        elif isinstance(args, list):
            args = sudo_prefix + [''.join(word + ' ' for word in args)]

        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
                                      check_status=check_status, stdin=stdin,
                                      omit_sudo=False)

    def run_as_root(self, args, wait=True, stdin=None, check_status=True):
        """
        Run *args* prefixed with sudo, with the mountpoint as working
        directory. The caller's list is not modified.
        """
        if isinstance(args, str):
            args = 'sudo ' + args
        elif isinstance(args, list):
            # Build a new list instead of inserting into the caller's
            args = ['sudo'] + args

        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
                                      check_status=check_status, stdin=stdin,
                                      omit_sudo=False)

    def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
        """
        Like run_shell(), but a non-zero exit status is not an error.
        """
        return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
                              omit_sudo=omit_sudo)

    def testcmd_as_user(self, args, user, wait=True, stdin=None):
        """
        Like run_as_user(), but a non-zero exit status is not an error.
        """
        return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
                                check_status=False)

    def testcmd_as_root(self, args, wait=True, stdin=None):
        """
        Like run_as_root(), but a non-zero exit status is not an error.
        """
        return self.run_as_root(args, wait=wait, stdin=stdin,
                                check_status=False)

    def setupfs(self, name=None):
        """
        Attach a LocalFilesystem to this mount and wait for its daemons.
        """
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = LocalFilesystem(self.ctx, name=name)
        log.info('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.info('Ready to start {}...'.format(type(self).__name__))

    @property
    def _prefix(self):
        return BIN_PREFIX

    def _asok_path(self):
        """
        Return the expected path of this client's admin socket.
        """
        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
        # run foreground.  When running it daemonized however, the asok is named after
        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
        # we need to give an exact path here as the logic for checking /proc/ for which
        # asok is alive does not work.

        # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
        # in a tmpdir. All of the paths are the same, so no need to select
        # based off of the service type.
        #
        # NOTE(review): this reads self.fuse_daemon, which nothing in this
        # class sets - confirm it is meaningful for a kernel mount.
        d = "./out"
        with open(self.config_path) as f:
            for line in f:
                asok_conf = re.search(r"^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
                if asok_conf:
                    # Directory part of the configured socket path
                    d = asok_conf.group(1)
                    break
        path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
        return path

    def mount(self, mount_path=None, mount_fs_name=None, mount_options=None, **kwargs):
        """
        Create the mountpoint and mount the filesystem with mount.ceph.

        :param mount_path: path inside the filesystem to mount, "/" if None.
        :param mount_fs_name: filesystem name, added as mds_namespace option.
        :param mount_options: extra options appended to the "-o" string.
        """
        if mount_options is None:
            mount_options = []

        self.setupfs(name=mount_fs_name)

        log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
            timeout=(5*60),
        )

        if mount_path is None:
            mount_path = "/"

        opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
                                                        conf=self.config_path)

        if mount_fs_name is not None:
            opts += ",mds_namespace={0}".format(mount_fs_name)

        for mount_opt in mount_options:
            opts += ",{0}".format(mount_opt)

        self.client_remote.run(
            args=[
                'sudo',
                # Same location that _prefix reports for built binaries
                os.path.join(BIN_PREFIX, 'mount.ceph'),
                ':{mount_path}'.format(mount_path=mount_path),
                self.mountpoint,
                '-v',
                '-o',
                opts
            ],
            timeout=(30*60),
            omit_sudo=False,
        )

        self.client_remote.run(
            args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))

        self.mounted = True

    def _run_python(self, pyscript, py_version='python'):
        """
        Override this to remove the daemon-helper prefix that is used otherwise
        to make the process killable.
        """
        return self.client_remote.run(args=[py_version, '-c', pyscript],
                                      wait=False)
690 | ||
691 | class LocalFuseMount(FuseMount): | |
def __init__(self, ctx, test_dir, client_id):
    """
    Set up the fuse mount with a LocalRemote as its client remote, so that
    commands are executed on the local machine.
    """
    local_remote = LocalRemote()
    super(LocalFuseMount, self).__init__(
        ctx, None, test_dir, client_id, local_remote)
694 | ||
@property
def config_path(self):
    """
    Path of the ceph.conf to use, relative to the current directory.
    """
    conf_file = "./ceph.conf"
    return conf_file
698 | ||
def get_keyring_path(self):
    """
    Return the absolute path of this client's keyring file.
    """
    relative = "./client.{0}.keyring".format(self.client_id)
    # Absolute, because the value ends up in a config file and must not
    # depend on the working directory of whichever daemon reads it
    return os.path.abspath(relative)
703 | ||
def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
    """
    Run *args* through the client remote with the mountpoint as working
    directory and return whatever the remote's run() returns.
    """
    # FIXME maybe should add a pwd arg to teuthology.orchestra so that
    # the "cd foo && bar" shenanigans isn't needed to begin with and
    # then we wouldn't have to special case this
    run_kwargs = dict(
        args=args,
        wait=wait,
        cwd=self.mountpoint,
        stdin=stdin,
        check_status=check_status,
        omit_sudo=omit_sudo,
    )
    return self.client_remote.run(**run_kwargs)
711 | ||
def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
    """
    Run *args* as *user* via "sudo -u <user> -s /bin/bash -c <command>",
    with the mountpoint as working directory.

    :param args: command as a string or as a list of words.
    :param user: name of the user to run the command as.
    :returns: whatever the client remote's run() returns.
    """
    # The whole command has to be ONE argument after '-c'. A string is
    # therefore not spliced into a longer string (which would later be
    # split on whitespace), and a list is joined. Each word is followed by
    # a space, so the command ends with a trailing space in both cases.
    sudo_prefix = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
    if isinstance(args, str):
        args = sudo_prefix + [args + ' ']
    elif isinstance(args, list):
        args = sudo_prefix + [''.join(word + ' ' for word in args)]

    return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
                                  check_status=check_status, stdin=stdin,
                                  omit_sudo=False)
729 | ||
def run_as_root(self, args, wait=True, stdin=None, check_status=True):
    """
    Run *args* prefixed with sudo, with the mountpoint as working directory.

    The caller's list is left untouched and *stdin* is forwarded to the
    command.

    :param args: command as a string or as a list of words.
    :returns: whatever the client remote's run() returns.
    """
    if isinstance(args, str):
        args = 'sudo ' + args
    elif isinstance(args, list):
        # Build a new list instead of inserting into the caller's
        args = ['sudo'] + args

    return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
                                  check_status=check_status, stdin=stdin,
                                  omit_sudo=False)
742 | ||
743 | def testcmd(self, args, wait=True, stdin=None, omit_sudo=True): | |
744 | # FIXME maybe should add a pwd arg to teuthology.orchestra so that | |
745 | # the "cd foo && bar" shenanigans isn't needed to begin with and | |
746 | # then we wouldn't have to special case this | |
747 | return self.run_shell(args, wait=wait, stdin=stdin, check_status=False, | |
748 | omit_sudo=omit_sudo) | |
749 | ||
750 | def testcmd_as_user(self, args, user, wait=True, stdin=None): | |
751 | # FIXME maybe should add a pwd arg to teuthology.orchestra so that | |
752 | # the "cd foo && bar" shenanigans isn't needed to begin with and | |
753 | # then we wouldn't have to special case this | |
754 | return self.run_as_user(args, user=user, wait=wait, stdin=stdin, | |
755 | check_status=False) | |
756 | ||
757 | def testcmd_as_root(self, args, wait=True, stdin=None): | |
758 | # FIXME maybe should add a pwd arg to teuthology.orchestra so that | |
759 | # the "cd foo && bar" shenanigans isn't needed to begin with and | |
760 | # then we wouldn't have to special case this | |
761 | return self.run_as_root(args, wait=wait, stdin=stdin, | |
762 | check_status=False) | |
763 | ||
def setupfs(self, name=None):
    """
    Attach a LocalFilesystem to this mount and wait for its daemons.

    :param name: filesystem name; when omitted and a filesystem is already
                 attached, that filesystem's name is reused.
    """
    reuse_previous_name = name is None and self.fs is not None
    if reuse_previous_name:
        # Previous mount existed, reuse the old name
        name = self.fs.name

    self.fs = LocalFilesystem(self.ctx, name=name)

    log.info('Wait for MDS to reach steady state...')
    self.fs.wait_for_daemons()

    mount_kind = type(self).__name__
    log.info('Ready to start {}...'.format(mount_kind))
772 | ||
@property
def _prefix(self):
    """
    Directory prefix for binaries: the module-level BIN_PREFIX.
    """
    bin_dir = BIN_PREFIX
    return bin_dir
776 | ||
def _asok_path(self):
    """
    Return the expected path of this client's admin socket.

    The directory is taken from the first "admin socket = ..." line of the
    config file (everything up to and including the last "/"), defaulting
    to "./out" when there is no such line. The file name is built from the
    client id and the pid of the process that launched ceph-fuse.
    """
    # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
    # run foreground.  When running it daemonized however, the asok is named after
    # the PID of the launching process, not the long running ceph-fuse process.  Therefore
    # we need to give an exact path here as the logic for checking /proc/ for which
    # asok is alive does not work.

    # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
    # in a tmpdir. All of the paths are the same, so no need to select
    # based off of the service type.
    d = "./out"
    with open(self.config_path) as f:
        for line in f:
            # Raw string: "\s" is not a valid escape in a normal literal
            asok_conf = re.search(r"^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
            if asok_conf:
                d = asok_conf.group(1)
                break
    launch_pid = self.fuse_daemon.subproc.pid
    path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, launch_pid)
    log.info("I think my launching pid was {0}".format(launch_pid))
    return path
797 | ||
def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=None):
    """
    Mount the filesystem with a locally launched ceph-fuse process.

    :param mount_path: if given, passed to ceph-fuse as
                       ``--client_mountpoint``.
    :param mount_fs_name: if given, passed to ceph-fuse as ``--client_fs``;
                          it is also the name handed to ``setupfs``.
    :param mountpoint: if given, replaces ``self.mountpoint`` before
                       mounting.
    :param mount_options: extra command line arguments appended to the
                          ceph-fuse invocation (default: none).
    :raises RuntimeError: if no new entry appears under
                          /sys/fs/fuse/connections in time, or if the
                          number of new entries is not exactly one.
    """
    # Avoid a shared mutable default argument.
    if mount_options is None:
        mount_options = []

    if mountpoint is not None:
        self.mountpoint = mountpoint
    self.setupfs(name=mount_fs_name)

    self.client_remote.run(args=['mkdir', '-p', self.mountpoint])

    def list_connections():
        """Return the fuse connection numbers listed in /sys, or []."""
        # Best effort: make sure fusectl is mounted so the listing works.
        self.client_remote.run(
            args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
            check_status=False
        )
        p = self.client_remote.run(
            args=["ls", "/sys/fs/fuse/connections"],
            check_status=False
        )
        if p.exitstatus != 0:
            log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
            return []

        ls_str = six.ensure_str(p.stdout.getvalue().strip())
        if ls_str:
            return [int(n) for n in ls_str.split("\n")]
        else:
            return []

    # Before starting ceph-fuse process, note the contents of
    # /sys/fs/fuse/connections
    pre_mount_conns = list_connections()
    log.info("Pre-mount connections: {0}".format(pre_mount_conns))

    prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
    if os.getuid() != 0:
        prefix += ["--client_die_on_failed_dentry_invalidate=false"]

    if mount_path is not None:
        prefix += ["--client_mountpoint={0}".format(mount_path)]

    if mount_fs_name is not None:
        prefix += ["--client_fs={0}".format(mount_fs_name)]

    prefix += mount_options

    self.fuse_daemon = self.client_remote.run(
        args=prefix + [
            "-f",
            "--name",
            "client.{0}".format(self.client_id),
            self.mountpoint
        ],
        wait=False)

    log.info("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))

    # Wait for the connection reference to appear in /sys
    waited = 0
    post_mount_conns = list_connections()
    while len(post_mount_conns) <= len(pre_mount_conns):
        if self.fuse_daemon.finished:
            # Did mount fail?  Raise the CommandFailedError instead of
            # hitting the "failed to populate /sys/" timeout
            self.fuse_daemon.wait()
        time.sleep(1)
        waited += 1
        if waited > 30:
            raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                waited
            ))
        post_mount_conns = list_connections()

    log.info("Post-mount connections: {0}".format(post_mount_conns))

    # Record our fuse connection number so that we can use it when
    # forcing an unmount
    new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
    if len(new_conns) == 0:
        raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
    elif len(new_conns) > 1:
        raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
    else:
        self._fuse_conn = new_conns[0]

    self.gather_mount_info()

    self.mounted = True
882 | ||
883 | def _run_python(self, pyscript, py_version='python'): | |
884 | """ | |
885 | Override this to remove the daemon-helper prefix that is used otherwise | |
886 | to make the process killable. | |
887 | """ | |
888 | return self.client_remote.run(args=[py_version, '-c', pyscript], | |
889 | wait=False) | |
890 | ||
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class LocalCephManager(CephManager):
    """
    CephManager variant that runs the ``ceph`` command found under
    BIN_PREFIX through a LocalRemote instead of a remote host.
    """

    def __init__(self):
        # Deliberately skip parent init, only inheriting from it to get
        # util methods like osd_dump that sit on top of raw_cluster_cmd
        self.controller = LocalRemote()

        # A minority of CephManager fns actually bother locking for when
        # certain teuthology tests want to run tasks in parallel
        self.lock = threading.RLock()

        self.log = lambda x: log.info(x)

        # Don't bother constructing a map of pools: it should be empty
        # at test cluster start, and in any case it would be out of date
        # in no time.  The attribute needs to exist for some of the CephManager
        # methods to work though.
        self.pools = {}

    def find_remote(self, daemon_type, daemon_id):
        """
        daemon_type like 'mds', 'osd'
        daemon_id like 'a', '0'

        Both arguments are ignored: a fresh LocalRemote is always returned.
        """
        return LocalRemote()

    def run_ceph_w(self, watch_channel=None):
        """
        Start ``ceph -w`` without waiting for it and return the process.

        :param watch_channel: Specifies the channel to be watched.
                              This can be 'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        proc = self.controller.run(args=args, wait=False, stdout=StringIO())
        return proc

    def raw_cluster_cmd(self, *args, **kwargs):
        """
        args like ["osd", "dump"]
        return stdout string
        """
        proc = self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
        return six.ensure_str(proc.stdout.getvalue())

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        like raw_cluster_cmd but don't check status, just return rc
        """
        kwargs['check_status'] = False
        proc = self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph")] + list(args), **kwargs)
        return proc.exitstatus

    def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
        """
        Run ``ceph daemon <daemon_type>.<daemon_id> <command...>`` and
        return the finished process. ``command`` must be a list, as it is
        concatenated onto the argument list.
        """
        return self.controller.run(
            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
                  "{0}.{1}".format(daemon_type, daemon_id)] + command,
            check_status=check_status,
            timeout=timeout
        )

    def _collect_mon_socks(self, addr_type=None):
        """
        Return the 'addr' of every entry in each monitor's public addrvec
        as reported by ``ceph mon dump``, optionally keeping only entries
        whose 'type' equals ``addr_type``.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addr_type is None or addrvec_mem['type'] == addr_type:
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_mon_socks(self):
        """
        Get monitor sockets.

        :return socks: tuple of strings; strings are individual sockets.
        """
        return self._collect_mon_socks()

    def get_msgrv1_mon_socks(self):
        """
        Get monitor sockets that use msgrv1 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        return self._collect_mon_socks('v1')

    def get_msgrv2_mon_socks(self):
        """
        Get monitor sockets that use msgrv2 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        return self._collect_mon_socks('v2')
1002 | ||
1003 | ||
class LocalCephCluster(CephCluster):
    """
    CephCluster variant for a vstart cluster: commands go through
    LocalCephManager/LocalRemote and config overrides are appended to
    ./ceph.conf.
    """

    def __init__(self, ctx):
        # Deliberately skip calling parent constructor
        self._ctx = ctx
        self.mon_manager = LocalCephManager()
        # subsystem name -> {config key: value} overrides set by the tests
        self._conf = defaultdict(dict)

    @property
    def admin_remote(self):
        """Return a fresh LocalRemote."""
        return LocalRemote()

    def get_config(self, key, service_type=None):
        """
        Return the value of ``key`` from the result of
        ``json_asok(['config', 'get', key], ...)`` for one fixed daemon of
        ``service_type`` ('mon' when not given).

        :raises KeyError: if ``service_type`` is not 'mon', 'mds' or 'osd'.
        """
        if service_type is None:
            service_type = 'mon'

        # FIXME hardcoded vstart service IDs
        service_id = {
            'mon': 'a',
            'mds': 'a',
            'osd': '0'
        }[service_type]

        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def _write_conf(self):
        """
        Rewrite ./ceph.conf so that it ends with the overrides held in
        ``self._conf``.

        Everything after a previously written "#LOCAL_TEST" banner is
        dropped and regenerated. A key that is also set in the first
        matching section of the existing text is commented out there.
        """
        # In teuthology, we have the honour of writing the entire ceph.conf, but
        # in vstart land it has mostly already been written and we need to carefully
        # append to it.
        conf_path = "./ceph.conf"
        banner = "\n#LOCAL_TEST\n"
        with open(conf_path) as conf_file:
            existing_str = conf_file.read()

        if banner in existing_str:
            existing_str = existing_str[0:existing_str.find(banner)]

        existing_str += banner

        for subsys, kvs in self._conf.items():
            existing_str += "\n[{0}]\n".format(subsys)
            # Escape the name so characters like "." are matched literally.
            section_re = re.compile(
                r"^\[{0}\]$([\n]|[^\[])+".format(re.escape(subsys)),
                re.MULTILINE)
            for key, val in kvs.items():
                # Comment out existing instance if it exists
                log.info("Searching for existing instance {0}/{1}".format(
                    key, subsys
                ))
                # Search again for every key: existing_str grows as we go,
                # so earlier match offsets would be stale.
                existing_section = section_re.search(existing_str)

                if existing_section:
                    section_str = existing_section.group(0)
                    existing_val = re.search(
                        r"^\s*[^#]({0}) =".format(re.escape(key)),
                        section_str, re.MULTILINE)
                    if existing_val:
                        start = existing_section.start() + existing_val.start(1)
                        log.info("Found string to replace at {0}".format(
                            start
                        ))
                        existing_str = existing_str[0:start] + "#" + existing_str[start:]

                existing_str += "{0} = {1}\n".format(key, val)

        with open(conf_path, "w") as conf_file:
            conf_file.write(existing_str)

    def set_ceph_conf(self, subsys, key, value):
        """Record ``key = value`` for section ``subsys`` and rewrite ceph.conf."""
        self._conf[subsys][key] = value
        self._write_conf()

    def clear_ceph_conf(self, subsys, key):
        """
        Forget the override for ``key`` in ``subsys`` and rewrite ceph.conf.

        :raises KeyError: if no such override is currently recorded.
        """
        del self._conf[subsys][key]
        self._write_conf()
1073 | ||
1074 | ||
class LocalMDSCluster(LocalCephCluster, MDSCluster):
    """MDSCluster whose MDS daemons are LocalDaemon instances."""

    def __init__(self, ctx):
        super(LocalMDSCluster, self).__init__(ctx)

        # The MDS ids are whatever the context registered under 'ceph.mds'.
        registered_mds = ctx.daemons.daemons['ceph.mds']
        self.mds_ids = registered_mds.keys()
        self.mds_daemons = {
            mds_id: LocalDaemon("mds", mds_id) for mds_id in self.mds_ids
        }

    def clear_firewall(self):
        # FIXME: unimplemented
        pass

    def newfs(self, name='cephfs', create=True):
        """Return a LocalFilesystem called ``name`` on this cluster's context."""
        return LocalFilesystem(self._ctx, name=name, create=create)
1088 | ||
1089 | ||
class LocalMgrCluster(LocalCephCluster, MgrCluster):
    """MgrCluster whose mgr daemons are LocalDaemon instances."""

    def __init__(self, ctx):
        super(LocalMgrCluster, self).__init__(ctx)

        # The mgr ids are whatever the context registered under 'ceph.mgr'.
        registered_mgrs = ctx.daemons.daemons['ceph.mgr']
        self.mgr_ids = registered_mgrs.keys()
        self.mgr_daemons = {
            mgr_id: LocalDaemon("mgr", mgr_id) for mgr_id in self.mgr_ids
        }
1096 | ||
1097 | ||
class LocalFilesystem(Filesystem, LocalMDSCluster):
    """
    Filesystem bound to a vstart cluster: MDS ids are discovered from
    ./ceph.conf and all daemons/remotes are the Local* variants.
    """

    def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
        """
        :param ctx: context object; the first filesystem built on it is
                    also stored as ``ctx.filesystem``.
        :param fscid: id of an existing filesystem; only allowed when
                      ``name`` is None.
        :param name: filesystem name, or None to look one up by ``fscid``.
        :param create: when true (and ``legacy_configured()`` is false),
                       call ``create()``.
        :param ec_profile: stored as ``self.ec_profile``.
        :raises RuntimeError: if ceph.conf has no ``[mds.*]`` section, or
                              if both ``name`` and ``fscid`` are given.
        """
        # Deliberately skip calling parent constructor
        self._ctx = ctx

        self.id = None
        self.name = name
        self.ec_profile = ec_profile
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
        self.mds_ids = set()
        mds_section_re = re.compile(r"^\[mds\.(.+)\]$")
        with open("ceph.conf") as conf_file:
            for line in conf_file:
                match = mds_section_re.match(line)
                if match:
                    self.mds_ids.add(match.group(1))

        if not self.mds_ids:
            raise RuntimeError("No MDSs found in ceph.conf!")

        self.mds_ids = list(self.mds_ids)

        log.info("Discovered MDS IDs: {0}".format(self.mds_ids))

        self.mon_manager = LocalCephManager()

        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])

        self.client_remote = LocalRemote()

        self._conf = defaultdict(dict)

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
                self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    @property
    def _prefix(self):
        """Return BIN_PREFIX."""
        return BIN_PREFIX

    def set_clients_block(self, blocked, mds_id=None):
        """Not supported here; always raises NotImplementedError."""
        raise NotImplementedError()
1155 | ||
1156 | ||
class InteractiveFailureResult(unittest.TextTestResult):
    """
    Text test result that, after recording a failure or an error, logs
    it and runs the interactive task.
    """

    def _log_and_go_interactive(self, test, err, template):
        # Shared tail of addFailure/addError: log the traceback, log the
        # notice built from ``template``, then run the interactive task.
        log.error(self._exc_info_to_string(err, test))
        log.error(template.format(
            self.getDescription(test)
        ))
        interactive.task(ctx=None, config=None)

    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        self._log_and_go_interactive(
            test, err, "Failure in test '{0}', going interactive")

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        self._log_and_go_interactive(
            test, err, "Error in test '{0}', going interactive")
1177 | ||
1178 | ||
def enumerate_methods(s):
    """
    Walk the suite ``s`` depth-first and yield ``(owning_suite, test)``
    for every member that is not itself a suite. ``owning_suite`` is the
    suite that directly contains ``test``.
    """
    log.info("e: {0}".format(s))
    for member in s._tests:
        if not isinstance(member, suite.BaseTestSuite):
            yield s, member
            continue
        # Nested suite: pass its pairs through unchanged.
        for pair in enumerate_methods(member):
            yield pair
1187 | ||
1188 | ||
def load_tests(modules, loader):
    """
    Build the suite to run.

    With a non-empty ``modules`` list, each name is loaded through
    ``loader.loadTestsFromName`` and the results are wrapped in one
    TestSuite. Otherwise ``loader.discover`` is run on the "cephfs"
    directory next to this file and its result is returned.
    """
    if not modules:
        log.info("Executing all cephfs tests")
        this_dir = os.path.dirname(os.path.abspath(__file__))
        return loader.discover(os.path.join(this_dir, "cephfs"))

    log.info("Executing modules: {0}".format(modules))
    # Test names like cephfs.test_auto_repair
    module_suites = [loader.loadTestsFromName(mod_name) for mod_name in modules]
    log.info("Loaded: {0}".format(list(module_suites)))
    return suite.TestSuite(module_suites)
1203 | ||
1204 | ||
def scan_tests(modules):
    """
    Load the tests named by ``modules`` and work out what they need.

    :return: tuple ``(mds, clients, mgr, memstore)``: the largest
             MDSS_REQUIRED, CLIENTS_REQUIRED and MGRS_REQUIRED found on
             any test (0 when absent), and a truthy value if any test
             sets REQUIRE_MEMSTORE.
    """
    mds_needed = 0
    clients_needed = 0
    mgr_needed = 0
    memstore_needed = False

    all_tests = load_tests(modules, loader.TestLoader())
    for _owner, case in enumerate_methods(all_tests):
        mds_needed = max(mds_needed, getattr(case, "MDSS_REQUIRED", 0))
        clients_needed = max(clients_needed, getattr(case, "CLIENTS_REQUIRED", 0))
        mgr_needed = max(mgr_needed, getattr(case, "MGRS_REQUIRED", 0))
        memstore_needed = getattr(case, "REQUIRE_MEMSTORE", False) or memstore_needed

    return mds_needed, clients_needed, mgr_needed, memstore_needed
1225 | ||
1226 | ||
class LocalCluster(object):
    """Cluster stand-in holding one LocalRemote that carries one role."""

    def __init__(self, rolename="placeholder"):
        local_remote = LocalRemote()
        self.remotes = {local_remote: [rolename]}

    def only(self, requested):
        """Return a new cluster of the same class whose role is ``requested``."""
        cluster_cls = self.__class__
        return cluster_cls(rolename=requested)
1235 | ||
1236 | ||
class LocalContext(object):
    """
    Context object for local runs: holds an empty config, the teuthology
    config, a LocalCluster and a DaemonGroup filled from ./ceph.conf.
    """

    def __init__(self):
        self.config = {}
        self.teuthology_config = teuth_config
        self.cluster = LocalCluster()
        self.daemons = DaemonGroup()

        svc_types = ["mon", "osd", "mds", "mgr"]

        # Create every "ceph.<type>" bucket up front so it exists even
        # when ceph.conf names no daemon of that type.
        for svc_type in svc_types:
            prefixed_type = "ceph." + svc_type
            if prefixed_type not in self.daemons.daemons:
                self.daemons.daemons[prefixed_type] = {}

        # One "[<type>.<id>]" section-header pattern per service type.
        section_patterns = [
            (svc_type, re.compile(r"^\[{0}\.(.+)\]$".format(svc_type)))
            for svc_type in svc_types
        ]

        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
        # tests that want to look these up via ctx can do so.
        # Inspect ceph.conf to see what roles exist
        with open("ceph.conf") as conf_file:
            for conf_line in conf_file:
                for svc_type, pattern in section_patterns:
                    match = pattern.match(conf_line)
                    if match:
                        svc_id = match.group(1)
                        self.daemons.daemons["ceph." + svc_type][svc_id] = \
                            LocalDaemon(svc_type, svc_id)

    def __del__(self):
        # NOTE(review): this removes whatever 'test_path' points at when
        # the object is collected, including for contexts created before
        # 'test_path' has been set -- confirm that this is intended.
        shutil.rmtree(self.teuthology_config['test_path'])
1259 | ||
def teardown_cluster():
    """
    Run stop.sh from SRC_PREFIX (60 second timeout) through the global
    ``remote``, then remove the ./dev and ./out directories.
    """
    log.info('\ntearing down the cluster...')

    stop_cmd = [os.path.join(SRC_PREFIX, "stop.sh")]
    remote.run(args=stop_cmd, timeout=60)

    cleanup_cmd = ['rm', '-rf', './dev', './out']
    remote.run(args=cleanup_cmd)
1264 | ||
def clear_old_log():
    """
    If the file at ``logpath`` exists, replace it with an empty one,
    re-run ``init_log()`` and note that in the new log. Do nothing when
    the file cannot be stat'ed.
    """
    try:
        os.stat(logpath)
    except OSError:
        # Covers a missing file on Python 2 and 3 alike
        # (FileNotFoundError is a subclass of OSError).
        return

    os.remove(logpath)
    with open(logpath, 'w') as fresh_log:
        fresh_log.write('')
    init_log()
    log.info('logging in a fresh file now...')
1280 | ||
def exec_test():
    """
    Command line entry point: optionally create a vstart cluster, then
    run the requested tests against it and exit with the outcome.

    Arguments that do not start with "-" are test names handed to
    ``load_tests``. Recognised flags are --interactive, --create,
    --create-cluster-only, --ignore-missing-binaries, --teardown,
    --log-ps-output, --clear-old-log and --kclient; any other flag is an
    error.

    Exits the process with status 0 when the run was successful and -1
    otherwise (also for an unknown flag or missing binaries). Returns
    normally only when --create-cluster-only was given.
    """
    # Parse arguments
    opt_interactive_on_error = False
    opt_create_cluster = False
    opt_create_cluster_only = False
    opt_ignore_missing_binaries = False
    opt_teardown_cluster = False
    global opt_log_ps_output
    opt_log_ps_output = False
    use_kernel_client = False

    args = sys.argv[1:]
    flags = [a for a in args if a.startswith("-")]
    modules = [a for a in args if not a.startswith("-")]
    for f in flags:
        if f == "--interactive":
            opt_interactive_on_error = True
        elif f == "--create":
            opt_create_cluster = True
        elif f == "--create-cluster-only":
            opt_create_cluster_only = True
        elif f == "--ignore-missing-binaries":
            opt_ignore_missing_binaries = True
        elif f == '--teardown':
            opt_teardown_cluster = True
        elif f == '--log-ps-output':
            opt_log_ps_output = True
        elif f == '--clear-old-log':
            clear_old_log()
        elif f == "--kclient":
            use_kernel_client = True
        else:
            log.error("Unknown option '{0}'".format(f))
            sys.exit(-1)

    # Help developers by stopping up-front if their tree isn't built enough for all the
    # tools that the tests might want to use (add more here if needed)
    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                        "cephfs-table-tool", "ceph-fuse", "rados"]
    missing_binaries = [b for b in require_binaries
                        if not os.path.exists(os.path.join(BIN_PREFIX, b))]
    if missing_binaries and not opt_ignore_missing_binaries:
        log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
        sys.exit(-1)

    max_required_mds, max_required_clients, \
        max_required_mgr, require_memstore = scan_tests(modules)

    global remote
    remote = LocalRemote()

    # Tolerate no MDSs or clients running at start
    ps_txt = six.ensure_str(remote.run(
        args=["ps", "-u"+str(os.getuid())]
    ).stdout.getvalue().strip())
    # The first line of the ps output is skipped as a header.
    lines = ps_txt.split("\n")[1:]
    for line in lines:
        if 'ceph-fuse' in line or 'ceph-mds' in line:
            pid = int(line.split()[0])
            log.warning("Killing stray process {0}".format(line))
            os.kill(pid, signal.SIGKILL)

    # Fire up the Ceph cluster if the user requested it
    if opt_create_cluster or opt_create_cluster_only:
        log.info("Creating cluster with {0} MDS daemons".format(
            max_required_mds))
        teardown_cluster()
        vstart_env = os.environ.copy()
        vstart_env["FS"] = "0"
        vstart_env["MDS"] = str(max_required_mds)
        vstart_env["OSD"] = "4"
        vstart_env["MGR"] = str(max(max_required_mgr, 1))

        vstart_args = [os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d",
                       "--nolockdep"]
        if require_memstore:
            vstart_args.append("--memstore")

        # usually, i get vstart.sh running completely in less than 100
        # seconds.
        remote.run(args=vstart_args, env=vstart_env, timeout=(3 * 60))

        # Wait for OSD to come up so that subsequent injectargs etc will
        # definitely succeed
        LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)

    if opt_create_cluster_only:
        return

    # List of client mounts, sufficient to run the selected tests
    clients = [str(i) for i in range(0, max_required_clients)]

    test_dir = tempfile.mkdtemp()
    teuth_config['test_path'] = test_dir

    ctx = LocalContext()
    ceph_cluster = LocalCephCluster(ctx)
    mds_cluster = LocalMDSCluster(ctx)
    mgr_cluster = LocalMgrCluster(ctx)

    # Construct Mount classes
    mounts = []
    for client_id in clients:
        # Populate client keyring (it sucks to use client.admin for test clients
        # because it's awkward to find the logs later)
        client_name = "client.{0}".format(client_id)

        with open("./keyring") as keyring:
            have_key = client_name in keyring.read()
        if not have_key:
            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                 "osd", "allow rw",
                                 "mds", "allow",
                                 "mon", "allow r"])

            # The keyring is opened in text mode, so normalise the command
            # output the same way the other stdout readers in this file do.
            with open("./keyring", "a") as keyring:
                keyring.write(six.ensure_str(p.stdout.getvalue()))

        if use_kernel_client:
            mount = LocalKernelMount(ctx, test_dir, client_id)
        else:
            mount = LocalFuseMount(ctx, test_dir, client_id)

        mounts.append(mount)
        if os.path.exists(mount.mountpoint):
            if mount.is_mounted():
                log.warning("unmounting {0}".format(mount.mountpoint))
                mount.umount_wait()
            else:
                os.rmdir(mount.mountpoint)

    from tasks.cephfs_test_runner import DecoratingLoader

    class LogStream(object):
        """File-like object that forwards complete lines to log.info."""

        def __init__(self):
            self.buffer = ""

        def write(self, data):
            self.buffer += data
            if "\n" in self.buffer:
                lines = self.buffer.split("\n")
                for line in lines[:-1]:
                    log.info(line)
                # Keep the trailing partial line until its newline arrives.
                self.buffer = lines[-1]

        def flush(self):
            pass

    decorating_loader = DecoratingLoader({
        "ctx": ctx,
        "mounts": mounts,
        "ceph_cluster": ceph_cluster,
        "mds_cluster": mds_cluster,
        "mgr_cluster": mgr_cluster,
    })

    # For the benefit of polling tests like test_full -- in teuthology land we set this
    # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
    ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")

    # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
    # from normal IO latency.  Increase it for running tests.
    ceph_cluster.set_ceph_conf("mds", "mds log max segments", "10")

    # Make sure the filesystem created in tests has uid/gid that will let us talk to
    # it after mounting it (without having to go root).  Set in 'global' not just 'mds'
    # so that cephfs-data-scan will pick it up too.
    ceph_cluster.set_ceph_conf("global", "mds root ino uid", "%s" % os.getuid())
    ceph_cluster.set_ceph_conf("global", "mds root ino gid", "%s" % os.getgid())

    # Monkeypatch get_package_version to avoid having to work out what kind of distro we're on
    def _get_package_version(remote, pkg_name):
        # Used in cephfs tests to find fuse version.  Your development workstation *does* have >=2.9, right?
        return "2.9"

    import teuthology.packaging
    teuthology.packaging.get_package_version = _get_package_version

    overall_suite = load_tests(modules, decorating_loader)

    # Filter out tests that don't lend themselves to interactive running,
    victims = []
    for case, method in enumerate_methods(overall_suite):
        fn = getattr(method, method._testMethodName)

        drop_test = False

        if getattr(fn, 'is_for_teuthology', False) is True:
            drop_test = True
            log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))

        # Only drop (and only say so) when not running as root, and never
        # undo a drop decided by the check above.
        if getattr(fn, "needs_trimming", False) is True and os.getuid() != 0:
            drop_test = True
            log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))

        if drop_test:
            # Don't drop the test if it was explicitly requested in arguments
            is_named = any(named.endswith(method.id()) for named in modules)

            if not is_named:
                victims.append((case, method))

    log.info("Disabling {0} tests because of is_for_teuthology or needs_trimming".format(len(victims)))
    for s, method in victims:
        s._tests.remove(method)

    if opt_interactive_on_error:
        result_class = InteractiveFailureResult
    else:
        result_class = unittest.TextTestResult
    fail_on_skip = False

    class LoggingResult(result_class):
        """Result class that logs when each test starts and stops."""

        def startTest(self, test):
            log.info("Starting test: {0}".format(self.getDescription(test)))
            test.started_at = datetime.datetime.utcnow()
            return super(LoggingResult, self).startTest(test)

        def stopTest(self, test):
            log.info("Stopped test: {0} in {1}s".format(
                self.getDescription(test),
                (datetime.datetime.utcnow() - test.started_at).total_seconds()
            ))
            # Mirror startTest and let the base class do its bookkeeping.
            return super(LoggingResult, self).stopTest(test)

        def addSkip(self, test, reason):
            if fail_on_skip:
                # Don't just call addFailure because that requires a traceback
                self.failures.append((test, reason))
            else:
                super(LoggingResult, self).addSkip(test, reason)

    # Execute!
    result = unittest.TextTestRunner(
        stream=LogStream(),
        resultclass=LoggingResult,
        verbosity=2,
        failfast=True).run(overall_suite)

    if opt_teardown_cluster:
        teardown_cluster()

    if not result.wasSuccessful():
        result.printErrors()  # duplicate output at end for convenience
        sys.exit(-1)
    else:
        sys.exit(0)
1537 | ||
1538 | ||
# Run exec_test() only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    exec_test()