]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/fuse_mount.py
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / qa / tasks / cephfs / fuse_mount.py
CommitLineData
7c673cae
FG
1
2from StringIO import StringIO
3import json
4import time
5import logging
6from textwrap import dedent
7
8from teuthology import misc
9from teuthology.contextutil import MaxWhileTries
10from teuthology.orchestra import run
11from teuthology.orchestra.run import CommandFailedError
12from .mount import CephFSMount
13
14log = logging.getLogger(__name__)
15
16
class FuseMount(CephFSMount):
    def __init__(self, client_config, test_dir, client_id, client_remote):
        """A CephFS mount backed by a ceph-fuse daemon on a remote host."""
        super(FuseMount, self).__init__(test_dir, client_id, client_remote)

        # Normalise a missing config to an empty dict so .get() calls work.
        self.client_config = client_config or {}
        self.fuse_daemon = None   # handle for the running ceph-fuse process
        self._fuse_conn = None    # /sys/fs/fuse/connections id of this mount
25 def mount(self, mount_path=None, mount_fs_name=None):
26 try:
27 return self._mount(mount_path, mount_fs_name)
28 except RuntimeError:
29 # Catch exceptions by the mount() logic (i.e. not remote command
30 # failures) and ensure the mount is not left half-up.
31 # Otherwise we might leave a zombie mount point that causes
32 # anyone traversing cephtest/ to get hung up on.
33 log.warn("Trying to clean up after failed mount")
34 self.umount_wait(force=True)
35 raise
36
37 def _mount(self, mount_path, mount_fs_name):
38 log.info("Client client.%s config is %s" % (self.client_id, self.client_config))
39
40 daemon_signal = 'kill'
41 if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
42 daemon_signal = 'term'
43
44 log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
45 id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
46
47 self.client_remote.run(
48 args=[
49 'mkdir',
50 '--',
51 self.mountpoint,
52 ],
53 )
54
55 run_cmd = [
56 'sudo',
57 'adjust-ulimits',
58 'ceph-coverage',
59 '{tdir}/archive/coverage'.format(tdir=self.test_dir),
60 'daemon-helper',
61 daemon_signal,
62 ]
63
64 fuse_cmd = ['ceph-fuse', "-f"]
65
66 if mount_path is not None:
67 fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]
68
69 if mount_fs_name is not None:
70 fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]
71
72 fuse_cmd += [
73 '--name', 'client.{id}'.format(id=self.client_id),
74 # TODO ceph-fuse doesn't understand dash dash '--',
75 self.mountpoint,
76 ]
77
78 if self.client_config.get('valgrind') is not None:
79 run_cmd = misc.get_valgrind_args(
80 self.test_dir,
81 'client.{id}'.format(id=self.client_id),
82 run_cmd,
83 self.client_config.get('valgrind'),
84 )
85
86 run_cmd.extend(fuse_cmd)
87
88 def list_connections():
89 self.client_remote.run(
90 args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
91 check_status=False
92 )
93 p = self.client_remote.run(
94 args=["ls", "/sys/fs/fuse/connections"],
95 stdout=StringIO(),
96 check_status=False
97 )
98 if p.exitstatus != 0:
99 return []
100
101 ls_str = p.stdout.getvalue().strip()
102 if ls_str:
103 return [int(n) for n in ls_str.split("\n")]
104 else:
105 return []
106
107 # Before starting ceph-fuse process, note the contents of
108 # /sys/fs/fuse/connections
109 pre_mount_conns = list_connections()
110 log.info("Pre-mount connections: {0}".format(pre_mount_conns))
111
112 proc = self.client_remote.run(
113 args=run_cmd,
114 logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
115 stdin=run.PIPE,
116 wait=False,
117 )
118 self.fuse_daemon = proc
119
120 # Wait for the connection reference to appear in /sys
121 mount_wait = self.client_config.get('mount_wait', 0)
122 if mount_wait > 0:
123 log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
124 time.sleep(mount_wait)
125 timeout = int(self.client_config.get('mount_timeout', 30))
126 waited = 0
127
128 post_mount_conns = list_connections()
129 while len(post_mount_conns) <= len(pre_mount_conns):
130 if self.fuse_daemon.finished:
131 # Did mount fail? Raise the CommandFailedError instead of
132 # hitting the "failed to populate /sys/" timeout
133 self.fuse_daemon.wait()
134 time.sleep(1)
135 waited += 1
136 if waited > timeout:
137 raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
138 waited
139 ))
140 else:
141 post_mount_conns = list_connections()
142
143 log.info("Post-mount connections: {0}".format(post_mount_conns))
144
145 # Record our fuse connection number so that we can use it when
146 # forcing an unmount
147 new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
148 if len(new_conns) == 0:
149 raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
150 elif len(new_conns) > 1:
151 raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
152 else:
153 self._fuse_conn = new_conns[0]
154
155 def is_mounted(self):
156 proc = self.client_remote.run(
157 args=[
158 'stat',
159 '--file-system',
160 '--printf=%T\n',
161 '--',
162 self.mountpoint,
163 ],
164 stdout=StringIO(),
165 stderr=StringIO(),
166 wait=False
167 )
168 try:
169 proc.wait()
170 except CommandFailedError:
171 if ("endpoint is not connected" in proc.stderr.getvalue()
172 or "Software caused connection abort" in proc.stderr.getvalue()):
173 # This happens is fuse is killed without unmount
174 log.warn("Found stale moutn point at {0}".format(self.mountpoint))
175 return True
176 else:
177 # This happens if the mount directory doesn't exist
178 log.info('mount point does not exist: %s', self.mountpoint)
179 return False
180
181 fstype = proc.stdout.getvalue().rstrip('\n')
182 if fstype == 'fuseblk':
183 log.info('ceph-fuse is mounted on %s', self.mountpoint)
184 return True
185 else:
186 log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
187 fstype=fstype))
188 return False
189
190 def wait_until_mounted(self):
191 """
192 Check to make sure that fuse is mounted on mountpoint. If not,
193 sleep for 5 seconds and check again.
194 """
195
196 while not self.is_mounted():
197 # Even if it's not mounted, it should at least
198 # be running: catch simple failures where it has terminated.
199 assert not self.fuse_daemon.poll()
200
201 time.sleep(5)
202
203 # Now that we're mounted, set permissions so that the rest of the test will have
204 # unrestricted access to the filesystem mount.
205 self.client_remote.run(
206 args=['sudo', 'chmod', '1777', self.mountpoint])
207
208 def _mountpoint_exists(self):
209 return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
210
211 def umount(self):
212 try:
213 log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
214 self.client_remote.run(
215 args=[
216 'sudo',
217 'fusermount',
218 '-u',
219 self.mountpoint,
220 ],
221 )
222 except run.CommandFailedError:
223 log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
224
225 # abort the fuse mount, killing all hung processes
226 if self._fuse_conn:
227 self.run_python(dedent("""
228 import os
229 path = "/sys/fs/fuse/connections/{0}/abort"
230 if os.path.exists(path):
231 open(path, "w").write("1")
232 """).format(self._fuse_conn))
233 self._fuse_conn = None
234
235 stderr = StringIO()
236 try:
237 # make sure its unmounted
238 self.client_remote.run(
239 args=[
240 'sudo',
241 'umount',
242 '-l',
243 '-f',
244 self.mountpoint,
245 ],
246 stderr=stderr
247 )
248 except CommandFailedError:
249 if self.is_mounted():
250 raise
251
252 assert not self.is_mounted()
253 self._fuse_conn = None
254
255 def umount_wait(self, force=False, require_clean=False):
256 """
257 :param force: Complete cleanly even if the MDS is offline
258 """
259 if force:
260 assert not require_clean # mutually exclusive
261
262 # When we expect to be forcing, kill the ceph-fuse process directly.
263 # This should avoid hitting the more aggressive fallback killing
264 # in umount() which can affect other mounts too.
265 self.fuse_daemon.stdin.close()
266
267 # However, we will still hit the aggressive wait if there is an ongoing
268 # mount -o remount (especially if the remount is stuck because MDSs
269 # are unavailable)
270
271 self.umount()
272
273 try:
274 if self.fuse_daemon:
275 # Permit a timeout, so that we do not block forever
276 run.wait([self.fuse_daemon], 900)
277 except MaxWhileTries:
278 log.error("process failed to terminate after unmount. This probably"
279 "indicates a bug within ceph-fuse.")
280 raise
281 except CommandFailedError:
282 if require_clean:
283 raise
284
285 self.cleanup()
286
287 def cleanup(self):
288 """
289 Remove the mount point.
290
291 Prerequisite: the client is not mounted.
292 """
293 stderr = StringIO()
294 try:
295 self.client_remote.run(
296 args=[
297 'rmdir',
298 '--',
299 self.mountpoint,
300 ],
301 stderr=stderr
302 )
303 except CommandFailedError:
304 if "No such file or directory" in stderr.getvalue():
305 pass
306 else:
307 raise
308
309 def kill(self):
310 """
311 Terminate the client without removing the mount point.
312 """
313 self.fuse_daemon.stdin.close()
314 try:
315 self.fuse_daemon.wait()
316 except CommandFailedError:
317 pass
318
319 def kill_cleanup(self):
320 """
321 Follow up ``kill`` to get to a clean unmounted state.
322 """
323 self.umount()
324 self.cleanup()
325
326 def teardown(self):
327 """
328 Whatever the state of the mount, get it gone.
329 """
330 super(FuseMount, self).teardown()
331
332 self.umount()
333
334 if self.fuse_daemon and not self.fuse_daemon.finished:
335 self.fuse_daemon.stdin.close()
336 try:
337 self.fuse_daemon.wait()
338 except CommandFailedError:
339 pass
340
341 # Indiscriminate, unlike the touchier cleanup()
342 self.client_remote.run(
343 args=[
344 'rm',
345 '-rf',
346 self.mountpoint,
347 ],
348 )
349
350 def _asok_path(self):
351 return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
352
353 @property
354 def _prefix(self):
355 return ""
356
357 def admin_socket(self, args):
358 pyscript = """
359import glob
360import re
361import os
362import subprocess
363
364def find_socket(client_name):
365 asok_path = "{asok_path}"
366 files = glob.glob(asok_path)
367
368 # Given a non-glob path, it better be there
369 if "*" not in asok_path:
370 assert(len(files) == 1)
371 return files[0]
372
373 for f in files:
374 pid = re.match(".*\.(\d+)\.asok$", f).group(1)
375 if os.path.exists("/proc/{{0}}".format(pid)):
376 return f
377 raise RuntimeError("Client socket {{0}} not found".format(client_name))
378
379print find_socket("{client_name}")
380""".format(
381 asok_path=self._asok_path(),
382 client_name="client.{0}".format(self.client_id))
383
384 # Find the admin socket
385 p = self.client_remote.run(args=[
386 'python', '-c', pyscript
387 ], stdout=StringIO())
388 asok_path = p.stdout.getvalue().strip()
389 log.info("Found client admin socket at {0}".format(asok_path))
390
391 # Query client ID from admin socket
392 p = self.client_remote.run(
393 args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
394 stdout=StringIO())
395 return json.loads(p.stdout.getvalue())
396
397 def get_global_id(self):
398 """
399 Look up the CephFS client ID for this mount
400 """
401
402 return self.admin_socket(['mds_sessions'])['id']
403
404 def get_osd_epoch(self):
405 """
406 Return 2-tuple of osd_epoch, osd_epoch_barrier
407 """
408 status = self.admin_socket(['status'])
409 return status['osd_epoch'], status['osd_epoch_barrier']
410
411 def get_dentry_count(self):
412 """
413 Return 2-tuple of dentry_count, dentry_pinned_count
414 """
415 status = self.admin_socket(['status'])
416 return status['dentry_count'], status['dentry_pinned_count']
417
418 def set_cache_size(self, size):
419 return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])