# ceph/qa/tasks/cephfs/fuse_mount.py
import json
import time
import logging

import six

from io import StringIO
from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.mount import CephFSMount

log = logging.getLogger(__name__)

class FuseMount(CephFSMount):
    def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        self.fuse_daemon = None
        self._fuse_conn = None

    def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
        if mountpoint is not None:
            self.mountpoint = mountpoint
        self.setupfs(name=mount_fs_name)

        try:
            return self._mount(mount_path, mount_fs_name, mount_options)
        except RuntimeError:
            # Catch exceptions by the mount() logic (i.e. not remote command
            # failures) and ensure the mount is not left half-up.
            # Otherwise we might leave a zombie mount point that causes
            # anyone traversing cephtest/ to get hung up on.
            log.warning("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mount_path, mount_fs_name, mount_options):
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(args=['mkdir', '-p', self.mountpoint],
                               timeout=(15*60), cwd=self.test_dir)

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_fs={0}".format(mount_fs_name)]

        fuse_cmd += mount_options
        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        cwd = self.test_dir
        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )
            cwd = None  # misc.get_valgrind_args chdir for us

        run_cmd.extend(fuse_cmd)

        def list_connections():
            from teuthology.misc import get_system_type

            conn_dir = "/sys/fs/fuse/connections"

            self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
                                   check_status=False)
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
                check_status=False, timeout=(30))

            try:
                ls_str = self.client_remote.sh("ls " + conn_dir,
                                               timeout=(15*60)).strip()
            except CommandFailedError:
                return []

            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            cwd=cwd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()

            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited))

            post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        self._fuse_conn = new_conns[0]

        self.gather_mount_info()

    def gather_mount_info(self):
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")

    def is_mounted(self):
        proc = self.client_remote.run(
            args=['stat', '--file-system', '--printf=%T\n',
                  '--', self.mountpoint],
            cwd=self.test_dir,
            stdout=StringIO(), stderr=StringIO(),
            wait=False, timeout=(15*60)
        )
        try:
            proc.wait()
        except CommandFailedError:
            error = six.ensure_str(proc.stderr.getvalue())
            if ("endpoint is not connected" in error
                    or "Software caused connection abort" in error):
                # This happens if fuse is killed without unmount
                log.warning("Found stale mount point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        fstype = six.ensure_str(proc.stdout.getvalue()).rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()
            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        for retry in range(10):
            stderr = StringIO()
            try:
                self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint],
                                       timeout=(15*60), cwd=self.test_dir, stderr=stderr)
                break
            except run.CommandFailedError:
                stderr = stderr.getvalue()
                if b"Read-only file system".lower() in stderr.lower():
                    break
                else:
                    raise

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.mountpoint],
                                      check_status=False,
                                      cwd=self.test_dir,
                                      timeout=(15*60)).exitstatus == 0

    def umount(self):
        if not self.is_mounted():
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=['sudo', 'fusermount', '-u', self.mountpoint],
                cwd=self.test_dir,
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps', 'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
            self._fuse_conn = None

            # make sure it's unmounted
            try:
                self.client_remote.run(
                    args=['sudo', 'umount', '-l', '-f', self.mountpoint],
                    timeout=(60*15),
                )
            except CommandFailedError:
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None

    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self.client_id,
                                                                                       remote=self.client_remote,
                                                                                       mnt=self.mountpoint))
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=['rmdir', '--', self.mountpoint],
                cwd=self.test_dir,
                stderr=stderr,
                timeout=(60*5),
            )
        except CommandFailedError:
            if b"No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=['rm', '-rf', self.mountpoint],
            cwd=self.test_dir,
            timeout=(60*5),
        )

    def _asok_path(self):
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    def admin_socket(self, args):
        pyscript = """
import glob
import re
import os

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    # Given a glob, pick the socket whose owning process still exists
    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f

    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(find_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        asok_path = self.client_remote.sh(
            ['sudo', 'python3', '-c', pyscript],
            timeout=(15*60)).strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        json_data = self.client_remote.sh(
            ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            timeout=(15*60))
        return json.loads(json_data)

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount
        """
        return self.inst

    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount
        """
        return self.addr

    def get_client_pid(self):
        """
        return pid of ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])
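

# A minimal usage sketch, assuming a teuthology task supplies `ctx`,
# `client_config`, `test_dir` and a `remote` object (all placeholders here,
# not defined in this module):
#
#   mount = FuseMount(ctx, client_config, test_dir, "0", remote)
#   mount.mount(mount_fs_name="cephfs")
#   mount.wait_until_mounted()
#   log.info("ceph-fuse pid: %s", mount.get_client_pid())
#   mount.umount_wait(require_clean=True)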