# ceph/qa/tasks/cephfs/mount.py (from ceph.git, nautilus 14.2.4 point release)
1 from contextlib import contextmanager
2 import json
3 import logging
4 import datetime
5 import time
6 from textwrap import dedent
7 import os
8 from StringIO import StringIO
9 from teuthology.orchestra import run
10 from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
11 from tasks.cephfs.filesystem import Filesystem
12
13 log = logging.getLogger(__name__)
14
15
class CephFSMount(object):
    """
    Abstract base class for a CephFS client mount used by teuthology test
    tasks.  Concrete subclasses (e.g. kernel or FUSE mounts) implement the
    mount/umount hooks below.
    """
    def __init__(self, ctx, test_dir, client_id, client_remote):
        """
        :param ctx: Teuthology run context
        :param test_dir: Global teuthology test dir
        :param client_id: Client ID, the 'foo' in client.foo
        :param client_remote: Remote instance for the host where client will run
        """

        self.ctx = ctx
        self.test_dir = test_dir
        self.client_id = client_id
        self.client_remote = client_remote
        # Directory name (under test_dir) where this client mounts
        self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
        # Filesystem handle; populated lazily by setupfs()
        self.fs = None

        # Filenames used by the create_files()/check_files() sanity helpers
        self.test_files = ['a', 'b', 'c']

        # RemoteProcess handles started by *_background helpers; reaped by
        # teardown()/kill_background()
        self.background_procs = []
35 @property
36 def mountpoint(self):
37 return os.path.join(
38 self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
39
    def is_mounted(self):
        """Return whether the client is currently mounted (subclass hook)."""
        raise NotImplementedError()
42
    def setupfs(self, name=None):
        """
        Create the Filesystem handle for this mount and block until its MDS
        daemons reach a steady state.

        :param name: filesystem name; if None and a previous Filesystem
                     handle exists, its name is reused
        """
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = Filesystem(self.ctx, name=name)
        log.info('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.info('Ready to start {}...'.format(type(self).__name__))
51
    def mount(self, mount_path=None, mount_fs_name=None):
        """Mount the filesystem (subclass hook)."""
        raise NotImplementedError()

    def umount(self):
        """Unmount the filesystem (subclass hook)."""
        raise NotImplementedError()

    def umount_wait(self, force=False, require_clean=False):
        """
        Unmount and wait for the client to finish (subclass hook).

        :param force: Expect that the mount will not shutdown cleanly: kill
                      it hard.
        :param require_clean: Wait for the Ceph client associated with the
                              mount (e.g. ceph-fuse) to terminate, and
                              raise if it doesn't do so cleanly.
        :return:
        """
        raise NotImplementedError()

    def kill_cleanup(self):
        """Clean up after a kill() (subclass hook)."""
        raise NotImplementedError()

    def kill(self):
        """Kill the client process hard (subclass hook)."""
        raise NotImplementedError()

    def cleanup(self):
        """Remove local mount state such as the mountpoint dir (subclass hook)."""
        raise NotImplementedError()

    def wait_until_mounted(self):
        """Block until the mount is active (subclass hook)."""
        raise NotImplementedError()
81
82 def get_keyring_path(self):
83 return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
84
    @property
    def config_path(self):
        """
        Path to ceph.conf: override this if you're not a normal systemwide ceph install
        :return: string
        """
        return "/etc/ceph/ceph.conf"
92
    @contextmanager
    def mounted(self):
        """
        A context manager, from an initially unmounted state, to mount
        this, yield, and then unmount and clean up.
        """
        self.mount()
        self.wait_until_mounted()
        try:
            yield
        finally:
            # Unmount even if the body raised
            self.umount_wait()
105
106 def is_blacklisted(self):
107 addr = self.get_global_addr()
108 blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "ls", "--format=json"))
109 for b in blacklist:
110 if addr == b["addr"]:
111 return True
112 return False
113
114 def create_files(self):
115 assert(self.is_mounted())
116
117 for suffix in self.test_files:
118 log.info("Creating file {0}".format(suffix))
119 self.client_remote.run(args=[
120 'sudo', 'touch', os.path.join(self.mountpoint, suffix)
121 ])
122
123 def check_files(self):
124 assert(self.is_mounted())
125
126 for suffix in self.test_files:
127 log.info("Checking file {0}".format(suffix))
128 r = self.client_remote.run(args=[
129 'sudo', 'ls', os.path.join(self.mountpoint, suffix)
130 ], check_status=False)
131 if r.exitstatus != 0:
132 raise RuntimeError("Expected file {0} not found".format(suffix))
133
134 def create_destroy(self):
135 assert(self.is_mounted())
136
137 filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
138 log.debug("Creating test file {0}".format(filename))
139 self.client_remote.run(args=[
140 'sudo', 'touch', os.path.join(self.mountpoint, filename)
141 ])
142 log.debug("Deleting test file {0}".format(filename))
143 self.client_remote.run(args=[
144 'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
145 ])
146
    def _run_python(self, pyscript, py_version='python'):
        """
        Launch `pyscript` on the client without waiting, wrapped in
        daemon-helper so the child can be terminated by closing stdin
        (see _kill_background).  Returns the RemoteProcess; stdout is
        captured in a StringIO.
        """
        return self.client_remote.run(
            args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
                  py_version, '-c', pyscript], wait=False, stdin=run.PIPE,
            stdout=StringIO())
