# Source: ceph/qa/tasks/cephfs/fuse_mount.py
# (recovered from a git-web blob view; commit b121680b0b39040ff31044c5f7a8f6776621222c)
2 from StringIO
import StringIO
6 from textwrap
import dedent
8 from teuthology
import misc
9 from teuthology
.contextutil
import MaxWhileTries
10 from teuthology
.orchestra
import run
11 from teuthology
.orchestra
.run
import CommandFailedError
12 from .mount
import CephFSMount
14 log
= logging
.getLogger(__name__
)
17 class FuseMount(CephFSMount
):
18 def __init__(self
, client_config
, test_dir
, client_id
, client_remote
):
19 super(FuseMount
, self
).__init
__(test_dir
, client_id
, client_remote
)
21 self
.client_config
= client_config
if client_config
else {}
22 self
.fuse_daemon
= None
23 self
._fuse
_conn
= None
25 def mount(self
, mount_path
=None, mount_fs_name
=None):
27 return self
._mount
(mount_path
, mount_fs_name
)
29 # Catch exceptions by the mount() logic (i.e. not remote command
30 # failures) and ensure the mount is not left half-up.
31 # Otherwise we might leave a zombie mount point that causes
32 # anyone traversing cephtest/ to get hung up on.
33 log
.warn("Trying to clean up after failed mount")
34 self
.umount_wait(force
=True)
37 def _mount(self
, mount_path
, mount_fs_name
):
38 log
.info("Client client.%s config is %s" % (self
.client_id
, self
.client_config
))
40 daemon_signal
= 'kill'
41 if self
.client_config
.get('coverage') or self
.client_config
.get('valgrind') is not None:
42 daemon_signal
= 'term'
44 log
.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
45 id=self
.client_id
, remote
=self
.client_remote
, mnt
=self
.mountpoint
))
47 self
.client_remote
.run(
59 '{tdir}/archive/coverage'.format(tdir
=self
.test_dir
),
64 fuse_cmd
= ['ceph-fuse', "-f"]
66 if mount_path
is not None:
67 fuse_cmd
+= ["--client_mountpoint={0}".format(mount_path
)]
69 if mount_fs_name
is not None:
70 fuse_cmd
+= ["--client_mds_namespace={0}".format(mount_fs_name
)]
73 '--name', 'client.{id}'.format(id=self
.client_id
),
74 # TODO ceph-fuse doesn't understand dash dash '--',
78 if self
.client_config
.get('valgrind') is not None:
79 run_cmd
= misc
.get_valgrind_args(
81 'client.{id}'.format(id=self
.client_id
),
83 self
.client_config
.get('valgrind'),
86 run_cmd
.extend(fuse_cmd
)
88 def list_connections():
89 self
.client_remote
.run(
90 args
=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
93 p
= self
.client_remote
.run(
94 args
=["ls", "/sys/fs/fuse/connections"],
101 ls_str
= p
.stdout
.getvalue().strip()
103 return [int(n
) for n
in ls_str
.split("\n")]
107 # Before starting ceph-fuse process, note the contents of
108 # /sys/fs/fuse/connections
109 pre_mount_conns
= list_connections()
110 log
.info("Pre-mount connections: {0}".format(pre_mount_conns
))
112 proc
= self
.client_remote
.run(
114 logger
=log
.getChild('ceph-fuse.{id}'.format(id=self
.client_id
)),
118 self
.fuse_daemon
= proc
120 # Wait for the connection reference to appear in /sys
121 mount_wait
= self
.client_config
.get('mount_wait', 0)
123 log
.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait
))
124 time
.sleep(mount_wait
)
125 timeout
= int(self
.client_config
.get('mount_timeout', 30))
128 post_mount_conns
= list_connections()
129 while len(post_mount_conns
) <= len(pre_mount_conns
):
130 if self
.fuse_daemon
.finished
:
131 # Did mount fail? Raise the CommandFailedError instead of
132 # hitting the "failed to populate /sys/" timeout
133 self
.fuse_daemon
.wait()
137 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
141 post_mount_conns
= list_connections()
143 log
.info("Post-mount connections: {0}".format(post_mount_conns
))
145 # Record our fuse connection number so that we can use it when
147 new_conns
= list(set(post_mount_conns
) - set(pre_mount_conns
))
148 if len(new_conns
) == 0:
149 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns
))
150 elif len(new_conns
) > 1:
151 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns
))
153 self
._fuse
_conn
= new_conns
[0]
155 def is_mounted(self
):
156 proc
= self
.client_remote
.run(
170 except CommandFailedError
:
171 if ("endpoint is not connected" in proc
.stderr
.getvalue()
172 or "Software caused connection abort" in proc
.stderr
.getvalue()):
173 # This happens is fuse is killed without unmount
174 log
.warn("Found stale moutn point at {0}".format(self
.mountpoint
))
177 # This happens if the mount directory doesn't exist
178 log
.info('mount point does not exist: %s', self
.mountpoint
)
181 fstype
= proc
.stdout
.getvalue().rstrip('\n')
182 if fstype
== 'fuseblk':
183 log
.info('ceph-fuse is mounted on %s', self
.mountpoint
)
186 log
.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
190 def wait_until_mounted(self
):
192 Check to make sure that fuse is mounted on mountpoint. If not,
193 sleep for 5 seconds and check again.
196 while not self
.is_mounted():
197 # Even if it's not mounted, it should at least
198 # be running: catch simple failures where it has terminated.
199 assert not self
.fuse_daemon
.poll()
203 # Now that we're mounted, set permissions so that the rest of the test will have
204 # unrestricted access to the filesystem mount.
205 self
.client_remote
.run(
206 args
=['sudo', 'chmod', '1777', self
.mountpoint
])
208 def _mountpoint_exists(self
):
209 return self
.client_remote
.run(args
=["ls", "-d", self
.mountpoint
], check_status
=False).exitstatus
== 0
213 log
.info('Running fusermount -u on {name}...'.format(name
=self
.client_remote
.name
))
214 self
.client_remote
.run(
222 except run
.CommandFailedError
:
223 log
.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name
=self
.client_remote
.name
))
225 self
.client_remote
.run(args
=[
227 run
.Raw('PATH=/usr/sbin:$PATH'),
234 # abort the fuse mount, killing all hung processes
236 self
.run_python(dedent("""
238 path = "/sys/fs/fuse/connections/{0}/abort"
239 if os.path.exists(path):
240 open(path, "w").write("1")
241 """).format(self
._fuse
_conn
))
242 self
._fuse
_conn
= None
246 # make sure its unmounted
247 self
.client_remote
.run(
257 except CommandFailedError
:
258 if self
.is_mounted():
261 assert not self
.is_mounted()
262 self
._fuse
_conn
= None
264 def umount_wait(self
, force
=False, require_clean
=False, timeout
=900):
266 :param force: Complete cleanly even if the MDS is offline
269 assert not require_clean
# mutually exclusive
271 # When we expect to be forcing, kill the ceph-fuse process directly.
272 # This should avoid hitting the more aggressive fallback killing
273 # in umount() which can affect other mounts too.
274 self
.fuse_daemon
.stdin
.close()
276 # However, we will still hit the aggressive wait if there is an ongoing
277 # mount -o remount (especially if the remount is stuck because MDSs
284 # Permit a timeout, so that we do not block forever
285 run
.wait([self
.fuse_daemon
], timeout
)
286 except MaxWhileTries
:
287 log
.error("process failed to terminate after unmount. This probably"
288 "indicates a bug within ceph-fuse.")
290 except CommandFailedError
:
298 Remove the mount point.
300 Prerequisite: the client is not mounted.
304 self
.client_remote
.run(
312 except CommandFailedError
:
313 if "No such file or directory" in stderr
.getvalue():
320 Terminate the client without removing the mount point.
322 self
.fuse_daemon
.stdin
.close()
324 self
.fuse_daemon
.wait()
325 except CommandFailedError
:
328 def kill_cleanup(self
):
330 Follow up ``kill`` to get to a clean unmounted state.
337 Whatever the state of the mount, get it gone.
339 super(FuseMount
, self
).teardown()
343 if self
.fuse_daemon
and not self
.fuse_daemon
.finished
:
344 self
.fuse_daemon
.stdin
.close()
346 self
.fuse_daemon
.wait()
347 except CommandFailedError
:
350 # Indiscriminate, unlike the touchier cleanup()
351 self
.client_remote
.run(
359 def _asok_path(self
):
360 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self
.client_id
)
366 def admin_socket(self
, args
):
373 def find_socket(client_name):
374 asok_path = "{asok_path}"
375 files = glob.glob(asok_path)
377 # Given a non-glob path, it better be there
378 if "*" not in asok_path:
379 assert(len(files) == 1)
383 pid = re.match(".*\.(\d+)\.asok$", f).group(1)
384 if os.path.exists("/proc/{{0}}".format(pid)):
386 raise RuntimeError("Client socket {{0}} not found".format(client_name))
388 print find_socket("{client_name}")
390 asok_path
=self
._asok
_path
(),
391 client_name
="client.{0}".format(self
.client_id
))
393 # Find the admin socket
394 p
= self
.client_remote
.run(args
=[
395 'python', '-c', pyscript
396 ], stdout
=StringIO())
397 asok_path
= p
.stdout
.getvalue().strip()
398 log
.info("Found client admin socket at {0}".format(asok_path
))
400 # Query client ID from admin socket
401 p
= self
.client_remote
.run(
402 args
=['sudo', self
._prefix
+ 'ceph', '--admin-daemon', asok_path
] + args
,
404 return json
.loads(p
.stdout
.getvalue())
406 def get_global_id(self
):
408 Look up the CephFS client ID for this mount
410 return self
.admin_socket(['mds_sessions'])['id']
412 def get_client_pid(self
):
414 return pid of ceph-fuse process
416 status
= self
.admin_socket(['status'])
417 return status
['metadata']['pid']
419 def get_osd_epoch(self
):
421 Return 2-tuple of osd_epoch, osd_epoch_barrier
423 status
= self
.admin_socket(['status'])
424 return status
['osd_epoch'], status
['osd_epoch_barrier']
426 def get_dentry_count(self
):
428 Return 2-tuple of dentry_count, dentry_pinned_count
430 status
= self
.admin_socket(['status'])
431 return status
['dentry_count'], status
['dentry_pinned_count']
433 def set_cache_size(self
, size
):
434 return self
.admin_socket(['config', 'set', 'client_cache_size', str(size
)])