# ceph/qa/tasks/cephfs/fuse_mount.py (ceph.git blob, mirrored at git.proxmox.com)
from StringIO import StringIO
import json
import time
import logging

from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError

from .mount import CephFSMount

log = logging.getLogger(__name__)


class FuseMount(CephFSMount):
    def __init__(self, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        self.fuse_daemon = None
        self._fuse_conn = None

    def mount(self, mount_path=None, mount_fs_name=None):
        try:
            return self._mount(mount_path, mount_fs_name)
        except RuntimeError:
            # Catch exceptions by the mount() logic (i.e. not remote command
            # failures) and ensure the mount is not left half-up.
            # Otherwise we might leave a zombie mount point that causes
            # anyone traversing cephtest/ to get hung up on.
            log.warn("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mount_path, mount_fs_name):
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)
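
        # Helper: list the numeric FUSE connection IDs currently present under
        # /sys/fs/fuse/connections, mounting fusectl first so the directory exists.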
        def list_connections():
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
            )

            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def is_mounted(self):
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.mountpoint,
            ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False,
        )
        try:
            proc.wait()
        except CommandFailedError:
            if ("endpoint is not connected" in proc.stderr.getvalue()
            or "Software caused connection abort" in proc.stderr.getvalue()):
                # This happens if fuse is killed without unmount
                log.warn("Found stale mount point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        try:
            stderr = StringIO()
            self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr)
        except run.CommandFailedError:
            stderr = stderr.getvalue()
            if "Read-only file system".lower() in stderr.lower():
                pass
            else:
                raise

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0
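
    # Unmount strategy: try a plain `fusermount -u` first; if that fails, abort
    # the FUSE connection via /sys/fs/fuse/connections and fall back to a forced
    # lazy umount of the mount point.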
    def umount(self):
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps', 'auxf',
            ])

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
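                # Writing "1" to the connection's abort file makes the kernel abort
                # the FUSE connection, so processes hung on the dead mount get an
                # error back instead of blocking forever.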
                self.run_python(dedent("""
                    import os
                    path = "/sys/fs/fuse/connections/{0}/abort"
                    if os.path.exists(path):
                        open(path, "w").write("1")
                    """).format(self._fuse_conn))
                self._fuse_conn = None

            # make sure it's unmounted
            try:
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                )
            except CommandFailedError:
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None

    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably "
                      "indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                stderr=stderr,
            )
        except CommandFailedError:
            if "No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
        )

    def _asok_path(self):
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    def admin_socket(self, args):
        pyscript = """
import glob
import re
import os

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))
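
        # The asok filename embeds the ceph-fuse PID, so if stale sockets from
        # earlier runs are still present the helper above returns the one whose
        # PID is still alive under /proc.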

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Run the requested command against the admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_client_pid(self):
        """
        return pid of ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])