152
153 def run_python(self, pyscript, py_version='python'):
154 p = self._run_python(pyscript, py_version)
155 p.wait()
156 return p.stdout.getvalue().strip()
157
    def run_shell(self, args, wait=True):
        """
        Run a command under sudo with the mountpoint as working directory;
        stdout/stderr are captured into StringIO objects on the returned
        RemoteProcess.
        """
        args = ["cd", self.mountpoint, run.Raw('&&'), "sudo"] + args
        return self.client_remote.run(args=args, stdout=StringIO(),
                                      stderr=StringIO(), wait=wait)
162
    def open_no_data(self, basename):
        """
        A pure metadata operation
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        # Open for write but write nothing: exercises dentry/inode creation
        # without any data I/O.
        p = self._run_python(dedent(
            """
            f = open("{path}", 'w')
            """.format(path=path)
        ))
        p.wait()
177
    def open_background(self, basename="background_file", write=True):
        """
        Open a file for writing, then block such that the client
        will hold a capability.

        Don't return until the remote process has got as far as opening
        the file, then return the RemoteProcess instance.

        :param basename: file name, relative to the mount point
        :param write: open for write (and write some content) if True,
                      else open read-only
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        if write:
            pyscript = dedent("""
                import time

                f = open("{path}", 'w')
                f.write('content')
                f.flush()
                f.write('content2')
                while True:
                    time.sleep(1)
                """).format(path=path)
        else:
            pyscript = dedent("""
                import time

                f = open("{path}", 'r')
                while True:
                    time.sleep(1)
                """).format(path=path)

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)

        # This wait would not be sufficient if the file had already
        # existed, but it's simple and in practice users of open_background
        # are not using it on existing files.
        self.wait_for_visible(basename)

        return rproc
219
    def wait_for_dir_empty(self, dirname, timeout=30):
        """
        Poll the ceph.dir.entries vxattr once a second until `dirname`
        appears empty from this client; raise RuntimeError after `timeout`
        seconds.
        """
        i = 0
        dirpath = os.path.join(self.mountpoint, dirname)
        while i < timeout:
            nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
            if nr_entries == 0:
                log.debug("Directory {0} seen empty from {1} after {2}s ".format(
                    dirname, self.client_id, i))
                return
            else:
                time.sleep(1)
                i += 1

        raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
            i, dirname, self.client_id))
235
    def wait_for_visible(self, basename="background_file", timeout=30):
        """
        Poll once a second until `basename` is visible (ls succeeds) from
        this client; raise RuntimeError after `timeout` seconds.
        """
        i = 0
        while i < timeout:
            r = self.client_remote.run(args=[
                'sudo', 'ls', os.path.join(self.mountpoint, basename)
            ], check_status=False)
            if r.exitstatus == 0:
                log.debug("File {0} became visible from {1} after {2}s".format(
                    basename, self.client_id, i))
                return
            else:
                time.sleep(1)
                i += 1

        raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
            i, basename, self.client_id))
