]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/fuse_mount.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / qa / tasks / cephfs / fuse_mount.py
CommitLineData
7c673cae
FG
1import json
2import time
3import logging
e306af50 4
f67539c2 5from io import StringIO
7c673cae
FG
6from textwrap import dedent
7
7c673cae 8from teuthology.contextutil import MaxWhileTries
f67539c2 9from teuthology.contextutil import safe_while
7c673cae
FG
10from teuthology.orchestra import run
11from teuthology.orchestra.run import CommandFailedError
f67539c2 12from tasks.ceph_manager import get_valgrind_args
e306af50 13from tasks.cephfs.mount import CephFSMount
7c673cae
FG
14
15log = logging.getLogger(__name__)
16
# Refer mount.py for docstrings.
class FuseMount(CephFSMount):
    """
    A CephFS mount backed by the ceph-fuse userspace client.

    Wraps starting/stopping a remote ceph-fuse process and tracking its
    /sys/fs/fuse/connections entry so it can be force-aborted on unmount.
    """
    def __init__(self, ctx, client_config, test_dir, client_id,
                 client_remote, client_keyring_path=None, cephfs_name=None,
                 cephfs_mntpt=None, hostfs_mntpt=None, brxnet=None):
        super(FuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
            client_id=client_id, client_remote=client_remote,
            client_keyring_path=client_keyring_path, hostfs_mntpt=hostfs_mntpt,
            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)

        # Per-client teuthology config (e.g. 'valgrind', 'coverage',
        # 'mount_wait', 'mount_timeout'); empty dict if none given.
        self.client_config = client_config if client_config else {}
        # RemoteProcess handle for the running ceph-fuse daemon (set by _mount).
        self.fuse_daemon = None
        # Fuse connection number under /sys/fs/fuse/connections (set by _mount).
        self._fuse_conn = None
        # Client identity, populated by gather_mount_info() after mounting.
        self.id = None
        self.inst = None
        self.addr = None
