# Source: ceph/qa/tasks/cephfs/fuse_mount.py (ceph.git, mirrored at git.proxmox.com)
1 from StringIO
import StringIO
5 from textwrap
import dedent
7 from teuthology
import misc
8 from teuthology
.contextutil
import MaxWhileTries
9 from teuthology
.orchestra
import run
10 from teuthology
.orchestra
.run
import CommandFailedError
11 from .mount
import CephFSMount
13 log
= logging
.getLogger(__name__
)
16 class FuseMount(CephFSMount
):
17 def __init__(self
, ctx
, client_config
, test_dir
, client_id
, client_remote
):
18 super(FuseMount
, self
).__init
__(ctx
, test_dir
, client_id
, client_remote
)
20 self
.client_config
= client_config
if client_config
else {}
21 self
.fuse_daemon
= None
22 self
._fuse
_conn
= None
27 def mount(self
, mount_path
=None, mount_fs_name
=None, mountpoint
=None, mount_options
=[]):
28 if mountpoint
is not None:
29 self
.mountpoint
= mountpoint
30 self
.setupfs(name
=mount_fs_name
)
33 return self
._mount
(mount_path
, mount_fs_name
, mount_options
)
35 # Catch exceptions by the mount() logic (i.e. not remote command
36 # failures) and ensure the mount is not left half-up.
37 # Otherwise we might leave a zombie mount point that causes
38 # anyone traversing cephtest/ to get hung up on.
39 log
.warn("Trying to clean up after failed mount")
40 self
.umount_wait(force
=True)
43 def _mount(self
, mount_path
, mount_fs_name
, mount_options
):
44 log
.info("Client client.%s config is %s" % (self
.client_id
, self
.client_config
))
46 daemon_signal
= 'kill'
47 if self
.client_config
.get('coverage') or self
.client_config
.get('valgrind') is not None:
48 daemon_signal
= 'term'
50 log
.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
51 id=self
.client_id
, remote
=self
.client_remote
, mnt
=self
.mountpoint
))
53 self
.client_remote
.run(args
=['mkdir', '-p', self
.mountpoint
],
54 timeout
=(15*60), cwd
=self
.test_dir
)
60 '{tdir}/archive/coverage'.format(tdir
=self
.test_dir
),
65 fuse_cmd
= ['ceph-fuse', "-f"]
67 if mount_path
is not None:
68 fuse_cmd
+= ["--client_mountpoint={0}".format(mount_path
)]
70 if mount_fs_name
is not None:
71 fuse_cmd
+= ["--client_fs={0}".format(mount_fs_name
)]
73 fuse_cmd
+= mount_options
76 '--name', 'client.{id}'.format(id=self
.client_id
),
77 # TODO ceph-fuse doesn't understand dash dash '--',
82 if self
.client_config
.get('valgrind') is not None:
83 run_cmd
= misc
.get_valgrind_args(
85 'client.{id}'.format(id=self
.client_id
),
87 self
.client_config
.get('valgrind'),
89 cwd
= None # misc.get_valgrind_args chdir for us
91 run_cmd
.extend(fuse_cmd
)
93 def list_connections():
94 self
.client_remote
.run(
95 args
=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
99 p
= self
.client_remote
.run(
100 args
=["ls", "/sys/fs/fuse/connections"],
105 if p
.exitstatus
!= 0:
108 ls_str
= p
.stdout
.getvalue().strip()
110 return [int(n
) for n
in ls_str
.split("\n")]
114 # Before starting ceph-fuse process, note the contents of
115 # /sys/fs/fuse/connections
116 pre_mount_conns
= list_connections()
117 log
.info("Pre-mount connections: {0}".format(pre_mount_conns
))
119 proc
= self
.client_remote
.run(
122 logger
=log
.getChild('ceph-fuse.{id}'.format(id=self
.client_id
)),
126 self
.fuse_daemon
= proc
128 # Wait for the connection reference to appear in /sys
129 mount_wait
= self
.client_config
.get('mount_wait', 0)
131 log
.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait
))
132 time
.sleep(mount_wait
)
133 timeout
= int(self
.client_config
.get('mount_timeout', 30))
136 post_mount_conns
= list_connections()
137 while len(post_mount_conns
) <= len(pre_mount_conns
):
138 if self
.fuse_daemon
.finished
:
139 # Did mount fail? Raise the CommandFailedError instead of
140 # hitting the "failed to populate /sys/" timeout
141 self
.fuse_daemon
.wait()
145 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
149 post_mount_conns
= list_connections()
151 log
.info("Post-mount connections: {0}".format(post_mount_conns
))
153 # Record our fuse connection number so that we can use it when
155 new_conns
= list(set(post_mount_conns
) - set(pre_mount_conns
))
156 if len(new_conns
) == 0:
157 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns
))
158 elif len(new_conns
) > 1:
159 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns
))
161 self
._fuse
_conn
= new_conns
[0]
163 self
.gather_mount_info()
165 def gather_mount_info(self
):
166 status
= self
.admin_socket(['status'])
167 self
.id = status
['id']
168 self
.client_pid
= status
['metadata']['pid']
170 self
.inst
= status
['inst_str']
171 self
.addr
= status
['addr_str']
173 sessions
= self
.fs
.rank_asok(['session', 'ls'])
175 if s
['id'] == self
.id:
176 self
.inst
= s
['inst']
177 self
.addr
= self
.inst
.split()[1]
178 if self
.inst
is None:
179 raise RuntimeError("cannot find client session")
181 def is_mounted(self
):
182 proc
= self
.client_remote
.run(
198 except CommandFailedError
:
199 if ("endpoint is not connected" in proc
.stderr
.getvalue()
200 or "Software caused connection abort" in proc
.stderr
.getvalue()):
201 # This happens is fuse is killed without unmount
202 log
.warn("Found stale moutn point at {0}".format(self
.mountpoint
))
205 # This happens if the mount directory doesn't exist
206 log
.info('mount point does not exist: %s', self
.mountpoint
)
209 fstype
= proc
.stdout
.getvalue().rstrip('\n')
210 if fstype
== 'fuseblk':
211 log
.info('ceph-fuse is mounted on %s', self
.mountpoint
)
214 log
.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
218 def wait_until_mounted(self
):
220 Check to make sure that fuse is mounted on mountpoint. If not,
221 sleep for 5 seconds and check again.
224 while not self
.is_mounted():
225 # Even if it's not mounted, it should at least
226 # be running: catch simple failures where it has terminated.
227 assert not self
.fuse_daemon
.poll()
231 # Now that we're mounted, set permissions so that the rest of the test will have
232 # unrestricted access to the filesystem mount.
235 self
.client_remote
.run(args
=['sudo', 'chmod', '1777', self
.mountpoint
], timeout
=(15*60), cwd
=self
.test_dir
, stderr
=stderr
)
236 except run
.CommandFailedError
:
237 stderr
= stderr
.getvalue()
238 if "Read-only file system".lower() in stderr
.lower():
243 def _mountpoint_exists(self
):
244 return self
.client_remote
.run(args
=["ls", "-d", self
.mountpoint
], check_status
=False, cwd
=self
.test_dir
, timeout
=(15*60)).exitstatus
== 0
247 if not self
.is_mounted():
251 log
.info('Running fusermount -u on {name}...'.format(name
=self
.client_remote
.name
))
252 self
.client_remote
.run(
262 except run
.CommandFailedError
:
263 log
.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name
=self
.client_remote
.name
))
265 self
.client_remote
.run(args
=[
267 run
.Raw('PATH=/usr/sbin:$PATH'),
274 # abort the fuse mount, killing all hung processes
276 self
.run_python(dedent("""
278 path = "/sys/fs/fuse/connections/{0}/abort"
279 if os.path.exists(path):
280 open(path, "w").write("1")
281 """).format(self
._fuse
_conn
))
282 self
._fuse
_conn
= None
286 # make sure its unmounted
287 self
.client_remote
.run(
298 except CommandFailedError
:
299 if self
.is_mounted():
302 assert not self
.is_mounted()
303 self
._fuse
_conn
= None
308 def umount_wait(self
, force
=False, require_clean
=False, timeout
=900):
310 :param force: Complete cleanly even if the MDS is offline
312 if not (self
.is_mounted() and self
.fuse_daemon
):
313 log
.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self
.client_id
,
314 remote
=self
.client_remote
,
315 mnt
=self
.mountpoint
))
319 assert not require_clean
# mutually exclusive
321 # When we expect to be forcing, kill the ceph-fuse process directly.
322 # This should avoid hitting the more aggressive fallback killing
323 # in umount() which can affect other mounts too.
324 self
.fuse_daemon
.stdin
.close()
326 # However, we will still hit the aggressive wait if there is an ongoing
327 # mount -o remount (especially if the remount is stuck because MDSs
333 # Permit a timeout, so that we do not block forever
334 run
.wait([self
.fuse_daemon
], timeout
)
335 except MaxWhileTries
:
336 log
.error("process failed to terminate after unmount. This probably"
337 " indicates a bug within ceph-fuse.")
339 except CommandFailedError
:
347 Remove the mount point.
349 Prerequisite: the client is not mounted.
353 self
.client_remote
.run(
364 except CommandFailedError
:
365 if "No such file or directory" in stderr
.getvalue():
372 Terminate the client without removing the mount point.
374 log
.info('Killing ceph-fuse connection on {name}...'.format(name
=self
.client_remote
.name
))
375 self
.fuse_daemon
.stdin
.close()
377 self
.fuse_daemon
.wait()
378 except CommandFailedError
:
381 def kill_cleanup(self
):
383 Follow up ``kill`` to get to a clean unmounted state.
385 log
.info('Cleaning up killed ceph-fuse connection')
391 Whatever the state of the mount, get it gone.
393 super(FuseMount
, self
).teardown()
397 if self
.fuse_daemon
and not self
.fuse_daemon
.finished
:
398 self
.fuse_daemon
.stdin
.close()
400 self
.fuse_daemon
.wait()
401 except CommandFailedError
:
404 # Indiscriminate, unlike the touchier cleanup()
405 self
.client_remote
.run(
415 def _asok_path(self
):
416 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self
.client_id
)
422 def admin_socket(self
, args
):
429 def find_socket(client_name):
430 asok_path = "{asok_path}"
431 files = glob.glob(asok_path)
433 # Given a non-glob path, it better be there
434 if "*" not in asok_path:
435 assert(len(files) == 1)
439 pid = re.match(".*\.(\d+)\.asok$", f).group(1)
440 if os.path.exists("/proc/{{0}}".format(pid)):
442 raise RuntimeError("Client socket {{0}} not found".format(client_name))
444 print(find_socket("{client_name}"))
446 asok_path
=self
._asok
_path
(),
447 client_name
="client.{0}".format(self
.client_id
))
449 # Find the admin socket
450 p
= self
.client_remote
.run(args
=[
451 'sudo', 'python3', '-c', pyscript
452 ], stdout
=StringIO(), timeout
=(15*60))
453 asok_path
= p
.stdout
.getvalue().strip()
454 log
.info("Found client admin socket at {0}".format(asok_path
))
456 # Query client ID from admin socket
457 p
= self
.client_remote
.run(
458 args
=['sudo', self
._prefix
+ 'ceph', '--admin-daemon', asok_path
] + args
,
459 stdout
=StringIO(), timeout
=(15*60))
460 return json
.loads(p
.stdout
.getvalue())
462 def get_global_id(self
):
464 Look up the CephFS client ID for this mount
466 return self
.admin_socket(['mds_sessions'])['id']
468 def get_global_inst(self
):
470 Look up the CephFS client instance for this mount
474 def get_global_addr(self
):
476 Look up the CephFS client addr for this mount
480 def get_client_pid(self
):
482 return pid of ceph-fuse process
484 status
= self
.admin_socket(['status'])
485 return status
['metadata']['pid']
487 def get_osd_epoch(self
):
489 Return 2-tuple of osd_epoch, osd_epoch_barrier
491 status
= self
.admin_socket(['status'])
492 return status
['osd_epoch'], status
['osd_epoch_barrier']
494 def get_dentry_count(self
):
496 Return 2-tuple of dentry_count, dentry_pinned_count
498 status
= self
.admin_socket(['status'])
499 return status
['dentry_count'], status
['dentry_pinned_count']
501 def set_cache_size(self
, size
):
502 return self
.admin_socket(['config', 'set', 'client_cache_size', str(size
)])