from StringIO import StringIO
import json
import time
import logging
from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
from .mount import CephFSMount

log = logging.getLogger(__name__)


class FuseMount(CephFSMount):
    def __init__(self, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(test_dir, client_id, client_remote)

        # client_config carries optional per-client test settings; the keys
        # consulted below are 'coverage', 'valgrind', 'mount_wait' and
        # 'mount_timeout'.
        self.client_config = client_config if client_config else {}
        self.fuse_daemon = None
        self._fuse_conn = None

    def mount(self, mount_path=None, mount_fs_name=None):
        try:
            return self._mount(mount_path, mount_fs_name)
        except RuntimeError:
            # Catch exceptions from the mount() logic (i.e. not remote command
            # failures) and ensure the mount is not left half-up.  Otherwise we
            # might leave a zombie mount point that anyone traversing cephtest/
            # gets hung up on.
            log.warn("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

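    # A typical lifecycle, as driven by the cephfs test tasks (illustrative
    # sketch only, not code from this module):
    #
    #     fm = FuseMount(client_config, test_dir, client_id, client_remote)
    #     fm.mount()
    #     fm.wait_until_mounted()
    #     ...  # run the workload against fm.mountpoint
    #     fm.umount_wait()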
    def _mount(self, mount_path, mount_fs_name):
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
            timeout=(15*60)
        )

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
                timeout=(15*60)
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
                timeout=(15*60)
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

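        # Each entry under the kernel's fusectl filesystem at
        # /sys/fs/fuse/connections is a numbered directory, one per active
        # FUSE connection on the node; diffing the listing before and after
        # starting ceph-fuse identifies the connection belonging to this mount.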
        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def is_mounted(self):
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.mountpoint,
            ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False,
            timeout=(15*60)
        )
        try:
            proc.wait()
        except CommandFailedError:
            if ("endpoint is not connected" in proc.stderr.getvalue()
                    or "Software caused connection abort" in proc.stderr.getvalue()):
                # This happens if fuse is killed without unmount
                log.warn("Found stale mount point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        try:
            stderr = StringIO()
            self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr)
        except run.CommandFailedError:
            stderr = stderr.getvalue()
            if "Read-only file system".lower() in stderr.lower():
                pass
            else:
                raise

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0

    def umount(self):
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

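            # Writing "1" to /sys/fs/fuse/connections/<conn>/abort asks the
            # kernel to abort the FUSE connection recorded at mount time,
            # which unblocks processes hung on the dead mount so that the
            # forced lazy unmount below can proceed.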
            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure it's unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None

    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an
            # ongoing mount -o remount (especially if the remount is stuck
            # because MDSs are unavailable).

        self.umount()

        try:
            if self.fuse_daemon:
                # Permit a timeout, so that we do not block forever
                run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably "
                      "indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                stderr=stderr,
                timeout=(60*5)
            )
        except CommandFailedError:
            if "No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            timeout=(60*5)
        )

    def _asok_path(self):
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    @property
    def _prefix(self):
        return ""

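    # _asok_path() is a glob because the admin socket filename includes a
    # numeric field (treated below as the ceph-fuse PID); the helper script
    # in admin_socket() picks the candidate whose PID still exists under /proc.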
    def admin_socket(self, args):
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query the admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())

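    # admin_socket() shells out to "ceph --admin-daemon <asok> <args...>" and
    # parses the JSON reply, so callers get live client state as a dict; the
    # accessors below are thin wrappers over the 'mds_sessions' and 'status'
    # queries.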
    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_client_pid(self):
        """
        Return the PID of the ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])