# Extracted from ceph.git: ceph/qa/tasks/cephfs/fuse_mount.py (gitweb blob view)
5 from io
import StringIO
6 from textwrap
import dedent
8 from teuthology
.contextutil
import MaxWhileTries
9 from teuthology
.contextutil
import safe_while
10 from teuthology
.orchestra
import run
11 from teuthology
.exceptions
import CommandFailedError
12 from tasks
.ceph_manager
import get_valgrind_args
13 from tasks
.cephfs
.mount
import CephFSMount
, UMOUNT_TIMEOUT
# Module-level logger; per-mount child loggers (e.g. 'ceph-fuse.<id>') are
# derived from it via log.getChild() elsewhere in this file.
log = logging.getLogger(__name__)
17 # Refer mount.py for docstrings.
18 class FuseMount(CephFSMount
):
19 def __init__(self
, ctx
, test_dir
, client_id
, client_remote
,
20 client_keyring_path
=None, cephfs_name
=None,
21 cephfs_mntpt
=None, hostfs_mntpt
=None, brxnet
=None,
23 super(FuseMount
, self
).__init
__(ctx
=ctx
, test_dir
=test_dir
,
24 client_id
=client_id
, client_remote
=client_remote
,
25 client_keyring_path
=client_keyring_path
, hostfs_mntpt
=hostfs_mntpt
,
26 cephfs_name
=cephfs_name
, cephfs_mntpt
=cephfs_mntpt
, brxnet
=brxnet
,
27 client_config
=client_config
)
29 self
.fuse_daemon
= None
30 self
._fuse
_conn
= None
34 self
.mount_timeout
= int(self
.client_config
.get('mount_timeout', 30))
38 "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok"]
39 self
._mount
_cmd
_cwd
= self
.test_dir
40 if self
.client_config
.get('valgrind') is not None:
41 self
.cwd
= None # get_valgrind_args chdir for us
42 self
._mount
_cmd
_logger
= log
.getChild('ceph-fuse.{id}'.format(id=self
.client_id
))
43 self
._mount
_cmd
_stdin
= run
.PIPE
45 def mount(self
, mntopts
=None, check_status
=True, mntargs
=None, **kwargs
):
46 self
.update_attrs(**kwargs
)
47 self
.assert_and_log_minimum_mount_details()
52 return self
._mount
(mntopts
, mntargs
, check_status
)
54 # Catch exceptions by the mount() logic (i.e. not remote command
55 # failures) and ensure the mount is not left half-up.
56 # Otherwise we might leave a zombie mount point that causes
57 # anyone traversing cephtest/ to get hung up on.
58 log
.warning("Trying to clean up after failed mount")
59 self
.umount_wait(force
=True)
62 def _mount(self
, mntopts
, mntargs
, check_status
):
63 log
.info("Client client.%s config is %s" % (self
.client_id
,
68 retval
= self
._run
_mount
_cmd
(mntopts
, mntargs
, check_status
)
72 self
.gather_mount_info()
74 def _run_mount_cmd(self
, mntopts
, mntargs
, check_status
):
75 mount_cmd
= self
._get
_mount
_cmd
(mntopts
, mntargs
)
76 mountcmd_stdout
, mountcmd_stderr
= StringIO(), StringIO()
78 # Before starting ceph-fuse process, note the contents of
79 # /sys/fs/fuse/connections
80 pre_mount_conns
= self
._list
_fuse
_conns
()
81 log
.info("Pre-mount connections: {0}".format(pre_mount_conns
))
83 self
.fuse_daemon
= self
.client_remote
.run(
85 cwd
=self
._mount
_cmd
_cwd
,
86 logger
=self
._mount
_cmd
_logger
,
87 stdin
=self
._mount
_cmd
_stdin
,
88 stdout
=mountcmd_stdout
,
89 stderr
=mountcmd_stderr
,
93 return self
._wait
_and
_record
_our
_fuse
_conn
(
94 check_status
, pre_mount_conns
, mountcmd_stdout
, mountcmd_stderr
)
96 def _get_mount_cmd(self
, mntopts
, mntargs
):
97 daemon_signal
= 'kill'
98 if self
.client_config
.get('coverage') or \
99 self
.client_config
.get('valgrind') is not None:
100 daemon_signal
= 'term'
102 mount_cmd
= ['sudo', 'adjust-ulimits', 'ceph-coverage',
103 '{tdir}/archive/coverage'.format(tdir
=self
.test_dir
),
104 'daemon-helper', daemon_signal
]
106 mount_cmd
= self
._add
_valgrind
_args
(mount_cmd
)
107 mount_cmd
= ['sudo'] + self
._nsenter
_args
+ mount_cmd
109 mount_cmd
+= self
._mount
_bin
+ [self
.hostfs_mntpt
]
111 mount_cmd
+= ['--id', self
.client_id
]
112 if self
.client_keyring_path
and self
.client_id
:
113 mount_cmd
+= ['-k', self
.client_keyring_path
]
115 self
.validate_subvol_options()
117 if self
.cephfs_mntpt
:
118 mount_cmd
+= ["--client_mountpoint=" + self
.cephfs_mntpt
]
121 mount_cmd
+= ["--client_fs=" + self
.cephfs_name
]
123 mount_cmd
.extend(('-o', ','.join(mntopts
)))
125 mount_cmd
.extend(mntargs
)
129 def _add_valgrind_args(self
, mount_cmd
):
130 if self
.client_config
.get('valgrind') is not None:
131 mount_cmd
= get_valgrind_args(
133 'client.{id}'.format(id=self
.client_id
),
135 self
.client_config
.get('valgrind'),
141 def _list_fuse_conns(self
):
142 conn_dir
= "/sys/fs/fuse/connections"
144 self
.client_remote
.run(args
=['sudo', 'modprobe', 'fuse'],
146 self
.client_remote
.run(
147 args
=["sudo", "mount", "-t", "fusectl", conn_dir
, conn_dir
],
148 check_status
=False, timeout
=(30))
151 ls_str
= self
.client_remote
.sh("ls " + conn_dir
,
154 except CommandFailedError
:
158 return [int(n
) for n
in ls_str
.split("\n")]
162 def _wait_and_record_our_fuse_conn(self
, check_status
, pre_mount_conns
,
163 mountcmd_stdout
, mountcmd_stderr
):
165 Wait for the connection reference to appear in /sys
169 post_mount_conns
= self
._list
_fuse
_conns
()
170 while len(post_mount_conns
) <= len(pre_mount_conns
):
171 if self
.fuse_daemon
.finished
:
172 # Did mount fail? Raise the CommandFailedError instead of
173 # hitting the "failed to populate /sys/" timeout
175 self
.fuse_daemon
.wait()
176 except CommandFailedError
as e
:
177 log
.info('mount command failed.')
181 return (e
, mountcmd_stdout
.getvalue(),
182 mountcmd_stderr
.getvalue())
185 if waited
> self
._fuse
_conn
_check
_timeout
:
187 "Fuse mount failed to populate/sys/ after {} "
188 "seconds".format(waited
))
190 post_mount_conns
= self
._list
_fuse
_conns
()
192 log
.info("Post-mount connections: {0}".format(post_mount_conns
))
194 self
._record
_our
_fuse
_conn
(pre_mount_conns
, post_mount_conns
)
197 def _fuse_conn_check_timeout(self
):
198 mount_wait
= self
.client_config
.get('mount_wait', 0)
200 log
.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait
))
201 time
.sleep(mount_wait
)
202 timeout
= int(self
.client_config
.get('mount_timeout', 30))
205 def _record_our_fuse_conn(self
, pre_mount_conns
, post_mount_conns
):
207 Record our fuse connection number so that we can use it when forcing
210 new_conns
= list(set(post_mount_conns
) - set(pre_mount_conns
))
211 if len(new_conns
) == 0:
212 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns
))
213 elif len(new_conns
) > 1:
214 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns
))
216 self
._fuse
_conn
= new_conns
[0]
def gather_mount_info(self):
    """
    Populate self.id, self.client_pid, self.inst and self.addr from the
    client's admin socket 'status' output, falling back to the MDS rank's
    'session ls' output when the status dump lacks inst_str/addr_str.

    NOTE(review): three source lines are missing from this extract; the
    try/except KeyError wrapper and the 'for s in sessions' loop header
    are reconstructed from the surrounding structure -- confirm against
    upstream.

    :raises RuntimeError: if the fallback cannot locate our session.
    """
    status = self.admin_socket(['status'])
    self.id = status['id']
    self.client_pid = status['metadata']['pid']
    try:
        self.inst = status['inst_str']
        self.addr = status['addr_str']
    except KeyError:
        sessions = self.fs.rank_asok(['session', 'ls'])
        for s in sessions:
            if s['id'] == self.id:
                self.inst = s['inst']
                # inst string is "<name> <addr>"; take the address part
                self.addr = self.inst.split()[1]
        if self.inst is None:
            raise RuntimeError("cannot find client session")
