# ceph/qa/tasks/cephfs/fuse_mount.py (extracted from the git.proxmox.com ceph.git mirror)
from StringIO import StringIO
import json
import logging
import time
from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError

from .mount import CephFSMount
from tasks.cephfs.filesystem import Filesystem
15 log
= logging
.getLogger(__name__
)
18 class FuseMount(CephFSMount
):
19 def __init__(self
, ctx
, client_config
, test_dir
, client_id
, client_remote
):
20 super(FuseMount
, self
).__init
__(ctx
, test_dir
, client_id
, client_remote
)
22 self
.client_config
= client_config
if client_config
else {}
23 self
.fuse_daemon
= None
24 self
._fuse
_conn
= None
29 def mount(self
, mount_path
=None, mount_fs_name
=None):
30 self
.setupfs(name
=mount_fs_name
)
33 return self
._mount
(mount_path
, mount_fs_name
)
35 # Catch exceptions by the mount() logic (i.e. not remote command
36 # failures) and ensure the mount is not left half-up.
37 # Otherwise we might leave a zombie mount point that causes
38 # anyone traversing cephtest/ to get hung up on.
39 log
.warn("Trying to clean up after failed mount")
40 self
.umount_wait(force
=True)
    def _mount(self, mount_path, mount_fs_name):
        # Launch ceph-fuse on the remote, then wait for a new entry to appear
        # under /sys/fs/fuse/connections and record it in self._fuse_conn.
        # NOTE(review): this block is an incomplete extraction; several run()
        # argument lists, the run_cmd prefix construction and the timeout
        # loop's elapsed-time accounting are missing from the visible text.
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # Coverage/valgrind runs need a catchable signal so the daemon can
        # flush its data on shutdown; plain runs can just be killed.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        # NOTE(review): the argument list of this run() call (presumably the
        # mkdir of the mount point / daemon-helper wrapper) was dropped by
        # extraction; only the coverage-dir fragment survives.
        self.client_remote.run(
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        # NOTE(review): the surrounding list construction for these ceph-fuse
        # arguments was dropped by extraction.
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',

        # Wrap the command in valgrind when so configured.
        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                'client.{id}'.format(id=self.client_id),
                self.client_config.get('valgrind'),

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the numeric connection ids under /sys/fs/fuse/connections,
            # mounting fusectl first so the directory is populated.
            # NOTE(review): check_status/stdout keyword arguments for these
            # run() calls were dropped by extraction.
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],

            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],

            # A failing ls means the connections directory is absent (no fuse
            # mounts yet); the early-return body here was dropped by extraction.
            if p.exitstatus != 0:

            ls_str = p.stdout.getvalue().strip()

            return [int(n) for n in ls_str.split("\n")]

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        # NOTE(review): the run_cmd / stdin / wait keyword arguments of this
        # run() call were dropped by extraction.
        proc = self.client_remote.run(
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),

        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
        time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail? Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            # NOTE(review): the timeout check guarding this raise, and the
            # format argument, were dropped by extraction.
            raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount.
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        # NOTE(review): an else: header appears to have been dropped here.
        self._fuse_conn = new_conns[0]

        self.gather_mount_info()
