from StringIO import StringIO
import json
import time
import logging
from textwrap import dedent

from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
from .mount import CephFSMount

log = logging.getLogger(__name__)


class FuseMount(CephFSMount):
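    """
    Handle for a ceph-fuse mount of CephFS on a remote test node: starts
    the ceph-fuse process, tracks its fuse connection in /sys/fs/fuse,
    and tears everything down again afterwards.
    """
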
    def __init__(self, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        self.fuse_daemon = None
        self._fuse_conn = None

    def mount(self, mount_path=None, mount_fs_name=None):
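        """
        Mount the filesystem, undoing any partial mount state on failure.
        """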
        try:
            return self._mount(mount_path, mount_fs_name)
        except RuntimeError:
            # Catch exceptions raised by the mount() logic (i.e. not remote
            # command failures) and ensure the mount is not left half-up.
            # Otherwise we might leave a zombie mount point that hangs up
            # anyone traversing cephtest/.
            log.warn("Trying to clean up after failed mount")
            self.umount_wait(force=True)
            raise

    def _mount(self, mount_path, mount_fs_name):
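        # Launch ceph-fuse under daemon-helper (so it can be signalled on
        # teardown) and then watch /sys/fs/fuse/connections to confirm that
        # the mount actually came up.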
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
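            """
            Return the fuse connection IDs currently present under
            /sys/fs/fuse/connections, mounting fusectl first in case it
            is not already mounted.
            """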
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

    def is_mounted(self):
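        """
        Check the filesystem type of the mountpoint with stat(1); a live
        ceph-fuse mount reports 'fuseblk'.
        """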
        proc = self.client_remote.run(
            args=[
                'stat',
                '--file-system',
                '--printf=%T\n',
                '--',
                self.mountpoint,
            ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=False
        )
        try:
            proc.wait()
        except CommandFailedError:
            if ("endpoint is not connected" in proc.stderr.getvalue()
                    or "Software caused connection abort" in proc.stderr.getvalue()):
                # This happens if fuse is killed without unmount
                log.warn("Found stale mount point at {0}".format(self.mountpoint))
                return True
            else:
                # This happens if the mount directory doesn't exist
                log.info('mount point does not exist: %s', self.mountpoint)
                return False

        fstype = proc.stdout.getvalue().rstrip('\n')
        if fstype == 'fuseblk':
            log.info('ceph-fuse is mounted on %s', self.mountpoint)
            return True
        else:
            log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
                fstype=fstype))
            return False

    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint.  If not,
        sleep for 5 seconds and check again.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test
        # will have unrestricted access to the filesystem mount.
        self.client_remote.run(
            args=['sudo', 'chmod', '1777', self.mountpoint])

    def _mountpoint_exists(self):
        return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0

    def umount(self):
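        """
        Unmount with fusermount -u.  If that fails, log open files and the
        process tree, abort the fuse connection via /sys, and fall back to
        a lazy, forced umount.
        """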
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ])

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure it's unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr
                )
            except CommandFailedError:
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None

    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        :param force: Complete cleanly even if the MDS is offline
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            if self.fuse_daemon:
                # Permit a timeout, so that we do not block forever
                run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably "
                      "indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        stderr = StringIO()
        try:
            self.client_remote.run(
                args=[
                    'rmdir',
                    '--',
                    self.mountpoint,
                ],
                stderr=stderr
            )
        except CommandFailedError:
            if "No such file or directory" in stderr.getvalue():
                pass
            else:
                raise

    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            pass

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        self.umount()
        self.cleanup()

    def teardown(self):
        """
        Whatever the state of the mount, get it gone.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
        )

    def _asok_path(self):
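        # The asok filename embeds the PID of the ceph-fuse process, which
        # isn't tracked here, so return a glob for admin_socket() to resolve.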
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)

    @property
    def _prefix(self):
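        # Prepended to the 'ceph' binary name in admin_socket(); empty here,
        # presumably so that subclasses can substitute a wrapper or path.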
        return ""

    def admin_socket(self, args):
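        """
        Run a command against the ceph-fuse process's admin socket and
        return the decoded JSON output.
        """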
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    # Resolve the glob to whichever socket belongs to a still-running process
    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'python', '-c', pyscript
        ], stdout=StringIO())
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Run the requested command against the admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO())
        return json.loads(p.stdout.getvalue())

    def get_global_id(self):
        """
        Look up the CephFS client ID for this mount
        """
        return self.admin_socket(['mds_sessions'])['id']

    def get_client_pid(self):
        """
        Return the PID of the ceph-fuse process
        """
        status = self.admin_socket(['status'])
        return status['metadata']['pid']

    def get_osd_epoch(self):
        """
        Return 2-tuple of osd_epoch, osd_epoch_barrier
        """
        status = self.admin_socket(['status'])
        return status['osd_epoch'], status['osd_epoch_barrier']

    def get_dentry_count(self):
        """
        Return 2-tuple of dentry_count, dentry_pinned_count
        """
        status = self.admin_socket(['status'])
        return status['dentry_count'], status['dentry_pinned_count']

    def set_cache_size(self, size):
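        """
        Set the client_cache_size config option via the admin socket.
        """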
        return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])