import json
import time
import logging

from io import StringIO
from textwrap import dedent

from teuthology.contextutil import MaxWhileTries
from teuthology.contextutil import safe_while
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import get_valgrind_args
from tasks.cephfs.mount import CephFSMount, UMOUNT_TIMEOUT

log = logging.getLogger(__name__)

# Refer to mount.py for docstrings.
class FuseMount(CephFSMount):
    def __init__(self, ctx, test_dir, client_id, client_remote,
                 client_keyring_path=None, cephfs_name=None,
                 cephfs_mntpt=None, hostfs_mntpt=None, brxnet=None,
                 client_config={}):
        super(FuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
            client_id=client_id, client_remote=client_remote,
            client_keyring_path=client_keyring_path, hostfs_mntpt=hostfs_mntpt,
            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet,
            client_config=client_config)

        self.fuse_daemon = None
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
        self.mount_timeout = int(self.client_config.get('mount_timeout', 30))

        self._mount_bin = [
            'ceph-fuse', "-f",
            "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
        self._mount_cmd_cwd = self.test_dir
        if self.client_config.get('valgrind') is not None:
            self.cwd = None  # get_valgrind_args chdirs for us
        self._mount_cmd_logger = log.getChild('ceph-fuse.{id}'.format(id=self.client_id))
        self._mount_cmd_stdin = run.PIPE

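    # The --admin-socket path above relies on ceph's metavariable expansion:
    # for client.0 with pid 12345, $cluster-$name.$pid expands to e.g.
    # "ceph-client.0.12345.asok" (illustrative values), matching the pattern
    # that _asok_path() globs for further below.
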
    def mount(self, mntopts=None, check_status=True, mntargs=None, **kwargs):
        self.update_attrs(**kwargs)
        self.assert_and_log_minimum_mount_details()

        self.setup_netns()

        try:
            return self._mount(mntopts, mntargs, check_status)
        except RuntimeError:
            # Catch exceptions raised by the mount() logic itself (i.e. not
            # remote command failures) and ensure the mount is not left
            # half-up. Otherwise we might leave a zombie mount point that
            # anyone traversing cephtest/ would get hung up on.
            log.warning("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mntopts, mntargs, check_status):
        log.info("Client client.%s config is %s" % (self.client_id,
                                                    self.client_config))

        self._create_mntpt()

        retval = self._run_mount_cmd(mntopts, mntargs, check_status)
        if retval:
            return retval

        self.gather_mount_info()

    def _run_mount_cmd(self, mntopts, mntargs, check_status):
        mount_cmd = self._get_mount_cmd(mntopts, mntargs)
        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()

        # Before starting the ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = self._list_fuse_conns()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        self.fuse_daemon = self.client_remote.run(
            args=mount_cmd,
            cwd=self._mount_cmd_cwd,
            logger=self._mount_cmd_logger,
            stdin=self._mount_cmd_stdin,
            stdout=mountcmd_stdout,
            stderr=mountcmd_stderr,
            wait=False
        )

        return self._wait_and_record_our_fuse_conn(
            check_status, pre_mount_conns, mountcmd_stdout, mountcmd_stderr)

    def _get_mount_cmd(self, mntopts, mntargs):
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or \
           self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        mount_cmd = ['sudo', 'adjust-ulimits', 'ceph-coverage',
                     '{tdir}/archive/coverage'.format(tdir=self.test_dir),
                     'daemon-helper', daemon_signal]

        mount_cmd = self._add_valgrind_args(mount_cmd)
        mount_cmd = ['sudo'] + self._nsenter_args + mount_cmd

        mount_cmd += self._mount_bin + [self.hostfs_mntpt]
        if self.client_id:
            mount_cmd += ['--id', self.client_id]
        if self.client_keyring_path and self.client_id:
            mount_cmd += ['-k', self.client_keyring_path]

        self.validate_subvol_options()

        if self.cephfs_mntpt:
            mount_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]

        if self.cephfs_name:
            mount_cmd += ["--client_fs=" + self.cephfs_name]
        if mntopts:
            mount_cmd.extend(('-o', ','.join(mntopts)))
        if mntargs:
            mount_cmd.extend(mntargs)

        return mount_cmd

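    # Roughly, with a plain client_config the assembled command is (angle
    # brackets mark placeholders; the netns wrapping comes from _nsenter_args):
    #   sudo <nsenter args> sudo adjust-ulimits ceph-coverage \
    #       <tdir>/archive/coverage daemon-helper kill \
    #       ceph-fuse -f --admin-socket /var/run/ceph/$cluster-$name.$pid.asok \
    #       <hostfs_mntpt> --id <client_id> -k <client_keyring_path>
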
    def _add_valgrind_args(self, mount_cmd):
        if self.client_config.get('valgrind') is not None:
            mount_cmd = get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                mount_cmd,
                self.client_config.get('valgrind'),
                cd=False
            )

        return mount_cmd

    def _list_fuse_conns(self):
        conn_dir = "/sys/fs/fuse/connections"

        self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
                               check_status=False)
        self.client_remote.run(
            args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
            check_status=False, timeout=30)

        try:
            ls_str = self.client_remote.sh("ls " + conn_dir,
                                           stdout=StringIO(),
                                           timeout=300).strip()
        except CommandFailedError:
            return []

        if ls_str:
            return [int(n) for n in ls_str.split("\n")]
        else:
            return []

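    # Each entry in /sys/fs/fuse/connections is a numbered directory, one per
    # live FUSE connection (e.g. "38", "52"), hence the int() parse above.
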
    def _wait_and_record_our_fuse_conn(self, check_status, pre_mount_conns,
                                       mountcmd_stdout, mountcmd_stderr):
        """
        Wait for the connection reference to appear in /sys
        """
        waited = 0

        post_mount_conns = self._list_fuse_conns()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail? Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                try:
                    self.fuse_daemon.wait()
                except CommandFailedError as e:
                    log.info('mount command failed.')
                    if check_status:
                        raise
                    else:
                        return (e, mountcmd_stdout.getvalue(),
                                mountcmd_stderr.getvalue())
            time.sleep(1)
            waited += 1
            if waited > self._fuse_conn_check_timeout:
                raise RuntimeError(
                    "Fuse mount failed to populate /sys/ after {} "
                    "seconds".format(waited))
            else:
                post_mount_conns = self._list_fuse_conns()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        self._record_our_fuse_conn(pre_mount_conns, post_mount_conns)

    @property
    def _fuse_conn_check_timeout(self):
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        return timeout

    def _record_our_fuse_conn(self, pre_mount_conns, post_mount_conns):
        """
        Record our fuse connection number so that we can use it when forcing
        an unmount.
        """
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

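    # The recorded number names a directory under /sys/fs/fuse/connections/;
    # umount() below writes "1" to the "abort" file inside it to sever the
    # connection when a mount is stuck.
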
    def gather_mount_info(self):
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")

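    # A session's 'inst' string has the form "<entity> <address>", e.g.
    # "client.4235 v1:192.168.0.1:0/123456789" (illustrative), which is why
    # split()[1] above yields the client's address.
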
    def check_mounted_state(self):
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.hostfs_mntpt,
            ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False,
            timeout=300
        )
        try:
            proc.wait()
        except CommandFailedError:
            error = proc.stderr.getvalue()
            if ("endpoint is not connected" in error
                    or "Software caused connection abort" in error):
                # This happens if fuse is killed without unmount
                log.warning("Found stale mount point at {0}".format(self.hostfs_mntpt))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.hostfs_mntpt)
                return False

        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.hostfs_mntpt)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

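    # For reference: `stat --file-system --printf=%T` prints the filesystem
    # type name, and FUSE mounts report "fuseblk", the sentinel checked above.
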
    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint. If not,
        sleep for 5 seconds and check again.
        """
        while not self.check_mounted_state():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        for retry in range(10):
            try:
                stderr = StringIO()
                self.client_remote.run(args=['sudo', 'chmod', '1777',
                                             self.hostfs_mntpt],
                                       timeout=300,
                                       stderr=stderr, omit_sudo=False)
                break
            except run.CommandFailedError:
                stderr = stderr.getvalue().lower()
                if "read-only file system" in stderr:
                    break
                elif "permission denied" in stderr:
                    time.sleep(5)
                else:
                    raise

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.hostfs_mntpt],
                                      check_status=False,
                                      timeout=300).exitstatus == 0

    def umount(self, cleanup=True):
        """
        umount() must not run cleanup() when it's called by umount_wait()
        since "run.wait([self.fuse_daemon], timeout)" would hang otherwise.
        """
        if not self.is_mounted():
            if cleanup:
                self.cleanup()
            return
        if self.is_blocked():
            self._run_umount_lf()
            if cleanup:
                self.cleanup()
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            stderr = StringIO()
            self.client_remote.run(
                args=['sudo', 'fusermount', '-u', self.hostfs_mntpt],
                stderr=stderr, timeout=UMOUNT_TIMEOUT, omit_sudo=False)
        except run.CommandFailedError:
            if "mountpoint not found" in stderr.getvalue():
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
            elif "not mounted" in stderr.getvalue():
                # This happens if the mount directory is already unmounted
                log.info('mount point not mounted: %s', self.mountpoint)
            else:
                log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

                self.client_remote.run(
                    args=['sudo', run.Raw('PATH=/usr/sbin:$PATH'), 'lsof',
                          run.Raw(';'), 'ps', 'auxf'],
                    timeout=UMOUNT_TIMEOUT, omit_sudo=False)

                # abort the fuse mount, killing all hung processes
                if self._fuse_conn:
                    self.run_python(dedent("""
                    import os
                    path = "/sys/fs/fuse/connections/{0}/abort"
                    if os.path.exists(path):
                        open(path, "w").write("1")
                    """).format(self._fuse_conn))
                self._fuse_conn = None

                # make sure it's unmounted
                self._run_umount_lf()

        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
        if cleanup:
            self.cleanup()

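    # Note: _run_umount_lf() is inherited from CephFSMount (mount.py); as its
    # name suggests, it issues a lazy, forced unmount (umount -l -f) and is
    # the last resort used above when the mount is blocked or fusermount fails.
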
    def umount_wait(self, force=False, require_clean=False,
                    timeout=UMOUNT_TIMEOUT):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} '
                      '{mnt}'.format(id=self.client_id,
                                     remote=self.client_remote,
                                     mnt=self.hostfs_mntpt))
            self.cleanup()
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        if self.is_blocked():
            self._run_umount_lf()
            self.cleanup()
            return

        # cleanup is set to False since cleanup must happen after the umount is
        # complete; otherwise the following call to run.wait hangs.
        self.umount(cleanup=False)

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

    def _asok_path(self):
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    @property
    def _prefix(self):
        return ""

    def find_admin_socket(self):
        pyscript = """
import glob
import re
import os
import subprocess

def _find_admin_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)
    mountpoint = "{mountpoint}"

    # Given a non-glob path, it had better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f:
                contents = proc_f.read()
                if mountpoint in contents:
                    return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(_find_admin_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id),
            mountpoint=self.mountpoint)

        asok_path = self.run_python(pyscript, sudo=True)
        log.info("Found client admin socket at {0}".format(asok_path))
        return asok_path

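    # Example: for client.0 the glob is /var/run/ceph/ceph-client.0.*.asok. If
    # stale sockets from earlier runs survive, the /proc/<pid>/cmdline check in
    # the pyscript above selects the socket whose owning ceph-fuse process is
    # alive and mounted at our mountpoint.
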
    def admin_socket(self, args):
        asok_path = self.find_admin_socket()

        # Run the command against the admin socket; if the socket is not
        # ready yet, wait 2 seconds and retry, up to 10 times
        with safe_while(sleep=2, tries=10) as proceed:
            while proceed():
                try:
                    p = self.client_remote.run(args=
                        ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
                        stdout=StringIO(), stderr=StringIO(), wait=False,
                        timeout=300)
                    p.wait()
                    break
                except CommandFailedError:
                    if "connection refused" in p.stderr.getvalue().lower():
                        pass

        return json.loads(p.stdout.getvalue().strip())
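
    # Usage sketch (the keys consumed by the accessors below):
    #   status = self.admin_socket(['status'])
    #   status['id'], status['osd_epoch'], status['metadata']['pid'], ...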

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount
        """
        return self.inst

    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount
        """
        return self.addr

    def get_client_pid(self):
        """
        Return the pid of the ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])

    def get_op_read_count(self):
        return self.admin_socket(['perf', 'dump', 'objecter'])['objecter']['osdop_read']