234 def check_mounted_state(self
):
235 proc
= self
.client_remote
.run(
250 except CommandFailedError
:
251 error
= proc
.stderr
.getvalue()
252 if ("endpoint is not connected" in error
253 or "Software caused connection abort" in error
):
254 # This happens is fuse is killed without unmount
255 log
.warning("Found stale mount point at {0}".format(self
.hostfs_mntpt
))
258 # This happens if the mount directory doesn't exist
259 log
.info('mount point does not exist: %s', self
.hostfs_mntpt
)
262 fstype
= proc
.stdout
.getvalue().rstrip('\n')
263 if fstype
== 'fuseblk':
264 log
.info('ceph-fuse is mounted on %s', self
.hostfs_mntpt
)
267 log
.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
271 def wait_until_mounted(self
):
273 Check to make sure that fuse is mounted on mountpoint. If not,
274 sleep for 5 seconds and check again.
277 while not self
.check_mounted_state():
278 # Even if it's not mounted, it should at least
279 # be running: catch simple failures where it has terminated.
280 assert not self
.fuse_daemon
.poll()
284 # Now that we're mounted, set permissions so that the rest of the test
285 # will have unrestricted access to the filesystem mount.
286 for retry
in range(10):
289 self
.client_remote
.run(args
=['sudo', 'chmod', '1777',
292 stderr
=stderr
, omit_sudo
=False)
294 except run
.CommandFailedError
:
295 stderr
= stderr
.getvalue().lower()
296 if "read-only file system" in stderr
:
298 elif "permission denied" in stderr
:
303 def _mountpoint_exists(self
):
304 return self
.client_remote
.run(args
=["ls", "-d", self
.hostfs_mntpt
],
306 timeout
=300).exitstatus
== 0
308 def umount(self
, cleanup
=True):
310 umount() must not run cleanup() when it's called by umount_wait()
311 since "run.wait([self.fuse_daemon], timeout)" would hang otherwise.
313 if not self
.is_mounted():
317 if self
.is_blocked():
318 self
._run
_umount
_lf
()
324 log
.info('Running fusermount -u on {name}...'.format(name
=self
.client_remote
.name
))
326 self
.client_remote
.run(
327 args
=['sudo', 'fusermount', '-u', self
.hostfs_mntpt
],
328 stderr
=stderr
, timeout
=UMOUNT_TIMEOUT
, omit_sudo
=False)
329 except run
.CommandFailedError
:
330 if "mountpoint not found" in stderr
.getvalue():
331 # This happens if the mount directory doesn't exist
332 log
.info('mount point does not exist: %s', self
.mountpoint
)
333 elif "not mounted" in stderr
.getvalue():
334 # This happens if the mount directory already unmouted
335 log
.info('mount point not mounted: %s', self
.mountpoint
)
337 log
.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name
=self
.client_remote
.name
))
339 self
.client_remote
.run(
340 args
=['sudo', run
.Raw('PATH=/usr/sbin:$PATH'), 'lsof',
341 run
.Raw(';'), 'ps', 'auxf'],
342 timeout
=UMOUNT_TIMEOUT
, omit_sudo
=False)
344 # abort the fuse mount, killing all hung processes
346 self
.run_python(dedent("""
348 path = "/sys/fs/fuse/connections/{0}/abort"
349 if os.path.exists(path):
350 open(path, "w").write("1")
351 """).format(self
._fuse
_conn
))
352 self
._fuse
_conn
= None
354 # make sure its unmounted
355 self
._run
_umount
_lf
()
357 self
._fuse
_conn
= None
364 def umount_wait(self
, force
=False, require_clean
=False,
365 timeout
=UMOUNT_TIMEOUT
):
367 :param force: Complete cleanly even if the MDS is offline
369 if not (self
.is_mounted() and self
.fuse_daemon
):
370 log
.debug('ceph-fuse client.{id} is not mounted at {remote} '
371 '{mnt}'.format(id=self
.client_id
,
372 remote
=self
.client_remote
,
373 mnt
=self
.hostfs_mntpt
))
378 assert not require_clean
# mutually exclusive
380 # When we expect to be forcing, kill the ceph-fuse process directly.
381 # This should avoid hitting the more aggressive fallback killing
382 # in umount() which can affect other mounts too.
383 self
.fuse_daemon
.stdin
.close()
385 # However, we will still hit the aggressive wait if there is an ongoing
386 # mount -o remount (especially if the remount is stuck because MDSs
389 if self
.is_blocked():
390 self
._run
_umount
_lf
()
394 # cleanup is set to to fail since clieanup must happen after umount is
395 # complete; otherwise following call to run.wait hangs.
396 self
.umount(cleanup
=False)
399 # Permit a timeout, so that we do not block forever
400 run
.wait([self
.fuse_daemon
], timeout
)
402 except MaxWhileTries
:
403 log
.error("process failed to terminate after unmount. This probably"
404 " indicates a bug within ceph-fuse.")
406 except CommandFailedError
:
414 Whatever the state of the mount, get it gone.
416 super(FuseMount
, self
).teardown()
420 if self
.fuse_daemon
and not self
.fuse_daemon
.finished
:
421 self
.fuse_daemon
.stdin
.close()
423 self
.fuse_daemon
.wait()
424 except CommandFailedError
:
427 def _asok_path(self
):
428 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self
.client_id
)
434 def find_admin_socket(self
):
441 def _find_admin_socket(client_name):
442 asok_path = "{asok_path}"
443 files = glob.glob(asok_path)
444 mountpoint = "{mountpoint}"
446 # Given a non-glob path, it better be there
447 if "*" not in asok_path:
448 assert(len(files) == 1)
452 pid = re.match(".*\.(\d+)\.asok$", f).group(1)
453 if os.path.exists("/proc/{{0}}".format(pid)):
454 with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f:
455 contents = proc_f.read()
456 if mountpoint in contents:
458 raise RuntimeError("Client socket {{0}} not found".format(client_name))
460 print(_find_admin_socket("{client_name}"))
462 asok_path
=self
._asok
_path
(),
463 client_name
="client.{0}".format(self
.client_id
),
464 mountpoint
=self
.mountpoint
)
466 asok_path
= self
.run_python(pyscript
, sudo
=True)
467 log
.info("Found client admin socket at {0}".format(asok_path
))
470 def admin_socket(self
, args
):
471 asok_path
= self
.find_admin_socket()
473 # Query client ID from admin socket, wait 2 seconds
474 # and retry 10 times if it is not ready
475 with
safe_while(sleep
=2, tries
=10) as proceed
:
478 p
= self
.client_remote
.run(args
=
479 ['sudo', self
._prefix
+ 'ceph', '--admin-daemon', asok_path
] + args
,
480 stdout
=StringIO(), stderr
=StringIO(), wait
=False,
484 except CommandFailedError
:
485 if "connection refused" in p
.stderr
.getvalue().lower():
488 return json
.loads(p
.stdout
.getvalue().strip())
def get_global_id(self):
    """
    Look up the CephFS client ID for this mount via the admin socket.
    """
    mds_sessions = self.admin_socket(['mds_sessions'])
    return mds_sessions['id']
496 def get_global_inst(self
):
498 Look up the CephFS client instance for this mount
502 def get_global_addr(self
):
504 Look up the CephFS client addr for this mount
def get_client_pid(self):
    """
    Return pid of the ceph-fuse process, as reported by the client's
    admin socket 'status' output.
    """
    return self.admin_socket(['status'])['metadata']['pid']
def get_osd_epoch(self):
    """
    Return 2-tuple of (osd_epoch, osd_epoch_barrier) from the client's
    admin socket 'status' output.
    """
    status = self.admin_socket(['status'])
    return (status['osd_epoch'], status['osd_epoch_barrier'])
def get_dentry_count(self):
    """
    Return 2-tuple of (dentry_count, dentry_pinned_count) from the
    client's admin socket 'status' output.
    """
    status = self.admin_socket(['status'])
    return (status['dentry_count'], status['dentry_pinned_count'])
def set_cache_size(self, size):
    """
    Set the client_cache_size config option via the admin socket and
    return the command's result.
    """
    cmd = ['config', 'set', 'client_cache_size', str(size)]
    return self.admin_socket(cmd)
def get_op_read_count(self):
    """
    Return the objecter's osdop_read perf counter for this client.
    """
    perf = self.admin_socket(['perf', 'dump', 'objecter'])
    return perf['objecter']['osdop_read']