252
    def lock_background(self, basename="background_file", do_flock=True):
        """
        Open and lock files for writing, hold the locks in a background
        process.  Takes a POSIX (fcntl) write lock on "<basename>-2" and,
        if do_flock, an additional flock on "<basename>-1".

        :return: the background RemoteProcess (also appended to
                 background_procs)
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        # Assemble the remote script piecewise so the flock section is
        # optional; dedent() is applied to the finished string.
        script_builder = """
            import time
            import fcntl
            import struct"""
        if do_flock:
            script_builder += """
            f1 = open("{path}-1", 'w')
            fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
        script_builder += """
            f2 = open("{path}-2", 'w')
            lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
            fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            while True:
                time.sleep(1)
            """

        pyscript = dedent(script_builder).format(path=path)

        log.info("lock_background file {0}".format(basename))
        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc
283
    def lock_and_release(self, basename="background_file"):
        """
        Take a blocking flock on "<basename>-1" and a POSIX write lock on
        "<basename>-2", then let the remote script exit (releasing both).

        :return: the RemoteProcess (caller is responsible for wait())
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        script = """
            import time
            import fcntl
            import struct
            f1 = open("{path}-1", 'w')
            fcntl.flock(f1, fcntl.LOCK_EX)
            f2 = open("{path}-2", 'w')
            lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
            fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            """
        pyscript = dedent(script).format(path=path)

        log.info("lock_and_release file {0}".format(basename))
        return self._run_python(pyscript)
303
    def check_filelock(self, basename="background_file", do_flock=True):
        """
        From this client, attempt to re-take the locks placed by
        lock_background(): EAGAIN is accepted (lock correctly held
        elsewhere); any other IOError raises.  Runs synchronously on the
        remote host.
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        # Remote script is python2 (`except IOError, e` syntax) — it runs
        # on the client host's system python.
        script_builder = """
            import fcntl
            import errno
            import struct"""
        if do_flock:
            # flock check against "<basename>-1"
            script_builder += """
            f1 = open("{path}-1", 'r')
            try:
                fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError, e:
                if e.errno == errno.EAGAIN:
                    pass
                else:
                    raise RuntimeError("flock on file {path}-1 not found")"""
        # POSIX (fcntl) lock check against "<basename>-2"
        script_builder += """
            f2 = open("{path}-2", 'r')
            try:
                lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
                fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            except IOError, e:
                if e.errno == errno.EAGAIN:
                    pass
                else:
                    raise RuntimeError("posix lock on file {path}-2 not found")
            """
        pyscript = dedent(script_builder).format(path=path)

        log.info("check lock on file {0}".format(basename))
        self.client_remote.run(args=[
            'sudo', 'python', '-c', pyscript
        ])
340
    def write_background(self, basename="background_file", loop=False):
        """
        Open a file for writing, complete as soon as you can
        :param basename: file name relative to the mount point
        :param loop: if True, keep writing once a second until killed
        :return: the background RemoteProcess (also appended to
                 background_procs)
        """
        assert(self.is_mounted())

        path = os.path.join(self.mountpoint, basename)

        # Remote script is python2 (octal literal 0644 and
        # `except IOError, e` syntax).
        pyscript = dedent("""
            import os
            import time

            fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0644)
            try:
                while True:
                    os.write(fd, 'content')
                    time.sleep(1)
                    if not {loop}:
                        break
            except IOError, e:
                pass
            os.close(fd)
            """).format(path=path, loop=str(loop))

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc
370
371 def write_n_mb(self, filename, n_mb, seek=0, wait=True):
372 """
373 Write the requested number of megabytes to a file
374 """
375 assert(self.is_mounted())
376
377 return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
378 "bs=1M", "conv=fdatasync",
379 "count={0}".format(n_mb),
380 "seek={0}".format(seek)
381 ], wait=wait)
382
    def write_test_pattern(self, filename, size):
        """
        Write `size` bytes to `filename`, byte i being crc32(str(i)) & 7,
        so that validate_test_pattern() can verify the content later.
        """
        log.info("Writing {0} bytes to {1}".format(size, filename))
        return self.run_python(dedent("""
            import zlib
            path = "{path}"
            f = open(path, 'w')
            for i in range(0, {size}):
                val = zlib.crc32("%s" % i) & 7
                f.write(chr(val))
            f.close()
            """.format(
            path=os.path.join(self.mountpoint, filename),
            size=size
        )))
