]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/fuse_mount.py
import 15.2.2 octopus source
[ceph.git] / ceph / qa / tasks / cephfs / fuse_mount.py
CommitLineData
7c673cae
FG
1from StringIO import StringIO
2import json
3import time
4import logging
5from textwrap import dedent
6
7from teuthology import misc
8from teuthology.contextutil import MaxWhileTries
9from teuthology.orchestra import run
10from teuthology.orchestra.run import CommandFailedError
11from .mount import CephFSMount
12
13log = logging.getLogger(__name__)
14
15
16class FuseMount(CephFSMount):
11fdf7f2
TL
17 def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
18 super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)
7c673cae
FG
19
20 self.client_config = client_config if client_config else {}
21 self.fuse_daemon = None
22 self._fuse_conn = None
11fdf7f2
TL
23 self.id = None
24 self.inst = None
25 self.addr = None
7c673cae 26
9f95a23c
TL
27 def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
28 if mountpoint is not None:
29 self.mountpoint = mountpoint
11fdf7f2
TL
30 self.setupfs(name=mount_fs_name)
31
7c673cae 32 try:
9f95a23c 33 return self._mount(mount_path, mount_fs_name, mount_options)
7c673cae
FG
34 except RuntimeError:
35 # Catch exceptions by the mount() logic (i.e. not remote command
36 # failures) and ensure the mount is not left half-up.
37 # Otherwise we might leave a zombie mount point that causes
38 # anyone traversing cephtest/ to get hung up on.
39 log.warn("Trying to clean up after failed mount")
40 self.umount_wait(force=True)
41 raise
42
    def _mount(self, mount_path, mount_fs_name, mount_options):
        """
        Start ceph-fuse on the remote and wait for the mount to come up.

        Success is detected by watching /sys/fs/fuse/connections for a
        newly-appearing connection directory; the new connection id is
        stored in self._fuse_conn so a hung mount can later be aborted.

        :raises RuntimeError: if no new fuse connection appears within
            the client's configured mount_timeout (default 30s), or if
            more/fewer than exactly one new connection shows up.
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # Under coverage/valgrind the daemon must exit cleanly so results
        # are flushed: ask daemon-helper to deliver SIGTERM, not SIGKILL.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(args=['mkdir', '-p', self.mountpoint],
                               timeout=(15*60), cwd=self.test_dir)

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        # -f keeps ceph-fuse in the foreground so daemon-helper owns it.
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_fs={0}".format(mount_fs_name)]

        fuse_cmd += mount_options

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        cwd = self.test_dir
        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )
            cwd = None # misc.get_valgrind_args chdir for us

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the numeric fuse connection ids currently present in
            # /sys/fs/fuse/connections; [] if fusectl can't be read.
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
                timeout=(15*60)
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
                timeout=(15*60)
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            cwd=cwd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        # Capture the client's session identity (id/inst/addr) for tests.
        self.gather_mount_info()
    def gather_mount_info(self):
        """
        Populate self.id, self.client_pid, self.inst and self.addr from
        the client's admin socket "status" output.

        Falls back to querying the MDS session list when the status
        output lacks inst_str/addr_str keys (older clients).

        :raises RuntimeError: if no matching MDS session can be found in
            the fallback path.
        """
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            # Older ceph-fuse: look up our own session on the MDS instead.
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")
7c673cae
FG
181 def is_mounted(self):
182 proc = self.client_remote.run(
183 args=[
184 'stat',
185 '--file-system',
186 '--printf=%T\n',
187 '--',
188 self.mountpoint,
189 ],
9f95a23c 190 cwd=self.test_dir,
7c673cae
FG
191 stdout=StringIO(),
192 stderr=StringIO(),
f64942e4
AA
193 wait=False,
194 timeout=(15*60)
7c673cae
FG
195 )
196 try:
197 proc.wait()
198 except CommandFailedError:
199 if ("endpoint is not connected" in proc.stderr.getvalue()
200 or "Software caused connection abort" in proc.stderr.getvalue()):
201 # This happens is fuse is killed without unmount
202 log.warn("Found stale moutn point at {0}".format(self.mountpoint))
203 return True
204 else:
205 # This happens if the mount directory doesn't exist
206 log.info('mount point does not exist: %s', self.mountpoint)
207 return False
208
209 fstype = proc.stdout.getvalue().rstrip('\n')
210 if fstype == 'fuseblk':
211 log.info('ceph-fuse is mounted on %s', self.mountpoint)
212 return True
213 else:
214 log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
215 fstype=fstype))
216 return False
217
    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint. If not,
        sleep for 5 seconds and check again.

        Once mounted, chmod the mount point to 1777 so the rest of the
        test has unrestricted access.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        try:
            stderr = StringIO()
            self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), cwd=self.test_dir, stderr=stderr)
        except run.CommandFailedError:
            # NOTE: rebinds `stderr` from the StringIO to its string value.
            stderr = stderr.getvalue()
            # A read-only filesystem (e.g. a client mounted read-only) is
            # tolerated here; anything else is a real failure.
            if "Read-only file system".lower() in stderr.lower():
                pass
            else:
                raise
243 def _mountpoint_exists(self):
9f95a23c 244 return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, cwd=self.test_dir, timeout=(15*60)).exitstatus == 0
7c673cae
FG
245
    def umount(self):
        """
        Unmount the ceph-fuse client, escalating on failure:

        1. try a clean ``fusermount -u``;
        2. on failure, dump lsof/ps output for debugging, then abort the
           fuse connection via /sys/fs/fuse/connections/<id>/abort to
           unwedge hung I/O, and finally lazy+force umount (-l -f).

        No-op if not currently mounted.  Clears the cached connection id
        and session identity on success.
        """
        if not self.is_mounted():
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                cwd=self.test_dir,
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Dump open files and the process tree to show what is
            # pinning the mount.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                # Only tolerate the failure if the mount is actually gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        Unmount and wait for the ceph-fuse process to terminate, then
        remove the mount point.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: re-raise a failed daemon exit instead of
            tolerating it (mutually exclusive with force)
        :param timeout: seconds to wait for the daemon to terminate
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self.client_id,
                                                                                      remote=self.client_remote,
                                                                                      mnt=self.mountpoint))
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()
    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                cwd=self.test_dir,
                stderr=stderr,
                # NOTE(review): with check_status=False a non-zero rmdir
                # exit won't raise, so the except below rarely fires;
                # kept for safety against other run() failures.
                timeout=(60*5),
                check_status=False,
            )
        except CommandFailedError:
            # An already-missing mount point is fine; anything else is a
            # genuine failure.
            if "No such file or directory" in stderr.getvalue():
                pass
            else:
                raise
    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        # Closing stdin causes daemon-helper to deliver its signal to
        # the ceph-fuse process.
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            # A non-zero exit is expected from a killed daemon.
            pass
    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        # The stale mount left behind by kill() still shows as mounted,
        # so unmount it before removing the mount point directory.
        self.umount()
        self.cleanup()
    def teardown(self):
        """
        Whatever the state of the mount, get it gone.

        Unmounts, waits out any still-running daemon, and forcibly
        removes the mount point directory.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            # Signal the daemon via daemon-helper and reap it.
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            cwd=self.test_dir,
            timeout=(60*5)
        )
