]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/fuse_mount.py
import ceph 14.2.5
[ceph.git] / ceph / qa / tasks / cephfs / fuse_mount.py
CommitLineData
7c673cae
FG
1
2from StringIO import StringIO
3import json
4import time
5import logging
6from textwrap import dedent
7
8from teuthology import misc
9from teuthology.contextutil import MaxWhileTries
10from teuthology.orchestra import run
11from teuthology.orchestra.run import CommandFailedError
12from .mount import CephFSMount
11fdf7f2 13from tasks.cephfs.filesystem import Filesystem
7c673cae
FG
14
15log = logging.getLogger(__name__)
16
17
class FuseMount(CephFSMount):
    """
    A CephFS mount driven by a ceph-fuse userspace daemon on a remote
    client: handles mounting, unmount (graceful and forced), kill and
    cleanup of the mount point.
    """
    def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        # Handle of the remote ceph-fuse process (set by _mount()).
        self.fuse_daemon = None
        # Numeric id of this mount under /sys/fs/fuse/connections,
        # recorded so a hung mount can be aborted later.
        self._fuse_conn = None
        # Client identity as reported by the admin socket / MDS session
        # list (populated by gather_mount_info()).
        self.id = None
        self.inst = None
        self.addr = None
29 def mount(self, mount_path=None, mount_fs_name=None):
11fdf7f2
TL
30 self.setupfs(name=mount_fs_name)
31
7c673cae
FG
32 try:
33 return self._mount(mount_path, mount_fs_name)
34 except RuntimeError:
35 # Catch exceptions by the mount() logic (i.e. not remote command
36 # failures) and ensure the mount is not left half-up.
37 # Otherwise we might leave a zombie mount point that causes
38 # anyone traversing cephtest/ to get hung up on.
39 log.warn("Trying to clean up after failed mount")
40 self.umount_wait(force=True)
41 raise
42
    def _mount(self, mount_path, mount_fs_name):
        """
        Start ceph-fuse on the remote and wait for a new FUSE connection
        to appear under /sys/fs/fuse/connections before returning.

        Raises RuntimeError if no new connection appears within the
        configured mount_timeout, or propagates the daemon's failure.
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # Under coverage/valgrind the daemon must be asked to exit cleanly
        # (SIGTERM) so it can flush its data; otherwise a plain kill is fine.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
            timeout=(15*60)
        )

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        # Run in the foreground so daemon-helper can manage the process.
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the numeric connection ids under /sys/fs/fuse/connections.
            # The fusectl mount is best-effort (check_status=False): it may
            # already be mounted or unavailable.
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
                timeout=(15*60)
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
                timeout=(15*60)
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        # Launch ceph-fuse asynchronously; stdin is kept open so closing it
        # later can be used to signal the daemon (see umount_wait/kill).
        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()
    def gather_mount_info(self):
        """
        Populate self.id, self.client_pid, self.inst and self.addr from
        the client's admin socket 'status' output.
        """
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError as e:
            # 'status' may lack inst_str/addr_str (presumably older
            # clients — TODO confirm); recover them from the MDS's
            # session list by matching our client id.
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")
7c673cae
FG
182 def is_mounted(self):
183 proc = self.client_remote.run(
184 args=[
185 'stat',
186 '--file-system',
187 '--printf=%T\n',
188 '--',
189 self.mountpoint,
190 ],
191 stdout=StringIO(),
192 stderr=StringIO(),
f64942e4
AA
193 wait=False,
194 timeout=(15*60)
7c673cae
FG
195 )
196 try:
197 proc.wait()
198 except CommandFailedError:
199 if ("endpoint is not connected" in proc.stderr.getvalue()
200 or "Software caused connection abort" in proc.stderr.getvalue()):
201 # This happens is fuse is killed without unmount
202 log.warn("Found stale moutn point at {0}".format(self.mountpoint))
203 return True
204 else:
205 # This happens if the mount directory doesn't exist
206 log.info('mount point does not exist: %s', self.mountpoint)
207 return False
208
209 fstype = proc.stdout.getvalue().rstrip('\n')
210 if fstype == 'fuseblk':
211 log.info('ceph-fuse is mounted on %s', self.mountpoint)
212 return True
213 else:
214 log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
215 fstype=fstype))
216 return False
217
218 def wait_until_mounted(self):
219 """
220 Check to make sure that fuse is mounted on mountpoint. If not,
221 sleep for 5 seconds and check again.
222 """
223
224 while not self.is_mounted():
225 # Even if it's not mounted, it should at least
226 # be running: catch simple failures where it has terminated.
227 assert not self.fuse_daemon.poll()
228
229 time.sleep(5)
230
231 # Now that we're mounted, set permissions so that the rest of the test will have
232 # unrestricted access to the filesystem mount.
f64942e4
AA
233 try:
234 stderr = StringIO()
235 self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr)
236 except run.CommandFailedError:
237 stderr = stderr.getvalue()
238 if "Read-only file system".lower() in stderr.lower():
239 pass
240 else:
241 raise
7c673cae
FG
242
243 def _mountpoint_exists(self):
f64942e4 244 return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0
7c673cae
FG
245
    def umount(self):
        """
        Unmount the client via fusermount -u; on failure, fall back to
        aborting the FUSE connection and lazy/forced umount.  Resets the
        cached identity fields once unmounted.
        """
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Dump open files and the process tree for post-mortem debugging
            # before we start killing things.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                # Tolerate the failure only if the mount is actually gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        # The mount identity is no longer valid once unmounted.
        self.id = None
        self.inst = None
        self.addr = None
7c673cae 303
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        Unmount and wait for the ceph-fuse process to terminate, then
        remove the mount point.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: raise if the daemon exits with an error
        :param timeout: seconds to wait for the daemon to terminate
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            if self.fuse_daemon:
                # Permit a timeout, so that we do not block forever
                run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            # A non-zero daemon exit is tolerated unless the caller asked
            # for a clean shutdown.
            if require_clean:
                raise

        self.cleanup()
335
336 def cleanup(self):
337 """
338 Remove the mount point.
339
340 Prerequisite: the client is not mounted.
341 """
342 stderr = StringIO()
343 try:
344 self.client_remote.run(
345 args=[
346 'rmdir',
347 '--',
348 self.mountpoint,
349 ],
f64942e4
AA
350 stderr=stderr,
351 timeout=(60*5)
7c673cae
FG
352 )
353 except CommandFailedError:
354 if "No such file or directory" in stderr.getvalue():
355 pass
356 else:
357 raise
358
    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        # Closing the daemon's stdin signals daemon-helper to kill it.
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            # A non-zero exit is expected after a deliberate kill.
            pass
369
    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        self.umount()
        self.cleanup()
377
    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        # If the daemon is somehow still running, kill it and reap it.
        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            timeout=(60*5)
        )
402
403 def _asok_path(self):
404 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
405
    @property
    def _prefix(self):
        # Prefix prepended to remote 'ceph' invocations (see admin_socket);
        # empty for FuseMount.
        return ""
409
    def admin_socket(self, args):
        """
        Run an admin-socket command against this client's ceph-fuse daemon
        and return the parsed JSON output.

        :param args: command as a list of strings, e.g. ['status']
        """
        # Helper script executed on the remote (NOTE: python2 'print'
        # statement syntax, run via 'python2' below): given a glob path,
        # pick the socket file whose embedded PID is a live process.
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
449
450 def get_global_id(self):
451 """
452 Look up the CephFS client ID for this mount
453 """
7c673cae
FG
454 return self.admin_socket(['mds_sessions'])['id']
455
11fdf7f2
TL
    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount
        (cached by gather_mount_info(); None before mount).
        """
        return self.inst
461
    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount
        (cached by gather_mount_info(); None before mount).
        """
        return self.addr
467
28e407b8
AA
468 def get_client_pid(self):
469 """
470 return pid of ceph-fuse process
471 """
472 status = self.admin_socket(['status'])
473 return status['metadata']['pid']
474
7c673cae
FG
475 def get_osd_epoch(self):
476 """
477 Return 2-tuple of osd_epoch, osd_epoch_barrier
478 """
479 status = self.admin_socket(['status'])
480 return status['osd_epoch'], status['osd_epoch_barrier']
481
482 def get_dentry_count(self):
483 """
484 Return 2-tuple of dentry_count, dentry_pinned_count
485 """
486 status = self.admin_socket(['status'])
487 return status['dentry_count'], status['dentry_pinned_count']
488
489 def set_cache_size(self, size):
490 return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])