397
    def validate_test_pattern(self, filename, size):
        """
        Verify that `filename` contains exactly `size` bytes matching the
        pattern produced by write_test_pattern(); the remote script raises
        on any mismatch.
        """
        log.info("Validating {0} bytes from {1}".format(size, filename))
        # Doubled braces in the script survive the outer .format() call.
        return self.run_python(dedent("""
            import zlib
            path = "{path}"
            f = open(path, 'r')
            bytes = f.read()
            f.close()
            if len(bytes) != {size}:
                raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
                    len(bytes), {size}
                ))
            for i, b in enumerate(bytes):
                val = zlib.crc32("%s" % i) & 7
                if b != chr(val):
                    raise RuntimeError("Bad data at offset {{0}}".format(i))
            """.format(
            path=os.path.join(self.mountpoint, filename),
            size=size
        )))
418
    def open_n_background(self, fs_path, count):
        """
        Open N files for writing, hold them open in a background process

        :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
        :param count: number of files to open ("<fs_path>_0" .. "<fs_path>_{count-1}")
        :return: a RemoteProcess
        """
        assert(self.is_mounted())

        abs_path = os.path.join(self.mountpoint, fs_path)

        pyscript = dedent("""
            import sys
            import time
            import os

            n = {count}
            abs_path = "{abs_path}"

            if not os.path.exists(os.path.dirname(abs_path)):
                os.makedirs(os.path.dirname(abs_path))

            handles = []
            for i in range(0, n):
                fname = "{{0}}_{{1}}".format(abs_path, i)
                handles.append(open(fname, 'w'))

            while True:
                time.sleep(1)
            """).format(abs_path=abs_path, count=count)

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc
453
    def create_n_files(self, fs_path, count, sync=False):
        """
        Create `count` small files named "<fs_path>_0" .. "<fs_path>_{count-1}",
        optionally fsync'ing each one, then wait for completion.

        :param fs_path: path relative to the mount point
        :param sync: if True, flush+fsync each file before closing
        """
        assert(self.is_mounted())

        abs_path = os.path.join(self.mountpoint, fs_path)

        # str(sync) interpolates as the literal True/False in the script.
        pyscript = dedent("""
            import sys
            import time
            import os

            n = {count}
            abs_path = "{abs_path}"

            if not os.path.exists(os.path.dirname(abs_path)):
                os.makedirs(os.path.dirname(abs_path))

            for i in range(0, n):
                fname = "{{0}}_{{1}}".format(abs_path, i)
                h = open(fname, 'w')
                h.write('content')
                if {sync}:
                    h.flush()
                    os.fsync(h.fileno())
                h.close()
            """).format(abs_path=abs_path, count=count, sync=str(sync))

        self.run_python(pyscript)
481
482 def teardown(self):
483 for p in self.background_procs:
484 log.info("Terminating background process")
485 self._kill_background(p)
486
487 self.background_procs = []
488
    def _kill_background(self, p):
        # Closing stdin makes daemon-helper (see _run_python) kill the
        # wrapped process; then reap it, tolerating failure exit statuses
        # and lost connections.
        if p.stdin:
            p.stdin.close()
            try:
                p.wait()
            except (CommandFailedError, ConnectionLostError):
                pass
496
    def kill_background(self, p):
        """
        For a process that was returned by one of the _background member functions,
        kill it hard.
        """
        self._kill_background(p)
        # Drop it from the tracked list so teardown() won't touch it again
        self.background_procs.remove(p)
504
    def get_global_id(self):
        """Return this client's global ID (subclass hook)."""
        raise NotImplementedError()

    def get_global_inst(self):
        """Return this client's instance identifier (subclass hook)."""
        raise NotImplementedError()

    def get_global_addr(self):
        """Return this client's address string, as it would appear in
        'osd blacklist ls' output (subclass hook; see is_blacklisted)."""
        raise NotImplementedError()

    def get_osd_epoch(self):
        """Return the client's OSD epoch (subclass hook)."""
        raise NotImplementedError()