f67539c2
TL
34 def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
35 self.update_attrs(**kwargs)
36 self.assert_and_log_minimum_mount_details()
37
38 self.setup_netns()
39
40 if createfs:
41 # TODO: don't call setupfs() from within mount(), since it's
42 # absurd. The proper order should be: create FS first and then
43 # call mount().
44 self.setupfs(name=self.cephfs_name)
11fdf7f2 45
7c673cae 46 try:
f67539c2 47 return self._mount(mntopts, check_status)
7c673cae
FG
48 except RuntimeError:
49 # Catch exceptions by the mount() logic (i.e. not remote command
50 # failures) and ensure the mount is not left half-up.
51 # Otherwise we might leave a zombie mount point that causes
52 # anyone traversing cephtest/ to get hung up on.
e306af50 53 log.warning("Trying to clean up after failed mount")
7c673cae
FG
54 self.umount_wait(force=True)
55 raise
56
f67539c2
TL
57 def _mount(self, mntopts, check_status):
58 log.info("Client client.%s config is %s" % (self.client_id,
59 self.client_config))
7c673cae
FG
60
61 daemon_signal = 'kill'
f67539c2
TL
62 if self.client_config.get('coverage') or \
63 self.client_config.get('valgrind') is not None:
7c673cae
FG
64 daemon_signal = 'term'
65
f91f0fd5
TL
66 # Use 0000 mode to prevent undesired modifications to the mountpoint on
67 # the local file system.
f67539c2
TL
68 script = f'mkdir -m 0000 -p -v {self.hostfs_mntpt}'.split()
69 stderr = StringIO()
70 try:
71 self.client_remote.run(args=script, timeout=(15*60),
72 stderr=StringIO())
73 except CommandFailedError:
74 if 'file exists' not in stderr.getvalue().lower():
75 raise
7c673cae
FG
76
77 run_cmd = [
78 'sudo',
79 'adjust-ulimits',
80 'ceph-coverage',
81 '{tdir}/archive/coverage'.format(tdir=self.test_dir),
82 'daemon-helper',
83 daemon_signal,
84 ]
85
f67539c2
TL
86 fuse_cmd = [
87 'ceph-fuse', "-f",
88 "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok",
7c673cae 89 ]
f67539c2
TL
90 if self.client_id is not None:
91 fuse_cmd += ['--id', self.client_id]
92 if self.client_keyring_path and self.client_id is not None:
93 fuse_cmd += ['-k', self.client_keyring_path]
94 if self.cephfs_mntpt is not None:
95 fuse_cmd += ["--client_mountpoint=" + self.cephfs_mntpt]
96 if self.cephfs_name is not None:
97 fuse_cmd += ["--client_fs=" + self.cephfs_name]
98 if mntopts:
99 fuse_cmd += mntopts
100 fuse_cmd.append(self.hostfs_mntpt)
7c673cae
FG
101
102 if self.client_config.get('valgrind') is not None:
f67539c2 103 run_cmd = get_valgrind_args(
7c673cae
FG
104 self.test_dir,
105 'client.{id}'.format(id=self.client_id),
106 run_cmd,
107 self.client_config.get('valgrind'),
f67539c2 108 cd=False
7c673cae 109 )
f67539c2
TL
110
111 netns_prefix = ['sudo', 'nsenter',
112 '--net=/var/run/netns/{0}'.format(self.netns_name)]
113 run_cmd = netns_prefix + run_cmd
7c673cae
FG
114
115 run_cmd.extend(fuse_cmd)
116
117 def list_connections():
e306af50
TL
118 conn_dir = "/sys/fs/fuse/connections"
119
120 self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
121 check_status=False)
7c673cae 122 self.client_remote.run(
e306af50
TL
123 args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
124 check_status=False, timeout=(30))
125
126 try:
127 ls_str = self.client_remote.sh("ls " + conn_dir,
128 stdout=StringIO(),
129 timeout=(15*60)).strip()
130 except CommandFailedError:
7c673cae
FG
131 return []
132
7c673cae
FG
133 if ls_str:
134 return [int(n) for n in ls_str.split("\n")]
135 else:
136 return []
137
138 # Before starting ceph-fuse process, note the contents of
139 # /sys/fs/fuse/connections
140 pre_mount_conns = list_connections()
141 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
142
f67539c2
TL
143 mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
144 self.fuse_daemon = self.client_remote.run(
7c673cae
FG
145 args=run_cmd,
146 logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
147 stdin=run.PIPE,
f67539c2
TL
148 stdout=mountcmd_stdout,
149 stderr=mountcmd_stderr,
150 wait=False
7c673cae 151 )
7c673cae
FG
152
153 # Wait for the connection reference to appear in /sys
154 mount_wait = self.client_config.get('mount_wait', 0)
155 if mount_wait > 0:
156 log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
157 time.sleep(mount_wait)
158 timeout = int(self.client_config.get('mount_timeout', 30))
159 waited = 0
160
161 post_mount_conns = list_connections()
162 while len(post_mount_conns) <= len(pre_mount_conns):
163 if self.fuse_daemon.finished:
164 # Did mount fail? Raise the CommandFailedError instead of
165 # hitting the "failed to populate /sys/" timeout
f67539c2
TL
166 try:
167 self.fuse_daemon.wait()
168 except CommandFailedError as e:
169 log.info('mount command failed.')
170 if check_status:
171 raise
172 else:
173 return (e, mountcmd_stdout.getvalue(),
174 mountcmd_stderr.getvalue())
7c673cae
FG
175 time.sleep(1)
176 waited += 1
177 if waited > timeout:
f67539c2
TL
178 raise RuntimeError(
179 "Fuse mount failed to populate/sys/ after {} "
180 "seconds".format(waited))
7c673cae
FG
181 else:
182 post_mount_conns = list_connections()
183
184 log.info("Post-mount connections: {0}".format(post_mount_conns))
185
186 # Record our fuse connection number so that we can use it when
187 # forcing an unmount
188 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
189 if len(new_conns) == 0:
190 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
191 elif len(new_conns) > 1:
192 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
193 else:
194 self._fuse_conn = new_conns[0]
195
11fdf7f2
TL
196 self.gather_mount_info()
197
f67539c2
TL
198 self.mounted = True
199
11fdf7f2
TL
    def gather_mount_info(self):
        """
        Record this client's identity (global id, pid, instance string,
        address) from its admin socket 'status' output.

        Older clients may not report inst_str/addr_str in 'status'; in that
        case fall back to matching our global id against the MDS rank's
        session list.

        :raises RuntimeError: if no matching session can be found in the
            fallback path.
        """
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            # Fallback: ask the MDS for its sessions and match on our id.
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    # inst is "<name> <addr>"; the address is the 2nd token.
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")
215
f67539c2 216 def check_mounted_state(self):
7c673cae
FG
217 proc = self.client_remote.run(
218 args=[
219 'stat',
220 '--file-system',
221 '--printf=%T\n',
222 '--',
f67539c2 223 self.hostfs_mntpt,
7c673cae 224 ],
f67539c2
TL
225 stdout=StringIO(),
226 stderr=StringIO(),
f64942e4
AA
227 wait=False,
228 timeout=(15*60)
7c673cae
FG
229 )
230 try:
231 proc.wait()
232 except CommandFailedError:
f67539c2
TL
233 error = proc.stderr.getvalue()
234 if ("endpoint is not connected" in error
235 or "Software caused connection abort" in error):
236 # This happens is fuse is killed without unmount
237 log.warning("Found stale mount point at {0}".format(self.hostfs_mntpt))
238 return True
239 else:
240 # This happens if the mount directory doesn't exist
241 log.info('mount point does not exist: %s', self.hostfs_mntpt)
242 return False
7c673cae 243
f67539c2 244 fstype = proc.stdout.getvalue().rstrip('\n')
7c673cae 245 if fstype == 'fuseblk':
f67539c2 246 log.info('ceph-fuse is mounted on %s', self.hostfs_mntpt)
7c673cae
FG
247 return True
248 else:
249 log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
250 fstype=fstype))
251 return False
252
    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint. If not,
        sleep for 5 seconds and check again.

        Once mounted, chmod the mount point to 1777 (retried up to 10
        times, since the MDS may briefly deny permission right after
        mount); a read-only filesystem is tolerated.
        """

        while not self.check_mounted_state():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        self.mounted = True

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        for retry in range(10):
            try:
                stderr = StringIO()
                self.client_remote.run(args=['sudo', 'chmod', '1777',
                                             self.hostfs_mntpt],
                                       timeout=(15*60),
                                       stderr=stderr, omit_sudo=False)
                break
            except run.CommandFailedError:
                # NOTE: stderr is rebound to its lowercased text here; it is
                # re-created as a fresh StringIO at the top of the next try.
                stderr = stderr.getvalue().lower()
                if "read-only file system" in stderr:
                    break
                elif "permission denied" in stderr:
                    # Transient right after mount; wait and retry.
                    time.sleep(5)
                else:
                    raise
7c673cae
FG
286
287 def _mountpoint_exists(self):
f67539c2 288 return self.client_remote.run(args=["ls", "-d", self.hostfs_mntpt], check_status=False, timeout=(15*60)).exitstatus == 0
7c673cae 289
f67539c2
TL
    def umount(self, cleanup=True):
        """
        Unmount ceph-fuse from the host mount point, escalating from a
        polite fusermount -u to aborting the fuse connection and a lazy
        forced umount if necessary.

        umount() must not run cleanup() when it's called by umount_wait()
        since "run.wait([self.fuse_daemon], timeout)" would hang otherwise.

        :param cleanup: run cleanup() after unmounting (see note above).
        """
        if not self.is_mounted():
            if cleanup:
                self.cleanup()
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            stderr = StringIO()
            self.client_remote.run(args=['sudo', 'fusermount', '-u',
                                         self.hostfs_mntpt],
                                   stderr=stderr,
                                   timeout=(30*60), omit_sudo=False)
        except run.CommandFailedError:
            if "mountpoint not found" in stderr.getvalue():
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
            elif "not mounted" in stderr.getvalue():
                # This happens if the mount directory already unmouted
                log.info('mount point not mounted: %s', self.mountpoint)
            else:
                log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

                # Capture open files and the process tree for diagnosis
                # before we start killing things.
                self.client_remote.run(
                    args=['sudo', run.Raw('PATH=/usr/sbin:$PATH'), 'lsof',
                          run.Raw(';'), 'ps', 'auxf'],
                    timeout=(60*15), omit_sudo=False)

                # abort the fuse mount, killing all hung processes
                if self._fuse_conn:
                    self.run_python(dedent("""
                    import os
                    path = "/sys/fs/fuse/connections/{0}/abort"
                    if os.path.exists(path):
                        open(path, "w").write("1")
                    """).format(self._fuse_conn))
                    self._fuse_conn = None

                stderr = StringIO()
                # make sure its unmounted
                try:
                    self.client_remote.run(args=['sudo', 'umount', '-l', '-f',
                                                 self.hostfs_mntpt],
                                           stderr=stderr, timeout=(60*15), omit_sudo=False)
                except CommandFailedError:
                    # Tolerate the failure if the forced umount actually
                    # left nothing mounted.
                    if self.is_mounted():
                        raise

        # Reset all mount-related state now that the mount is gone.
        self.mounted = False
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
        if cleanup:
            self.cleanup()
7c673cae 349
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        Unmount and wait for the ceph-fuse daemon process to exit.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: re-raise if the daemon exited uncleanly
            (mutually exclusive with force).
        :param timeout: seconds to wait for the daemon process to exit.
        :raises MaxWhileTries: if the daemon never terminates.
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} '
                      '{mnt}'.format(id=self.client_id,
                                     remote=self.client_remote,
                                     mnt=self.hostfs_mntpt))
            self.cleanup()
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        # cleanup is set to False since cleanup must happen after umount is
        # complete; otherwise the following call to run.wait hangs.
        self.umount(cleanup=False)

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)

        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            # A non-zero daemon exit is tolerated unless the caller
            # demanded a clean shutdown.
            if require_clean:
                raise

        self.mounted = False
        self.cleanup()
392
393 def teardown(self):
394 """
395 Whatever the state of the mount, get it gone.
396 """
397 super(FuseMount, self).teardown()
398
399 self.umount()
400
401 if self.fuse_daemon and not self.fuse_daemon.finished:
402 self.fuse_daemon.stdin.close()
403 try:
404 self.fuse_daemon.wait()
405 except CommandFailedError:
406 pass
407
f67539c2 408 self.mounted = False
7c673cae
FG
409
410 def _asok_path(self):
411 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
412
    @property
    def _prefix(self):
        """Command prefix for ceph CLI invocations; none needed for fuse."""
        return ""
416
    def find_admin_socket(self):
        """
        Locate this client's admin socket on the remote host.

        Runs a small script remotely that globs _asok_path(); when the
        pattern matches several sockets (stale ones from earlier runs), it
        picks the one whose owning process is alive and has our mountpoint
        on its command line.

        :returns: absolute path of the admin socket.
        :raises RuntimeError: (remotely) if no matching socket is found.
        """
        # NOTE: doubled braces ({{0}}) survive the outer .format() call and
        # become plain {0} in the remote script.
        pyscript = """