166 def gather_mount_info(self
):
167 status
= self
.admin_socket(['status'])
168 self
.id = status
['id']
170 self
.inst
= status
['inst_str']
171 self
.addr
= status
['addr_str']
172 except KeyError as e
:
173 sessions
= self
.fs
.rank_asok(['session', 'ls'])
175 if s
['id'] == self
.id:
176 self
.inst
= s
['inst']
177 self
.addr
= self
.inst
.split()[1]
178 if self
.inst
is None:
179 raise RuntimeError("cannot find client session")
    def is_mounted(self):
        # Report whether the ceph-fuse mount is active by stat-ing the mount
        # point on the remote and checking the reported filesystem type.
        # NOTE(review): incomplete extraction -- the run() argument list, the
        # try: header, else: headers and the True/False return statements are
        # missing from the visible text.
        proc = self.client_remote.run(
        except CommandFailedError:
            # stat failed: distinguish a dead fuse endpoint from a missing
            # directory.
            if ("endpoint is not connected" in proc.stderr.getvalue()
                    or "Software caused connection abort" in proc.stderr.getvalue()):
                # This happens if fuse is killed without unmount
                # NOTE(review): "moutn" typo below is in the runtime log
                # string; left untouched here.
                log.warn("Found stale moutn point at {0}".format(self.mountpoint))
            # NOTE(review): an else: header appears to have been dropped here.
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)

        # Success path: inspect the fs type reported for the mount point.
        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
        # NOTE(review): the return statements and the else: header before the
        # debug log appear to have been dropped.
        log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint. If not,
        sleep for 5 seconds and check again.
        """
        # NOTE(review): incomplete extraction -- the sleep inside the loop,
        # the StringIO stderr setup, the try: header and the error-handling
        # tail are missing from the visible text.
        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr)
        except run.CommandFailedError:
            stderr = stderr.getvalue()
            # A read-only filesystem is tolerated here; the branch bodies
            # were dropped by extraction.
            if "Read-only file system".lower() in stderr.lower():
242 def _mountpoint_exists(self
):
243 return self
.client_remote
.run(args
=["ls", "-d", self
.mountpoint
], check_status
=False, timeout
=(15*60)).exitstatus
== 0
    # NOTE(review): the `def umount(self):` header and an opening try: appear
    # to have been dropped by extraction; what follows is the body: a polite
    # fusermount -u, with an aggressive fallback (abort via /sys, lazy umount)
    # on failure.
        log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
        self.client_remote.run(
        # NOTE(review): fusermount -u argument list dropped by extraction.
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Diagnostics: list open files / process tree before aborting.
            # NOTE(review): most of this argument list dropped by extraction.
            self.client_remote.run(args=[
                run.Raw('PATH=/usr/sbin:$PATH'),

            # abort the fuse mount, killing all hung processes
            # NOTE(review): the script below runs remotely; an `import os`
            # line inside it appears to have been dropped by extraction.
            self.run_python(dedent("""
            path = "/sys/fs/fuse/connections/{0}/abort"
            if os.path.exists(path):
                open(path, "w").write("1")
            """).format(self._fuse_conn))
            self._fuse_conn = None

            # make sure its unmounted
            # NOTE(review): lazy/forced umount argument list dropped.
            self.client_remote.run(
            except CommandFailedError:
                # Only tolerate the failure if the mount is actually gone;
                # the raise in this branch was dropped by extraction.
                if self.is_mounted():

        assert not self.is_mounted()
        self._fuse_conn = None
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        # NOTE(review): incomplete extraction -- the `if force:` header, a
        # try: around the daemon shutdown, the call into umount() and the
        # require_clean handling are missing from the visible text.
        assert not require_clean  # mutually exclusive

        # When we expect to be forcing, kill the ceph-fuse process directly.
        # This should avoid hitting the more aggressive fallback killing
        # in umount() which can affect other mounts too.
        self.fuse_daemon.stdin.close()

        # However, we will still hit the aggressive wait if there is an ongoing
        # mount -o remount (especially if the remount is stuck because MDSs
        # are unavailable) -- NOTE(review): rest of this comment was dropped.

        # Permit a timeout, so that we do not block forever
        run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
        except CommandFailedError:
    # NOTE(review): the `def cleanup(self):` header, docstring quotes, the
    # run() argument list (presumably rmdir of the mount point -- TODO
    # confirm) and the error-handling tail were dropped by extraction.
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        self.client_remote.run(
        except CommandFailedError:
            # A missing mount point is acceptable here; the branch bodies
            # were dropped by extraction.
            if "No such file or directory" in stderr.getvalue():
    # NOTE(review): the `def kill(self):` header, docstring quotes and a
    # try: header around the wait() were dropped by extraction.
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        # Closing stdin asks the daemon-helper wrapper to stop the process.
        self.fuse_daemon.stdin.close()
        self.fuse_daemon.wait()
        except CommandFailedError:
    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        # NOTE(review): the remainder of the body (presumably umount/cleanup
        # calls -- TODO confirm) was dropped by extraction.
    # NOTE(review): the `def teardown(self):` header, docstring quotes and a
    # try: header around the wait() were dropped by extraction.
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        # Stop the daemon if it is still running.
        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            self.fuse_daemon.wait()
            except CommandFailedError:

        # Indiscriminate, unlike the touchier cleanup()
        # NOTE(review): argument list of this run() call dropped by extraction.
        self.client_remote.run(
402 def _asok_path(self
):
403 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self
.client_id
)
    def admin_socket(self, args):
        # Run an admin-socket command against this client and return the
        # JSON-decoded response. A small Python 2 script is executed on the
        # remote to locate the live socket (the glob can match stale sockets
        # from dead processes, hence the /proc check).
        # NOTE(review): incomplete extraction -- the `pyscript = dedent("""`
        # opener, the script's import lines, its for-loop header and
        # return/print plumbing are missing from the visible text. The lines
        # below up to find_socket's call are string content sent to the
        # remote python2 interpreter, not local code.
    def find_socket(client_name):
        asok_path = "{asok_path}"
        files = glob.glob(asok_path)

        # Given a non-glob path, it better be there
        if "*" not in asok_path:
            assert(len(files) == 1)

        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
        raise RuntimeError("Client socket {{0}} not found".format(client_name))

    print find_socket("{client_name}")
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        # NOTE(review): self._prefix presumably comes from the base mount
        # class -- not visible here, TODO confirm.
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
449 def get_global_id(self
):
451 Look up the CephFS client ID for this mount
453 return self
.admin_socket(['mds_sessions'])['id']
    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount
        """
        # NOTE(review): the body (presumably `return self.inst`, which is
        # populated by gather_mount_info -- TODO confirm) was dropped by
        # extraction.
    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount
        """
        # NOTE(review): the body (presumably `return self.addr`, which is
        # populated by gather_mount_info -- TODO confirm) was dropped by
        # extraction.
467 def get_client_pid(self
):
469 return pid of ceph-fuse process
471 status
= self
.admin_socket(['status'])
472 return status
['metadata']['pid']
474 def get_osd_epoch(self
):
476 Return 2-tuple of osd_epoch, osd_epoch_barrier
478 status
= self
.admin_socket(['status'])
479 return status
['osd_epoch'], status
['osd_epoch_barrier']
481 def get_dentry_count(self
):
483 Return 2-tuple of dentry_count, dentry_pinned_count
485 status
= self
.admin_socket(['status'])
486 return status
['dentry_count'], status
['dentry_pinned_count']
488 def set_cache_size(self
, size
):
489 return self
.admin_socket(['config', 'set', 'client_cache_size', str(size
)])