516
    def stat(self, fs_path, wait=True):
        """
        stat a file, and return the result as a dictionary like this:
        {
          "st_ctime": 1414161137.0,
          "st_mtime": 1414161137.0,
          "st_nlink": 33,
          "st_gid": 0,
          "st_dev": 16777218,
          "st_size": 1190,
          "st_ino": 2,
          "st_uid": 0,
          "st_mode": 16877,
          "st_atime": 1431520593.0
        }

        Raises exception on absent file.  If wait is False, returns the
        RemoteProcess instead of the parsed dict.
        """
        abs_path = os.path.join(self.mountpoint, fs_path)

        # Remote script is python2 (print statement); it exits with the
        # stat errno on failure, which surfaces as CommandFailedError.
        pyscript = dedent("""
            import os
            import stat
            import json
            import sys

            try:
                s = os.stat("{path}")
            except OSError as e:
                sys.exit(e.errno)

            attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
            print json.dumps(
                dict([(a, getattr(s, a)) for a in attrs]),
                indent=2)
            """).format(path=abs_path)
        proc = self._run_python(pyscript)
        if wait:
            proc.wait()
            return json.loads(proc.stdout.getvalue().strip())
        else:
            return proc
559
    def touch(self, fs_path):
        """
        Create a dentry if it doesn't already exist. This python
        implementation exists because the usual command line tool doesn't
        pass through error codes like EIO.

        :param fs_path: path relative to the mount point
        :return: None
        """
        abs_path = os.path.join(self.mountpoint, fs_path)
        # NOTE(review): the script exits with EIO for *any* IOError, not
        # the actual errno (`e` is unused) — confirm callers only care
        # about success vs. failure before changing this.
        pyscript = dedent("""
            import sys
            import errno

            try:
                f = open("{path}", "w")
                f.close()
            except IOError as e:
                sys.exit(errno.EIO)
            """).format(path=abs_path)
        proc = self._run_python(pyscript)
        proc.wait()
582
    def path_to_ino(self, fs_path, follow_symlinks=True):
        """
        Return the inode number of `fs_path` (relative to the mount point),
        using stat or — when follow_symlinks is False — lstat.
        """
        abs_path = os.path.join(self.mountpoint, fs_path)

        # Remote scripts are python2 (print statement).
        if follow_symlinks:
            pyscript = dedent("""
                import os
                import stat

                print os.stat("{path}").st_ino
                """).format(path=abs_path)
        else:
            pyscript = dedent("""
                import os
                import stat

                print os.lstat("{path}").st_ino
                """).format(path=abs_path)

        proc = self._run_python(pyscript)
        proc.wait()
        return int(proc.stdout.getvalue().strip())
604
    def path_to_nlink(self, fs_path):
        """Return the link count of `fs_path` (relative to the mount point)."""
        abs_path = os.path.join(self.mountpoint, fs_path)

        # Remote script is python2 (print statement).
        pyscript = dedent("""
            import os
            import stat

            print os.stat("{path}").st_nlink
            """).format(path=abs_path)

        proc = self._run_python(pyscript)
        proc.wait()
        return int(proc.stdout.getvalue().strip())
618
619 def ls(self, path=None):
620 """
621 Wrap ls: return a list of strings
622 """
623 cmd = ["ls"]
624 if path:
625 cmd.append(path)
626
627 ls_text = self.run_shell(cmd).stdout.getvalue().strip()
628
629 if ls_text:
630 return ls_text.split("\n")
631 else:
632 # Special case because otherwise split on empty string
633 # gives you [''] instead of []
634 return []
635
    def setfattr(self, path, key, val):
        """
        Wrap setfattr.

        :param path: relative to mount point
        :param key: xattr name
        :param val: xattr value
        :return: None
        """
        self.run_shell(["setfattr", "-n", key, "-v", val, path])
646
    def getfattr(self, path, attr):
        """
        Wrap getfattr: return the values of a named xattr on one file, or
        None if the attribute is not found.

        :return: a string (not stripped), or None
        """
        p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
        try:
            p.wait()
        except CommandFailedError as e:
            # getfattr exits 1 with "No such attribute" on stderr when the
            # xattr is absent: map that to None, re-raise anything else.
            if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
                return None
            else:
                raise

        return p.stdout.getvalue()
664
665 def df(self):
666 """
667 Wrap df: return a dict of usage fields in bytes
668 """
669
670 p = self.run_shell(["df", "-B1", "."])
671 lines = p.stdout.getvalue().strip().split("\n")
672 fs, total, used, avail = lines[1].split()[:4]
673 log.warn(lines)
674
675 return {
676 "total": int(total),
677 "used": int(used),
678 "available": int(avail)
679 }