from io import BytesIO
from io import StringIO
import json
import time
import logging

import six

from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.mount import CephFSMount

log = logging.getLogger(__name__)


class FuseMount(CephFSMount):
    def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        self.fuse_daemon = None
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None

    def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
        if mountpoint is not None:
            self.mountpoint = mountpoint
        self.setupfs(name=mount_fs_name)

        try:
            return self._mount(mount_path, mount_fs_name, mount_options)
        except RuntimeError:
            # Catch exceptions thrown by the mount() logic (i.e. not remote
            # command failures) and ensure the mount is not left half-up.
            # Otherwise we might leave a zombie mount point that anyone
            # traversing cephtest/ gets hung up on.
            log.warning("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mount_path, mount_fs_name, mount_options):
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

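        # daemon-helper will deliver this signal to ceph-fuse on shutdown.
        # Under coverage or valgrind we ask for SIGTERM rather than SIGKILL,
        # presumably so that the process can exit cleanly and write out its
        # coverage/valgrind data.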
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(args=['mkdir', '-p', self.mountpoint],
                               timeout=(15*60), cwd=self.test_dir)

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

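        # -f keeps ceph-fuse in the foreground so that daemon-helper (above)
        # can supervise it and deliver the shutdown signal directly.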
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_fs={0}".format(mount_fs_name)]

        fuse_cmd += mount_options

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        cwd = self.test_dir
        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )
            cwd = None  # misc.get_valgrind_args does the chdir for us

        run_cmd.extend(fuse_cmd)

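        # The kernel's fusectl filesystem, mounted at /sys/fs/fuse/connections,
        # exposes one numbered directory per active FUSE connection. We
        # snapshot the set of connection numbers before and after starting
        # ceph-fuse to identify the connection belonging to this mount.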
        def list_connections():
            conn_dir = "/sys/fs/fuse/connections"

            self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
                                   check_status=False)
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
                check_status=False, timeout=(30))

            try:
                ls_str = self.client_remote.sh("ls " + conn_dir,
                                               stdout=StringIO(),
                                               timeout=(15*60)).strip()
            except CommandFailedError:
                return []

            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting the ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            cwd=cwd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Waiting {0} seconds before checking /sys/ for the fuse mount".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly found multiple new fuse connections: {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()

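    # Note on gather_mount_info(): the client's "status" admin socket output
    # does not always include inst_str/addr_str (this seems to vary by Ceph
    # version), so on KeyError we fall back to matching our global id against
    # the MDS's "session ls". A session's 'inst' there has the form
    # "<name> <addr>" (illustratively, "client.4135 v1:192.0.2.1:0/123"),
    # hence the split()[1] to recover the address.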
    def gather_mount_info(self):
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")

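    # 'stat --file-system --printf=%T' prints the filesystem type of the
    # given path; a live ceph-fuse mount reports "fuseblk", which is what
    # is_mounted() keys on below.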
    def is_mounted(self):
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.mountpoint,
            ],
            cwd=self.test_dir,
            stdout=BytesIO(),
            stderr=BytesIO(),
            wait=False,
            timeout=(15*60)
        )
        try:
            proc.wait()
        except CommandFailedError:
            error = six.ensure_str(proc.stderr.getvalue())
            if ("endpoint is not connected" in error
                    or "Software caused connection abort" in error):
                # This happens if fuse is killed without unmount
                log.warning("Found stale mount point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        fstype = six.ensure_str(proc.stdout.getvalue()).rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        try:
            stderr = BytesIO()
            self.client_remote.run(
                args=['sudo', 'chmod', '1777', self.mountpoint],
                timeout=(15*60), cwd=self.test_dir, stderr=stderr)
        except run.CommandFailedError:
            stderr = stderr.getvalue()
            if b"Read-only file system".lower() in stderr.lower():
                pass
            else:
                raise

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.mountpoint],
                                      check_status=False,
                                      cwd=self.test_dir,
                                      timeout=(15*60)).exitstatus == 0

    def umount(self):
        if not self.is_mounted():
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                cwd=self.test_dir,
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

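            # Dump open files and the full process tree into the log, to help
            # diagnose what was holding the mount busy.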
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
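            # (Writing "1" to a connection's "abort" file under the fusectl
            # filesystem forcibly terminates that FUSE connection, which
            # unwedges any processes blocked on I/O against the dead mount.)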
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
            self._fuse_conn = None

            stderr = BytesIO()
            try:
                # make sure it's unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None

    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: Raise if the ceph-fuse process does not exit cleanly
        :param timeout: Seconds to wait for the ceph-fuse process to exit
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self.client_id,
                                                                                      remote=self.client_remote,
                                                                                      mnt=self.mountpoint))
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount.  This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = BytesIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                cwd=self.test_dir,
                stderr=stderr,
                timeout=(60*5),
                check_status=False,
            )
        except CommandFailedError:
            if b"No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            cwd=self.test_dir,
            timeout=(60*5)
        )

    def _asok_path(self):
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    @property
    def _prefix(self):
        return ""

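    # The asok path above is a glob because ceph-fuse embeds its PID in the
    # socket name; when several (possibly stale) sockets match, the remote
    # helper script below picks the one whose PID is still alive in /proc.
    # Callers pass admin socket commands as argument lists, e.g.
    # self.admin_socket(['status']) or self.admin_socket(['mds_sessions']),
    # and get back the parsed JSON.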
    def admin_socket(self, args):
        pyscript = r"""
import glob
import re
import os

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it had better be there
    if "*" not in asok_path:
        assert len(files) == 1
        return files[0]

    for f in files:
        pid = re.match(r".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(find_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        asok_path = self.client_remote.sh(
            ['sudo', 'python3', '-c', pyscript],
            stdout=StringIO(),
            timeout=(15*60)).strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Run the requested command against the admin socket
        json_data = self.client_remote.sh(
            ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(),
            timeout=(15*60))
        return json.loads(json_data)

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount
        """
        return self.inst

    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount
        """
        return self.addr

    def get_client_pid(self):
        """
        Return the PID of the ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])
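

# Typical driver usage, as an illustrative sketch only (the real wiring
# lives in the teuthology ceph-fuse task, qa/tasks/ceph_fuse.py, and may
# differ in detail):
#
#   mount = FuseMount(ctx, client_config, test_dir, client_id, client_remote)
#   mount.mount()
#   mount.wait_until_mounted()
#   # ... exercise the filesystem under mount.mountpoint ...
#   mount.umount_wait()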