]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/mount.py
aeed4fa2043466c7d743332caafc13ed405a3ad8
[ceph.git] / ceph / qa / tasks / cephfs / mount.py
1 from contextlib import contextmanager
2 import json
3 import logging
4 import datetime
5 import time
6 from textwrap import dedent
7 import os
8 from StringIO import StringIO
9 from teuthology.orchestra import run
10 from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
11 from tasks.cephfs.filesystem import Filesystem
12
13 log = logging.getLogger(__name__)
14
15
16 class CephFSMount(object):
17 def __init__(self, ctx, test_dir, client_id, client_remote):
18 """
19 :param test_dir: Global teuthology test dir
20 :param client_id: Client ID, the 'foo' in client.foo
21 :param client_remote: Remote instance for the host where client will run
22 """
23
24 self.ctx = ctx
25 self.test_dir = test_dir
26 self.client_id = client_id
27 self.client_remote = client_remote
28 self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
29 self._mountpoint = None
30 self.fs = None
31
32 self.test_files = ['a', 'b', 'c']
33
34 self.background_procs = []
35
36 @property
37 def mountpoint(self):
38 if self._mountpoint == None:
39 self._mountpoint= os.path.join(
40 self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
41 return self._mountpoint
42
43 @mountpoint.setter
44 def mountpoint(self, path):
45 if not isinstance(path, str):
46 raise RuntimeError('path should be of str type.')
47 self._mountpoint = path
48
49 def is_mounted(self):
50 raise NotImplementedError()
51
52 def setupfs(self, name=None):
53 if name is None and self.fs is not None:
54 # Previous mount existed, reuse the old name
55 name = self.fs.name
56 self.fs = Filesystem(self.ctx, name=name)
57 log.info('Wait for MDS to reach steady state...')
58 self.fs.wait_for_daemons()
59 log.info('Ready to start {}...'.format(type(self).__name__))
60
61 def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
62 raise NotImplementedError()
63
64 def umount(self):
65 raise NotImplementedError()
66
67 def umount_wait(self, force=False, require_clean=False):
68 """
69
70 :param force: Expect that the mount will not shutdown cleanly: kill
71 it hard.
72 :param require_clean: Wait for the Ceph client associated with the
73 mount (e.g. ceph-fuse) to terminate, and
74 raise if it doesn't do so cleanly.
75 :return:
76 """
77 raise NotImplementedError()
78
79 def kill_cleanup(self):
80 raise NotImplementedError()
81
82 def kill(self):
83 raise NotImplementedError()
84
85 def cleanup(self):
86 raise NotImplementedError()
87
88 def wait_until_mounted(self):
89 raise NotImplementedError()
90
91 def get_keyring_path(self):
92 return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
93
94 @property
95 def config_path(self):
96 """
97 Path to ceph.conf: override this if you're not a normal systemwide ceph install
98 :return: stringv
99 """
100 return "/etc/ceph/ceph.conf"
101
102 @contextmanager
103 def mounted(self):
104 """
105 A context manager, from an initially unmounted state, to mount
106 this, yield, and then unmount and clean up.
107 """
108 self.mount()
109 self.wait_until_mounted()
110 try:
111 yield
112 finally:
113 self.umount_wait()
114
115 def is_blacklisted(self):
116 addr = self.get_global_addr()
117 blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "ls", "--format=json"))
118 for b in blacklist:
119 if addr == b["addr"]:
120 return True
121 return False
122
123 def create_file(self, filename='testfile', dirname=None, user=None,
124 check_status=True):
125 assert(self.is_mounted())
126
127 if not os.path.isabs(filename):
128 if dirname:
129 if os.path.isabs(dirname):
130 path = os.path.join(dirname, filename)
131 else:
132 path = os.path.join(self.mountpoint, dirname, filename)
133 else:
134 path = os.path.join(self.mountpoint, filename)
135 else:
136 path = filename
137
138 if user:
139 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', 'touch ' + path]
140 else:
141 args = 'touch ' + path
142
143 return self.client_remote.run(args=args, check_status=check_status)
144
145 def create_files(self):
146 assert(self.is_mounted())
147
148 for suffix in self.test_files:
149 log.info("Creating file {0}".format(suffix))
150 self.client_remote.run(args=[
151 'sudo', 'touch', os.path.join(self.mountpoint, suffix)
152 ])
153
154 def test_create_file(self, filename='testfile', dirname=None, user=None,
155 check_status=True):
156 return self.create_file(filename=filename, dirname=dirname, user=user,
157 check_status=False)
158
159 def check_files(self):
160 assert(self.is_mounted())
161
162 for suffix in self.test_files:
163 log.info("Checking file {0}".format(suffix))
164 r = self.client_remote.run(args=[
165 'sudo', 'ls', os.path.join(self.mountpoint, suffix)
166 ], check_status=False)
167 if r.exitstatus != 0:
168 raise RuntimeError("Expected file {0} not found".format(suffix))
169
170 def create_destroy(self):
171 assert(self.is_mounted())
172
173 filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
174 log.debug("Creating test file {0}".format(filename))
175 self.client_remote.run(args=[
176 'sudo', 'touch', os.path.join(self.mountpoint, filename)
177 ])
178 log.debug("Deleting test file {0}".format(filename))
179 self.client_remote.run(args=[
180 'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
181 ])
182
183 def _run_python(self, pyscript, py_version='python3'):
184 return self.client_remote.run(
185 args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
186 py_version, '-c', pyscript], wait=False, stdin=run.PIPE,
187 stdout=StringIO())
188
189 def run_python(self, pyscript, py_version='python3'):
190 p = self._run_python(pyscript, py_version)
191 p.wait()
192 return p.stdout.getvalue().strip()
193
194 def run_shell(self, args, wait=True, stdin=None, check_status=True,
195 omit_sudo=True):
196 if isinstance(args, str):
197 args = args.split()
198
199 args = ["cd", self.mountpoint, run.Raw('&&'), "sudo"] + args
200 return self.client_remote.run(args=args, stdout=StringIO(),
201 stderr=StringIO(), wait=wait,
202 stdin=stdin, check_status=check_status,
203 omit_sudo=omit_sudo)
204
205 def open_no_data(self, basename):
206 """
207 A pure metadata operation
208 """
209 assert(self.is_mounted())
210
211 path = os.path.join(self.mountpoint, basename)
212
213 p = self._run_python(dedent(
214 """
215 f = open("{path}", 'w')
216 """.format(path=path)
217 ))
218 p.wait()
219
220 def open_background(self, basename="background_file", write=True):
221 """
222 Open a file for writing, then block such that the client
223 will hold a capability.
224
225 Don't return until the remote process has got as far as opening
226 the file, then return the RemoteProcess instance.
227 """
228 assert(self.is_mounted())
229
230 path = os.path.join(self.mountpoint, basename)
231
232 if write:
233 pyscript = dedent("""
234 import time
235
236 with open("{path}", 'w') as f:
237 f.write('content')
238 f.flush()
239 f.write('content2')
240 while True:
241 time.sleep(1)
242 """).format(path=path)
243 else:
244 pyscript = dedent("""
245 import time
246
247 with open("{path}", 'r') as f:
248 while True:
249 time.sleep(1)
250 """).format(path=path)
251
252 rproc = self._run_python(pyscript)
253 self.background_procs.append(rproc)
254
255 # This wait would not be sufficient if the file had already
256 # existed, but it's simple and in practice users of open_background
257 # are not using it on existing files.
258 self.wait_for_visible(basename)
259
260 return rproc
261
262 def wait_for_dir_empty(self, dirname, timeout=30):
263 i = 0
264 dirpath = os.path.join(self.mountpoint, dirname)
265 while i < timeout:
266 nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
267 if nr_entries == 0:
268 log.debug("Directory {0} seen empty from {1} after {2}s ".format(
269 dirname, self.client_id, i))
270 return
271 else:
272 time.sleep(1)
273 i += 1
274
275 raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
276 i, dirname, self.client_id))
277
278 def wait_for_visible(self, basename="background_file", timeout=30):
279 i = 0
280 while i < timeout:
281 r = self.client_remote.run(args=[
282 'sudo', 'ls', os.path.join(self.mountpoint, basename)
283 ], check_status=False)
284 if r.exitstatus == 0:
285 log.debug("File {0} became visible from {1} after {2}s".format(
286 basename, self.client_id, i))
287 return
288 else:
289 time.sleep(1)
290 i += 1
291
292 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
293 i, basename, self.client_id))
294
295 def lock_background(self, basename="background_file", do_flock=True):
296 """
297 Open and lock a files for writing, hold the lock in a background process
298 """
299 assert(self.is_mounted())
300
301 path = os.path.join(self.mountpoint, basename)
302
303 script_builder = """
304 import time
305 import fcntl
306 import struct"""
307 if do_flock:
308 script_builder += """
309 f1 = open("{path}-1", 'w')
310 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
311 script_builder += """
312 f2 = open("{path}-2", 'w')
313 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
314 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
315 while True:
316 time.sleep(1)
317 """
318
319 pyscript = dedent(script_builder).format(path=path)
320
321 log.info("lock_background file {0}".format(basename))
322 rproc = self._run_python(pyscript)
323 self.background_procs.append(rproc)
324 return rproc
325
326 def lock_and_release(self, basename="background_file"):
327 assert(self.is_mounted())
328
329 path = os.path.join(self.mountpoint, basename)
330
331 script = """
332 import time
333 import fcntl
334 import struct
335 f1 = open("{path}-1", 'w')
336 fcntl.flock(f1, fcntl.LOCK_EX)
337 f2 = open("{path}-2", 'w')
338 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
339 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
340 """
341 pyscript = dedent(script).format(path=path)
342
343 log.info("lock_and_release file {0}".format(basename))
344 return self._run_python(pyscript)
345
346 def check_filelock(self, basename="background_file", do_flock=True):
347 assert(self.is_mounted())
348
349 path = os.path.join(self.mountpoint, basename)
350
351 script_builder = """
352 import fcntl
353 import errno
354 import struct"""
355 if do_flock:
356 script_builder += """
357 f1 = open("{path}-1", 'r')
358 try:
359 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
360 except IOError as e:
361 if e.errno == errno.EAGAIN:
362 pass
363 else:
364 raise RuntimeError("flock on file {path}-1 not found")"""
365 script_builder += """
366 f2 = open("{path}-2", 'r')
367 try:
368 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
369 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
370 except IOError as e:
371 if e.errno == errno.EAGAIN:
372 pass
373 else:
374 raise RuntimeError("posix lock on file {path}-2 not found")
375 """
376 pyscript = dedent(script_builder).format(path=path)
377
378 log.info("check lock on file {0}".format(basename))
379 self.client_remote.run(args=[
380 'sudo', 'python3', '-c', pyscript
381 ])
382
383 def write_background(self, basename="background_file", loop=False):
384 """
385 Open a file for writing, complete as soon as you can
386 :param basename:
387 :return:
388 """
389 assert(self.is_mounted())
390
391 path = os.path.join(self.mountpoint, basename)
392
393 pyscript = dedent("""
394 import os
395 import time
396
397 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
398 try:
399 while True:
400 os.write(fd, b'content')
401 time.sleep(1)
402 if not {loop}:
403 break
404 except IOError as e:
405 pass
406 os.close(fd)
407 """).format(path=path, loop=str(loop))
408
409 rproc = self._run_python(pyscript)
410 self.background_procs.append(rproc)
411 return rproc
412
413 def write_n_mb(self, filename, n_mb, seek=0, wait=True):
414 """
415 Write the requested number of megabytes to a file
416 """
417 assert(self.is_mounted())
418
419 return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
420 "bs=1M", "conv=fdatasync",
421 "count={0}".format(n_mb),
422 "seek={0}".format(seek)
423 ], wait=wait)
424
425 def write_test_pattern(self, filename, size):
426 log.info("Writing {0} bytes to {1}".format(size, filename))
427 return self.run_python(dedent("""
428 import zlib
429 path = "{path}"
430 with open(path, 'w') as f:
431 for i in range(0, {size}):
432 val = zlib.crc32(str(i).encode('utf-8')) & 7
433 f.write(chr(val))
434 """.format(
435 path=os.path.join(self.mountpoint, filename),
436 size=size
437 )))
438
439 def validate_test_pattern(self, filename, size):
440 log.info("Validating {0} bytes from {1}".format(size, filename))
441 return self.run_python(dedent("""
442 import zlib
443 path = "{path}"
444 with open(path, 'r') as f:
445 bytes = f.read()
446 if len(bytes) != {size}:
447 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
448 len(bytes), {size}
449 ))
450 for i, b in enumerate(bytes):
451 val = zlib.crc32(str(i).encode('utf-8')) & 7
452 if b != chr(val):
453 raise RuntimeError("Bad data at offset {{0}}".format(i))
454 """.format(
455 path=os.path.join(self.mountpoint, filename),
456 size=size
457 )))
458
459 def open_n_background(self, fs_path, count):
460 """
461 Open N files for writing, hold them open in a background process
462
463 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
464 :return: a RemoteProcess
465 """
466 assert(self.is_mounted())
467
468 abs_path = os.path.join(self.mountpoint, fs_path)
469
470 pyscript = dedent("""
471 import sys
472 import time
473 import os
474
475 n = {count}
476 abs_path = "{abs_path}"
477
478 if not os.path.exists(os.path.dirname(abs_path)):
479 os.makedirs(os.path.dirname(abs_path))
480
481 handles = []
482 for i in range(0, n):
483 fname = "{{0}}_{{1}}".format(abs_path, i)
484 handles.append(open(fname, 'w'))
485
486 while True:
487 time.sleep(1)
488 """).format(abs_path=abs_path, count=count)
489
490 rproc = self._run_python(pyscript)
491 self.background_procs.append(rproc)
492 return rproc
493
494 def create_n_files(self, fs_path, count, sync=False):
495 assert(self.is_mounted())
496
497 abs_path = os.path.join(self.mountpoint, fs_path)
498
499 pyscript = dedent("""
500 import sys
501 import time
502 import os
503
504 n = {count}
505 abs_path = "{abs_path}"
506
507 if not os.path.exists(os.path.dirname(abs_path)):
508 os.makedirs(os.path.dirname(abs_path))
509
510 for i in range(0, n):
511 fname = "{{0}}_{{1}}".format(abs_path, i)
512 with open(fname, 'w') as f:
513 f.write('content')
514 if {sync}:
515 f.flush()
516 os.fsync(f.fileno())
517 """).format(abs_path=abs_path, count=count, sync=str(sync))
518
519 self.run_python(pyscript)
520
521 def teardown(self):
522 for p in self.background_procs:
523 log.info("Terminating background process")
524 self._kill_background(p)
525
526 self.background_procs = []
527
528 def _kill_background(self, p):
529 if p.stdin:
530 p.stdin.close()
531 try:
532 p.wait()
533 except (CommandFailedError, ConnectionLostError):
534 pass
535
536 def kill_background(self, p):
537 """
538 For a process that was returned by one of the _background member functions,
539 kill it hard.
540 """
541 self._kill_background(p)
542 self.background_procs.remove(p)
543
544 def send_signal(self, signal):
545 signal = signal.lower()
546 if signal.lower() not in ['sigstop', 'sigcont', 'sigterm', 'sigkill']:
547 raise NotImplementedError
548
549 self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
550 self.client_pid], omit_sudo=False)
551
552 def get_global_id(self):
553 raise NotImplementedError()
554
555 def get_global_inst(self):
556 raise NotImplementedError()
557
558 def get_global_addr(self):
559 raise NotImplementedError()
560
561 def get_osd_epoch(self):
562 raise NotImplementedError()
563
564 def lstat(self, fs_path, follow_symlinks=False, wait=True):
565 return self.stat(fs_path, follow_symlinks=False, wait=True)
566
567 def stat(self, fs_path, follow_symlinks=True, wait=True):
568 """
569 stat a file, and return the result as a dictionary like this:
570 {
571 "st_ctime": 1414161137.0,
572 "st_mtime": 1414161137.0,
573 "st_nlink": 33,
574 "st_gid": 0,
575 "st_dev": 16777218,
576 "st_size": 1190,
577 "st_ino": 2,
578 "st_uid": 0,
579 "st_mode": 16877,
580 "st_atime": 1431520593.0
581 }
582
583 Raises exception on absent file.
584 """
585 abs_path = os.path.join(self.mountpoint, fs_path)
586 if follow_symlinks:
587 stat_call = "os.stat('" + abs_path + "')"
588 else:
589 stat_call = "os.lstat('" + abs_path + "')"
590
591 pyscript = dedent("""
592 import os
593 import stat
594 import json
595 import sys
596
597 try:
598 s = {stat_call}
599 except OSError as e:
600 sys.exit(e.errno)
601
602 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
603 print(json.dumps(
604 dict([(a, getattr(s, a)) for a in attrs]),
605 indent=2))
606 """).format(stat_call=stat_call)
607 proc = self._run_python(pyscript)
608 if wait:
609 proc.wait()
610 return json.loads(proc.stdout.getvalue().strip())
611 else:
612 return proc
613
614 def touch(self, fs_path):
615 """
616 Create a dentry if it doesn't already exist. This python
617 implementation exists because the usual command line tool doesn't
618 pass through error codes like EIO.
619
620 :param fs_path:
621 :return:
622 """
623 abs_path = os.path.join(self.mountpoint, fs_path)
624 pyscript = dedent("""
625 import sys
626 import errno
627
628 try:
629 f = open("{path}", "w")
630 f.close()
631 except IOError as e:
632 sys.exit(errno.EIO)
633 """).format(path=abs_path)
634 proc = self._run_python(pyscript)
635 proc.wait()
636
637 def path_to_ino(self, fs_path, follow_symlinks=True):
638 abs_path = os.path.join(self.mountpoint, fs_path)
639
640 if follow_symlinks:
641 pyscript = dedent("""
642 import os
643 import stat
644
645 print(os.stat("{path}").st_ino)
646 """).format(path=abs_path)
647 else:
648 pyscript = dedent("""
649 import os
650 import stat
651
652 print(os.lstat("{path}").st_ino)
653 """).format(path=abs_path)
654
655 proc = self._run_python(pyscript)
656 proc.wait()
657 return int(proc.stdout.getvalue().strip())
658
659 def path_to_nlink(self, fs_path):
660 abs_path = os.path.join(self.mountpoint, fs_path)
661
662 pyscript = dedent("""
663 import os
664 import stat
665
666 print(os.stat("{path}").st_nlink)
667 """).format(path=abs_path)
668
669 proc = self._run_python(pyscript)
670 proc.wait()
671 return int(proc.stdout.getvalue().strip())
672
673 def ls(self, path=None):
674 """
675 Wrap ls: return a list of strings
676 """
677 cmd = ["ls"]
678 if path:
679 cmd.append(path)
680
681 ls_text = self.run_shell(cmd).stdout.getvalue().strip()
682
683 if ls_text:
684 return ls_text.split("\n")
685 else:
686 # Special case because otherwise split on empty string
687 # gives you [''] instead of []
688 return []
689
690 def setfattr(self, path, key, val):
691 """
692 Wrap setfattr.
693
694 :param path: relative to mount point
695 :param key: xattr name
696 :param val: xattr value
697 :return: None
698 """
699 self.run_shell(["setfattr", "-n", key, "-v", val, path])
700
701 def getfattr(self, path, attr):
702 """
703 Wrap getfattr: return the values of a named xattr on one file, or
704 None if the attribute is not found.
705
706 :return: a string
707 """
708 p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
709 try:
710 p.wait()
711 except CommandFailedError as e:
712 if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
713 return None
714 else:
715 raise
716
717 return p.stdout.getvalue()
718
719 def df(self):
720 """
721 Wrap df: return a dict of usage fields in bytes
722 """
723
724 p = self.run_shell(["df", "-B1", "."])
725 lines = p.stdout.getvalue().strip().split("\n")
726 fs, total, used, avail = lines[1].split()[:4]
727 log.warn(lines)
728
729 return {
730 "total": int(total),
731 "used": int(used),
732 "available": int(avail)
733 }