415 def _asok_path(self):
416 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
417
    @property
    def _prefix(self):
        """Command prefix for ceph binaries; fuse clients need none."""
        return ""
    def admin_socket(self, args):
        """
        Run a ceph admin-socket command against this client's asok and
        return the parsed JSON result.

        The socket path contains the (unknown) PID, so a small remote
        Python script globs for candidate sockets and picks the one
        whose embedded PID corresponds to a live process.

        :param args: admin socket command, e.g. ['status']
        :return: decoded JSON output of the command
        """
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(find_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python3', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
462 def get_global_id(self):
463 """
464 Look up the CephFS client ID for this mount
465 """
7c673cae
FG
466 return self.admin_socket(['mds_sessions'])['id']
467
    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount

        Value is cached by gather_mount_info(); None before mounting.
        """
        return self.inst
    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount

        Value is cached by gather_mount_info(); None before mounting.
        """
        return self.addr
28e407b8
AA
480 def get_client_pid(self):
481 """
482 return pid of ceph-fuse process
483 """
484 status = self.admin_socket(['status'])
485 return status['metadata']['pid']
486
7c673cae
FG
487 def get_osd_epoch(self):
488 """
489 Return 2-tuple of osd_epoch, osd_epoch_barrier
490 """
491 status = self.admin_socket(['status'])
492 return status['osd_epoch'], status['osd_epoch_barrier']
493
494 def get_dentry_count(self):
495 """
496 Return 2-tuple of dentry_count, dentry_pinned_count
497 """
498 status = self.admin_socket(['status'])
499 return status['dentry_count'], status['dentry_pinned_count']
500
501 def set_cache_size(self, size):
502 return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])