import glob
import re
import os
import subprocess

def _find_admin_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)
    mountpoint = "{mountpoint}"

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f:
                contents = proc_f.read()
                if mountpoint in contents:
                    return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(_find_admin_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id),
            mountpoint=self.mountpoint)

        asok_path = self.run_python(pyscript)
        log.info("Found client admin socket at {0}".format(asok_path))
        return asok_path
7c673cae 452
f67539c2
TL
453 def admin_socket(self, args):
454 asok_path = self.find_admin_socket()
455
456 # Query client ID from admin socket, wait 2 seconds
457 # and retry 10 times if it is not ready
458 with safe_while(sleep=2, tries=10) as proceed:
459 while proceed():
460 try:
461 p = self.client_remote.run(args=
462 ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
463 stdout=StringIO(), stderr=StringIO(), wait=False,
464 timeout=(15*60))
465 p.wait()
466 break
467 except CommandFailedError:
468 if "connection refused" in p.stderr.getvalue().lower():
469 pass
470
471 return json.loads(p.stdout.getvalue().strip())
7c673cae
FG
472
473 def get_global_id(self):
474 """
475 Look up the CephFS client ID for this mount
476 """
7c673cae
FG
477 return self.admin_socket(['mds_sessions'])['id']
478
11fdf7f2
TL
479 def get_global_inst(self):
480 """
481 Look up the CephFS client instance for this mount
482 """
483 return self.inst
484
485 def get_global_addr(self):
486 """
487 Look up the CephFS client addr for this mount
488 """
489 return self.addr
490
28e407b8
AA
491 def get_client_pid(self):
492 """
493 return pid of ceph-fuse process
494 """
495 status = self.admin_socket(['status'])
496 return status['metadata']['pid']
497
7c673cae
FG
498 def get_osd_epoch(self):
499 """
500 Return 2-tuple of osd_epoch, osd_epoch_barrier
501 """
502 status = self.admin_socket(['status'])
503 return status['osd_epoch'], status['osd_epoch_barrier']
504
505 def get_dentry_count(self):
506 """
507 Return 2-tuple of dentry_count, dentry_pinned_count
508 """
509 status = self.admin_socket(['status'])
510 return status['dentry_count'], status['dentry_pinned_count']
511
512 def set_cache_size(self, size):
513 return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])
f67539c2
TL
514
515 def get_op_read_count(self):
516 return self.admin_socket(['perf', 'dump', 'objecter'])['objecter']['osdop_read']