]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/mount.py
bcc9aefd89543420a2571260d87c6f59f4c89dac
[ceph.git] / ceph / qa / tasks / cephfs / mount.py
1 from contextlib import contextmanager
2 import json
3 import logging
4 import datetime
5 import time
6 from textwrap import dedent
7 import os
8 from StringIO import StringIO
9 from teuthology.orchestra import run
10 from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
11 from tasks.cephfs.filesystem import Filesystem
12
13 log = logging.getLogger(__name__)
14
15
16 class CephFSMount(object):
17 def __init__(self, ctx, test_dir, client_id, client_remote):
18 """
19 :param test_dir: Global teuthology test dir
20 :param client_id: Client ID, the 'foo' in client.foo
21 :param client_remote: Remote instance for the host where client will run
22 """
23
24 self.ctx = ctx
25 self.test_dir = test_dir
26 self.client_id = client_id
27 self.client_remote = client_remote
28 self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
29 self.fs = None
30
31 self.test_files = ['a', 'b', 'c']
32
33 self.background_procs = []
34
35 @property
36 def mountpoint(self):
37 return os.path.join(
38 self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
39
40 def is_mounted(self):
41 raise NotImplementedError()
42
43 def setupfs(self, name=None):
44 if name is None and self.fs is not None:
45 # Previous mount existed, reuse the old name
46 name = self.fs.name
47 self.fs = Filesystem(self.ctx, name=name)
48 log.info('Wait for MDS to reach steady state...')
49 self.fs.wait_for_daemons()
50 log.info('Ready to start {}...'.format(type(self).__name__))
51
52 def mount(self, mount_path=None, mount_fs_name=None):
53 raise NotImplementedError()
54
55 def umount(self):
56 raise NotImplementedError()
57
58 def umount_wait(self, force=False, require_clean=False):
59 """
60
61 :param force: Expect that the mount will not shutdown cleanly: kill
62 it hard.
63 :param require_clean: Wait for the Ceph client associated with the
64 mount (e.g. ceph-fuse) to terminate, and
65 raise if it doesn't do so cleanly.
66 :return:
67 """
68 raise NotImplementedError()
69
70 def kill_cleanup(self):
71 raise NotImplementedError()
72
73 def kill(self):
74 raise NotImplementedError()
75
76 def cleanup(self):
77 raise NotImplementedError()
78
79 def wait_until_mounted(self):
80 raise NotImplementedError()
81
82 def get_keyring_path(self):
83 return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
84
85 @property
86 def config_path(self):
87 """
88 Path to ceph.conf: override this if you're not a normal systemwide ceph install
89 :return: stringv
90 """
91 return "/etc/ceph/ceph.conf"
92
93 @contextmanager
94 def mounted(self):
95 """
96 A context manager, from an initially unmounted state, to mount
97 this, yield, and then unmount and clean up.
98 """
99 self.mount()
100 self.wait_until_mounted()
101 try:
102 yield
103 finally:
104 self.umount_wait()
105
106 def is_blacklisted(self):
107 addr = self.get_global_addr()
108 blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "ls", "--format=json"))
109 for b in blacklist:
110 if addr == b["addr"]:
111 return True
112 return False
113
114 def create_files(self):
115 assert(self.is_mounted())
116
117 for suffix in self.test_files:
118 log.info("Creating file {0}".format(suffix))
119 self.client_remote.run(args=[
120 'sudo', 'touch', os.path.join(self.mountpoint, suffix)
121 ])
122
123 def check_files(self):
124 assert(self.is_mounted())
125
126 for suffix in self.test_files:
127 log.info("Checking file {0}".format(suffix))
128 r = self.client_remote.run(args=[
129 'sudo', 'ls', os.path.join(self.mountpoint, suffix)
130 ], check_status=False)
131 if r.exitstatus != 0:
132 raise RuntimeError("Expected file {0} not found".format(suffix))
133
134 def create_destroy(self):
135 assert(self.is_mounted())
136
137 filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
138 log.debug("Creating test file {0}".format(filename))
139 self.client_remote.run(args=[
140 'sudo', 'touch', os.path.join(self.mountpoint, filename)
141 ])
142 log.debug("Deleting test file {0}".format(filename))
143 self.client_remote.run(args=[
144 'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
145 ])
146
147 def _run_python(self, pyscript, py_version='python'):
148 return self.client_remote.run(
149 args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
150 py_version, '-c', pyscript], wait=False, stdin=run.PIPE,
151 stdout=StringIO())
152
153 def run_python(self, pyscript, py_version='python'):
154 p = self._run_python(pyscript, py_version)
155 p.wait()
156 return p.stdout.getvalue().strip()
157
158 def run_shell(self, args, wait=True):
159 args = ["cd", self.mountpoint, run.Raw('&&'), "sudo"] + args
160 return self.client_remote.run(args=args, stdout=StringIO(),
161 stderr=StringIO(), wait=wait)
162
163 def open_no_data(self, basename):
164 """
165 A pure metadata operation
166 """
167 assert(self.is_mounted())
168
169 path = os.path.join(self.mountpoint, basename)
170
171 p = self._run_python(dedent(
172 """
173 f = open("{path}", 'w')
174 """.format(path=path)
175 ))
176 p.wait()
177
178 def open_background(self, basename="background_file"):
179 """
180 Open a file for writing, then block such that the client
181 will hold a capability.
182
183 Don't return until the remote process has got as far as opening
184 the file, then return the RemoteProcess instance.
185 """
186 assert(self.is_mounted())
187
188 path = os.path.join(self.mountpoint, basename)
189
190 pyscript = dedent("""
191 import time
192
193 f = open("{path}", 'w')
194 f.write('content')
195 f.flush()
196 f.write('content2')
197 while True:
198 time.sleep(1)
199 """).format(path=path)
200
201 rproc = self._run_python(pyscript)
202 self.background_procs.append(rproc)
203
204 # This wait would not be sufficient if the file had already
205 # existed, but it's simple and in practice users of open_background
206 # are not using it on existing files.
207 self.wait_for_visible(basename)
208
209 return rproc
210
211 def wait_for_visible(self, basename="background_file", timeout=30):
212 i = 0
213 while i < timeout:
214 r = self.client_remote.run(args=[
215 'sudo', 'ls', os.path.join(self.mountpoint, basename)
216 ], check_status=False)
217 if r.exitstatus == 0:
218 log.debug("File {0} became visible from {1} after {2}s".format(
219 basename, self.client_id, i))
220 return
221 else:
222 time.sleep(1)
223 i += 1
224
225 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
226 i, basename, self.client_id))
227
228 def lock_background(self, basename="background_file", do_flock=True):
229 """
230 Open and lock a files for writing, hold the lock in a background process
231 """
232 assert(self.is_mounted())
233
234 path = os.path.join(self.mountpoint, basename)
235
236 script_builder = """
237 import time
238 import fcntl
239 import struct"""
240 if do_flock:
241 script_builder += """
242 f1 = open("{path}-1", 'w')
243 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
244 script_builder += """
245 f2 = open("{path}-2", 'w')
246 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
247 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
248 while True:
249 time.sleep(1)
250 """
251
252 pyscript = dedent(script_builder).format(path=path)
253
254 log.info("lock_background file {0}".format(basename))
255 rproc = self._run_python(pyscript)
256 self.background_procs.append(rproc)
257 return rproc
258
259 def lock_and_release(self, basename="background_file"):
260 assert(self.is_mounted())
261
262 path = os.path.join(self.mountpoint, basename)
263
264 script = """
265 import time
266 import fcntl
267 import struct
268 f1 = open("{path}-1", 'w')
269 fcntl.flock(f1, fcntl.LOCK_EX)
270 f2 = open("{path}-2", 'w')
271 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
272 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
273 """
274 pyscript = dedent(script).format(path=path)
275
276 log.info("lock_and_release file {0}".format(basename))
277 return self._run_python(pyscript)
278
279 def check_filelock(self, basename="background_file", do_flock=True):
280 assert(self.is_mounted())
281
282 path = os.path.join(self.mountpoint, basename)
283
284 script_builder = """
285 import fcntl
286 import errno
287 import struct"""
288 if do_flock:
289 script_builder += """
290 f1 = open("{path}-1", 'r')
291 try:
292 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
293 except IOError, e:
294 if e.errno == errno.EAGAIN:
295 pass
296 else:
297 raise RuntimeError("flock on file {path}-1 not found")"""
298 script_builder += """
299 f2 = open("{path}-2", 'r')
300 try:
301 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
302 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
303 except IOError, e:
304 if e.errno == errno.EAGAIN:
305 pass
306 else:
307 raise RuntimeError("posix lock on file {path}-2 not found")
308 """
309 pyscript = dedent(script_builder).format(path=path)
310
311 log.info("check lock on file {0}".format(basename))
312 self.client_remote.run(args=[
313 'sudo', 'python', '-c', pyscript
314 ])
315
316 def write_background(self, basename="background_file", loop=False):
317 """
318 Open a file for writing, complete as soon as you can
319 :param basename:
320 :return:
321 """
322 assert(self.is_mounted())
323
324 path = os.path.join(self.mountpoint, basename)
325
326 pyscript = dedent("""
327 import os
328 import time
329
330 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0644)
331 try:
332 while True:
333 os.write(fd, 'content')
334 time.sleep(1)
335 if not {loop}:
336 break
337 except IOError, e:
338 pass
339 os.close(fd)
340 """).format(path=path, loop=str(loop))
341
342 rproc = self._run_python(pyscript)
343 self.background_procs.append(rproc)
344 return rproc
345
346 def write_n_mb(self, filename, n_mb, seek=0, wait=True):
347 """
348 Write the requested number of megabytes to a file
349 """
350 assert(self.is_mounted())
351
352 return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
353 "bs=1M", "conv=fdatasync",
354 "count={0}".format(n_mb),
355 "seek={0}".format(seek)
356 ], wait=wait)
357
358 def write_test_pattern(self, filename, size):
359 log.info("Writing {0} bytes to {1}".format(size, filename))
360 return self.run_python(dedent("""
361 import zlib
362 path = "{path}"
363 f = open(path, 'w')
364 for i in range(0, {size}):
365 val = zlib.crc32("%s" % i) & 7
366 f.write(chr(val))
367 f.close()
368 """.format(
369 path=os.path.join(self.mountpoint, filename),
370 size=size
371 )))
372
373 def validate_test_pattern(self, filename, size):
374 log.info("Validating {0} bytes from {1}".format(size, filename))
375 return self.run_python(dedent("""
376 import zlib
377 path = "{path}"
378 f = open(path, 'r')
379 bytes = f.read()
380 f.close()
381 if len(bytes) != {size}:
382 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
383 len(bytes), {size}
384 ))
385 for i, b in enumerate(bytes):
386 val = zlib.crc32("%s" % i) & 7
387 if b != chr(val):
388 raise RuntimeError("Bad data at offset {{0}}".format(i))
389 """.format(
390 path=os.path.join(self.mountpoint, filename),
391 size=size
392 )))
393
394 def open_n_background(self, fs_path, count):
395 """
396 Open N files for writing, hold them open in a background process
397
398 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
399 :return: a RemoteProcess
400 """
401 assert(self.is_mounted())
402
403 abs_path = os.path.join(self.mountpoint, fs_path)
404
405 pyscript = dedent("""
406 import sys
407 import time
408 import os
409
410 n = {count}
411 abs_path = "{abs_path}"
412
413 if not os.path.exists(os.path.dirname(abs_path)):
414 os.makedirs(os.path.dirname(abs_path))
415
416 handles = []
417 for i in range(0, n):
418 fname = "{{0}}_{{1}}".format(abs_path, i)
419 handles.append(open(fname, 'w'))
420
421 while True:
422 time.sleep(1)
423 """).format(abs_path=abs_path, count=count)
424
425 rproc = self._run_python(pyscript)
426 self.background_procs.append(rproc)
427 return rproc
428
429 def create_n_files(self, fs_path, count, sync=False):
430 assert(self.is_mounted())
431
432 abs_path = os.path.join(self.mountpoint, fs_path)
433
434 pyscript = dedent("""
435 import sys
436 import time
437 import os
438
439 n = {count}
440 abs_path = "{abs_path}"
441
442 if not os.path.exists(os.path.dirname(abs_path)):
443 os.makedirs(os.path.dirname(abs_path))
444
445 for i in range(0, n):
446 fname = "{{0}}_{{1}}".format(abs_path, i)
447 h = open(fname, 'w')
448 h.write('content')
449 if {sync}:
450 h.flush()
451 os.fsync(h.fileno())
452 h.close()
453 """).format(abs_path=abs_path, count=count, sync=str(sync))
454
455 self.run_python(pyscript)
456
457 def teardown(self):
458 for p in self.background_procs:
459 log.info("Terminating background process")
460 self._kill_background(p)
461
462 self.background_procs = []
463
464 def _kill_background(self, p):
465 if p.stdin:
466 p.stdin.close()
467 try:
468 p.wait()
469 except (CommandFailedError, ConnectionLostError):
470 pass
471
472 def kill_background(self, p):
473 """
474 For a process that was returned by one of the _background member functions,
475 kill it hard.
476 """
477 self._kill_background(p)
478 self.background_procs.remove(p)
479
480 def get_global_id(self):
481 raise NotImplementedError()
482
483 def get_global_inst(self):
484 raise NotImplementedError()
485
486 def get_global_addr(self):
487 raise NotImplementedError()
488
489 def get_osd_epoch(self):
490 raise NotImplementedError()
491
492 def stat(self, fs_path, wait=True):
493 """
494 stat a file, and return the result as a dictionary like this:
495 {
496 "st_ctime": 1414161137.0,
497 "st_mtime": 1414161137.0,
498 "st_nlink": 33,
499 "st_gid": 0,
500 "st_dev": 16777218,
501 "st_size": 1190,
502 "st_ino": 2,
503 "st_uid": 0,
504 "st_mode": 16877,
505 "st_atime": 1431520593.0
506 }
507
508 Raises exception on absent file.
509 """
510 abs_path = os.path.join(self.mountpoint, fs_path)
511
512 pyscript = dedent("""
513 import os
514 import stat
515 import json
516 import sys
517
518 try:
519 s = os.stat("{path}")
520 except OSError as e:
521 sys.exit(e.errno)
522
523 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
524 print json.dumps(
525 dict([(a, getattr(s, a)) for a in attrs]),
526 indent=2)
527 """).format(path=abs_path)
528 proc = self._run_python(pyscript)
529 if wait:
530 proc.wait()
531 return json.loads(proc.stdout.getvalue().strip())
532 else:
533 return proc
534
535 def touch(self, fs_path):
536 """
537 Create a dentry if it doesn't already exist. This python
538 implementation exists because the usual command line tool doesn't
539 pass through error codes like EIO.
540
541 :param fs_path:
542 :return:
543 """
544 abs_path = os.path.join(self.mountpoint, fs_path)
545 pyscript = dedent("""
546 import sys
547 import errno
548
549 try:
550 f = open("{path}", "w")
551 f.close()
552 except IOError as e:
553 sys.exit(errno.EIO)
554 """).format(path=abs_path)
555 proc = self._run_python(pyscript)
556 proc.wait()
557
558 def path_to_ino(self, fs_path, follow_symlinks=True):
559 abs_path = os.path.join(self.mountpoint, fs_path)
560
561 if follow_symlinks:
562 pyscript = dedent("""
563 import os
564 import stat
565
566 print os.stat("{path}").st_ino
567 """).format(path=abs_path)
568 else:
569 pyscript = dedent("""
570 import os
571 import stat
572
573 print os.lstat("{path}").st_ino
574 """).format(path=abs_path)
575
576 proc = self._run_python(pyscript)
577 proc.wait()
578 return int(proc.stdout.getvalue().strip())
579
580 def path_to_nlink(self, fs_path):
581 abs_path = os.path.join(self.mountpoint, fs_path)
582
583 pyscript = dedent("""
584 import os
585 import stat
586
587 print os.stat("{path}").st_nlink
588 """).format(path=abs_path)
589
590 proc = self._run_python(pyscript)
591 proc.wait()
592 return int(proc.stdout.getvalue().strip())
593
594 def ls(self, path=None):
595 """
596 Wrap ls: return a list of strings
597 """
598 cmd = ["ls"]
599 if path:
600 cmd.append(path)
601
602 ls_text = self.run_shell(cmd).stdout.getvalue().strip()
603
604 if ls_text:
605 return ls_text.split("\n")
606 else:
607 # Special case because otherwise split on empty string
608 # gives you [''] instead of []
609 return []
610
611 def setfattr(self, path, key, val):
612 """
613 Wrap setfattr.
614
615 :param path: relative to mount point
616 :param key: xattr name
617 :param val: xattr value
618 :return: None
619 """
620 self.run_shell(["setfattr", "-n", key, "-v", val, path])
621
622 def getfattr(self, path, attr):
623 """
624 Wrap getfattr: return the values of a named xattr on one file, or
625 None if the attribute is not found.
626
627 :return: a string
628 """
629 p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
630 try:
631 p.wait()
632 except CommandFailedError as e:
633 if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
634 return None
635 else:
636 raise
637
638 return p.stdout.getvalue()
639
640 def df(self):
641 """
642 Wrap df: return a dict of usage fields in bytes
643 """
644
645 p = self.run_shell(["df", "-B1", "."])
646 lines = p.stdout.getvalue().strip().split("\n")
647 fs, total, used, avail = lines[1].split()[:4]
648 log.warn(lines)
649
650 return {
651 "total": int(total),
652 "used": int(used),
653 "available": int(avail)
654 }