]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import time | |
3 | import logging | |
e306af50 | 4 | |
f67539c2 | 5 | from io import StringIO |
7c673cae FG |
6 | from textwrap import dedent |
7 | ||
7c673cae | 8 | from teuthology.contextutil import MaxWhileTries |
f67539c2 | 9 | from teuthology.contextutil import safe_while |
7c673cae FG |
10 | from teuthology.orchestra import run |
11 | from teuthology.orchestra.run import CommandFailedError | |
f67539c2 | 12 | from tasks.ceph_manager import get_valgrind_args |
e306af50 | 13 | from tasks.cephfs.mount import CephFSMount |
7c673cae FG |
14 | |
15 | log = logging.getLogger(__name__) | |
16 | ||
# Refer mount.py for docstrings.
class FuseMount(CephFSMount):
    def __init__(self, ctx, client_config, test_dir, client_id,
                 client_remote, client_keyring_path=None, cephfs_name=None,
                 cephfs_mntpt=None, hostfs_mntpt=None, brxnet=None):
        # Delegate the shared mount bookkeeping to CephFSMount.
        super(FuseMount, self).__init__(
            ctx=ctx, test_dir=test_dir, client_id=client_id,
            client_remote=client_remote,
            client_keyring_path=client_keyring_path,
            hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
            cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)

        # Per-client teuthology configuration (empty dict when absent).
        self.client_config = client_config if client_config else {}
        # Handle on the remote ceph-fuse process, set by _mount().
        self.fuse_daemon = None
        # Numeric /sys/fs/fuse/connections id for this mount, set by _mount().
        self._fuse_conn = None
        # Client session identity, populated by gather_mount_info().
        self.id = None
        self.inst = None
        self.addr = None
f67539c2 TL |
34 | def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs): |
35 | self.update_attrs(**kwargs) | |
36 | self.assert_and_log_minimum_mount_details() | |
37 | ||
38 | self.setup_netns() | |
39 | ||
40 | if createfs: | |
41 | # TODO: don't call setupfs() from within mount(), since it's | |
42 | # absurd. The proper order should be: create FS first and then | |
43 | # call mount(). | |
44 | self.setupfs(name=self.cephfs_name) | |
11fdf7f2 | 45 | |
7c673cae | 46 | try: |
f67539c2 | 47 | return self._mount(mntopts, check_status) |
7c673cae FG |
48 | except RuntimeError: |
49 | # Catch exceptions by the mount() logic (i.e. not remote command | |
50 | # failures) and ensure the mount is not left half-up. | |
51 | # Otherwise we might leave a zombie mount point that causes | |
52 | # anyone traversing cephtest/ to get hung up on. | |
e306af50 | 53 | log.warning("Trying to clean up after failed mount") |
7c673cae FG |
54 | self.umount_wait(force=True) |
55 | raise | |
56 | ||
f67539c2 TL |
57 | def _mount(self, mntopts, check_status): |
58 | log.info("Client client.%s config is %s" % (self.client_id, | |
59 | self.client_config)) | |
7c673cae FG |
60 | |
61 | daemon_signal = 'kill' | |
f67539c2 TL |
62 | if self.client_config.get('coverage') or \ |
63 | self.client_config.get('valgrind') is not None: | |
7c673cae FG |
64 | daemon_signal = 'term' |
65 | ||
f91f0fd5 TL |
66 | # Use 0000 mode to prevent undesired modifications to the mountpoint on |
67 | # the local file system. | |
f67539c2 TL |
68 | script = f'mkdir -m 0000 -p -v {self.hostfs_mntpt}'.split() |
69 | stderr = StringIO() | |
70 | try: | |
71 | self.client_remote.run(args=script, timeout=(15*60), | |
72 | stderr=StringIO()) | |
73 | except CommandFailedError: | |
74 | if 'file exists' not in stderr.getvalue().lower(): | |
75 | raise | |
7c673cae FG |
76 | |
77 | run_cmd = [ | |
78 | 'sudo', | |
79 | 'adjust-ulimits', | |
80 | 'ceph-coverage', | |
81 | '{tdir}/archive/coverage'.format(tdir=self.test_dir), | |
82 | 'daemon-helper', | |
83 | daemon_signal, | |
84 | ] | |
85 | ||
f67539c2 TL |
86 | fuse_cmd = [ |
87 | 'ceph-fuse', "-f", | |
88 | "--admin-socket", "/var/run/ceph/$cluster-$name.$pid.asok", | |
7c673cae | 89 | ] |
f67539c2 TL |
90 | if self.client_id is not None: |
91 | fuse_cmd += ['--id', self.client_id] | |
92 | if self.client_keyring_path and self.client_id is not None: | |
93 | fuse_cmd += ['-k', self.client_keyring_path] | |
94 | if self.cephfs_mntpt is not None: | |
95 | fuse_cmd += ["--client_mountpoint=" + self.cephfs_mntpt] | |
96 | if self.cephfs_name is not None: | |
97 | fuse_cmd += ["--client_fs=" + self.cephfs_name] | |
98 | if mntopts: | |
99 | fuse_cmd += mntopts | |
100 | fuse_cmd.append(self.hostfs_mntpt) | |
7c673cae FG |
101 | |
102 | if self.client_config.get('valgrind') is not None: | |
f67539c2 | 103 | run_cmd = get_valgrind_args( |
7c673cae FG |
104 | self.test_dir, |
105 | 'client.{id}'.format(id=self.client_id), | |
106 | run_cmd, | |
107 | self.client_config.get('valgrind'), | |
f67539c2 | 108 | cd=False |
7c673cae | 109 | ) |
f67539c2 TL |
110 | |
111 | netns_prefix = ['sudo', 'nsenter', | |
112 | '--net=/var/run/netns/{0}'.format(self.netns_name)] | |
113 | run_cmd = netns_prefix + run_cmd | |
7c673cae FG |
114 | |
115 | run_cmd.extend(fuse_cmd) | |
116 | ||
117 | def list_connections(): | |
e306af50 TL |
118 | conn_dir = "/sys/fs/fuse/connections" |
119 | ||
120 | self.client_remote.run(args=['sudo', 'modprobe', 'fuse'], | |
121 | check_status=False) | |
7c673cae | 122 | self.client_remote.run( |
e306af50 TL |
123 | args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir], |
124 | check_status=False, timeout=(30)) | |
125 | ||
126 | try: | |
127 | ls_str = self.client_remote.sh("ls " + conn_dir, | |
128 | stdout=StringIO(), | |
129 | timeout=(15*60)).strip() | |
130 | except CommandFailedError: | |
7c673cae FG |
131 | return [] |
132 | ||
7c673cae FG |
133 | if ls_str: |
134 | return [int(n) for n in ls_str.split("\n")] | |
135 | else: | |
136 | return [] | |
137 | ||
138 | # Before starting ceph-fuse process, note the contents of | |
139 | # /sys/fs/fuse/connections | |
140 | pre_mount_conns = list_connections() | |
141 | log.info("Pre-mount connections: {0}".format(pre_mount_conns)) | |
142 | ||
f67539c2 TL |
143 | mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO() |
144 | self.fuse_daemon = self.client_remote.run( | |
7c673cae FG |
145 | args=run_cmd, |
146 | logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)), | |
147 | stdin=run.PIPE, | |
f67539c2 TL |
148 | stdout=mountcmd_stdout, |
149 | stderr=mountcmd_stderr, | |
150 | wait=False | |
7c673cae | 151 | ) |
7c673cae FG |
152 | |
153 | # Wait for the connection reference to appear in /sys | |
154 | mount_wait = self.client_config.get('mount_wait', 0) | |
155 | if mount_wait > 0: | |
156 | log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait)) | |
157 | time.sleep(mount_wait) | |
158 | timeout = int(self.client_config.get('mount_timeout', 30)) | |
159 | waited = 0 | |
160 | ||
161 | post_mount_conns = list_connections() | |
162 | while len(post_mount_conns) <= len(pre_mount_conns): | |
163 | if self.fuse_daemon.finished: | |
164 | # Did mount fail? Raise the CommandFailedError instead of | |
165 | # hitting the "failed to populate /sys/" timeout | |
f67539c2 TL |
166 | try: |
167 | self.fuse_daemon.wait() | |
168 | except CommandFailedError as e: | |
169 | log.info('mount command failed.') | |
170 | if check_status: | |
171 | raise | |
172 | else: | |
173 | return (e, mountcmd_stdout.getvalue(), | |
174 | mountcmd_stderr.getvalue()) | |
7c673cae FG |
175 | time.sleep(1) |
176 | waited += 1 | |
177 | if waited > timeout: | |
f67539c2 TL |
178 | raise RuntimeError( |
179 | "Fuse mount failed to populate/sys/ after {} " | |
180 | "seconds".format(waited)) | |
7c673cae FG |
181 | else: |
182 | post_mount_conns = list_connections() | |
183 | ||
184 | log.info("Post-mount connections: {0}".format(post_mount_conns)) | |
185 | ||
186 | # Record our fuse connection number so that we can use it when | |
187 | # forcing an unmount | |
188 | new_conns = list(set(post_mount_conns) - set(pre_mount_conns)) | |
189 | if len(new_conns) == 0: | |
190 | raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns)) | |
191 | elif len(new_conns) > 1: | |
192 | raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns)) | |
193 | else: | |
194 | self._fuse_conn = new_conns[0] | |
195 | ||
11fdf7f2 TL |
196 | self.gather_mount_info() |
197 | ||
f67539c2 TL |
198 | self.mounted = True |
199 | ||
11fdf7f2 TL |
200 | def gather_mount_info(self): |
201 | status = self.admin_socket(['status']) | |
202 | self.id = status['id'] | |
eafe8130 | 203 | self.client_pid = status['metadata']['pid'] |
11fdf7f2 TL |
204 | try: |
205 | self.inst = status['inst_str'] | |
206 | self.addr = status['addr_str'] | |
9f95a23c | 207 | except KeyError: |
11fdf7f2 TL |
208 | sessions = self.fs.rank_asok(['session', 'ls']) |
209 | for s in sessions: | |
210 | if s['id'] == self.id: | |
211 | self.inst = s['inst'] | |
212 | self.addr = self.inst.split()[1] | |
213 | if self.inst is None: | |
214 | raise RuntimeError("cannot find client session") | |
215 | ||
f67539c2 | 216 | def check_mounted_state(self): |
7c673cae FG |
217 | proc = self.client_remote.run( |
218 | args=[ | |
219 | 'stat', | |
220 | '--file-system', | |
221 | '--printf=%T\n', | |
222 | '--', | |
f67539c2 | 223 | self.hostfs_mntpt, |
7c673cae | 224 | ], |
f67539c2 TL |
225 | stdout=StringIO(), |
226 | stderr=StringIO(), | |
f64942e4 AA |
227 | wait=False, |
228 | timeout=(15*60) | |
7c673cae FG |
229 | ) |
230 | try: | |
231 | proc.wait() | |
232 | except CommandFailedError: | |
f67539c2 TL |
233 | error = proc.stderr.getvalue() |
234 | if ("endpoint is not connected" in error | |
235 | or "Software caused connection abort" in error): | |
236 | # This happens is fuse is killed without unmount | |
237 | log.warning("Found stale mount point at {0}".format(self.hostfs_mntpt)) | |
238 | return True | |
239 | else: | |
240 | # This happens if the mount directory doesn't exist | |
241 | log.info('mount point does not exist: %s', self.hostfs_mntpt) | |
242 | return False | |
7c673cae | 243 | |
f67539c2 | 244 | fstype = proc.stdout.getvalue().rstrip('\n') |
7c673cae | 245 | if fstype == 'fuseblk': |
f67539c2 | 246 | log.info('ceph-fuse is mounted on %s', self.hostfs_mntpt) |
7c673cae FG |
247 | return True |
248 | else: | |
249 | log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format( | |
250 | fstype=fstype)) | |
251 | return False | |
252 | ||
253 | def wait_until_mounted(self): | |
254 | """ | |
255 | Check to make sure that fuse is mounted on mountpoint. If not, | |
256 | sleep for 5 seconds and check again. | |
257 | """ | |
258 | ||
f67539c2 | 259 | while not self.check_mounted_state(): |
7c673cae FG |
260 | # Even if it's not mounted, it should at least |
261 | # be running: catch simple failures where it has terminated. | |
262 | assert not self.fuse_daemon.poll() | |
263 | ||
264 | time.sleep(5) | |
265 | ||
f67539c2 TL |
266 | self.mounted = True |
267 | ||
268 | # Now that we're mounted, set permissions so that the rest of the test | |
269 | # will have unrestricted access to the filesystem mount. | |
270 | for retry in range(10): | |
271 | try: | |
272 | stderr = StringIO() | |
273 | self.client_remote.run(args=['sudo', 'chmod', '1777', | |
274 | self.hostfs_mntpt], | |
275 | timeout=(15*60), | |
276 | stderr=stderr, omit_sudo=False) | |
277 | break | |
278 | except run.CommandFailedError: | |
279 | stderr = stderr.getvalue().lower() | |
280 | if "read-only file system" in stderr: | |
281 | break | |
282 | elif "permission denied" in stderr: | |
283 | time.sleep(5) | |
284 | else: | |
285 | raise | |
7c673cae FG |
286 | |
287 | def _mountpoint_exists(self): | |
f67539c2 | 288 | return self.client_remote.run(args=["ls", "-d", self.hostfs_mntpt], check_status=False, timeout=(15*60)).exitstatus == 0 |
7c673cae | 289 | |
f67539c2 TL |
290 | def umount(self, cleanup=True): |
291 | """ | |
292 | umount() must not run cleanup() when it's called by umount_wait() | |
293 | since "run.wait([self.fuse_daemon], timeout)" would hang otherwise. | |
294 | """ | |
1911f103 | 295 | if not self.is_mounted(): |
f67539c2 TL |
296 | if cleanup: |
297 | self.cleanup() | |
1911f103 TL |
298 | return |
299 | ||
7c673cae FG |
300 | try: |
301 | log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name)) | |
f67539c2 TL |
302 | stderr = StringIO() |
303 | self.client_remote.run(args=['sudo', 'fusermount', '-u', | |
304 | self.hostfs_mntpt], | |
305 | stderr=stderr, | |
306 | timeout=(30*60), omit_sudo=False) | |
7c673cae | 307 | except run.CommandFailedError: |
f67539c2 TL |
308 | if "mountpoint not found" in stderr.getvalue(): |
309 | # This happens if the mount directory doesn't exist | |
310 | log.info('mount point does not exist: %s', self.mountpoint) | |
311 | elif "not mounted" in stderr.getvalue(): | |
312 | # This happens if the mount directory already unmouted | |
313 | log.info('mount point not mounted: %s', self.mountpoint) | |
314 | else: | |
315 | log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name)) | |
7c673cae | 316 | |
f67539c2 TL |
317 | self.client_remote.run( |
318 | args=['sudo', run.Raw('PATH=/usr/sbin:$PATH'), 'lsof', | |
319 | run.Raw(';'), 'ps', 'auxf'], | |
320 | timeout=(60*15), omit_sudo=False) | |
321 | ||
322 | # abort the fuse mount, killing all hung processes | |
323 | if self._fuse_conn: | |
324 | self.run_python(dedent(""" | |
325 | import os | |
326 | path = "/sys/fs/fuse/connections/{0}/abort" | |
327 | if os.path.exists(path): | |
328 | open(path, "w").write("1") | |
329 | """).format(self._fuse_conn)) | |
330 | self._fuse_conn = None | |
331 | ||
332 | stderr = StringIO() | |
333 | # make sure its unmounted | |
334 | try: | |
335 | self.client_remote.run(args=['sudo', 'umount', '-l', '-f', | |
336 | self.hostfs_mntpt], | |
337 | stderr=stderr, timeout=(60*15), omit_sudo=False) | |
338 | except CommandFailedError: | |
339 | if self.is_mounted(): | |
340 | raise | |
341 | ||
342 | self.mounted = False | |
7c673cae | 343 | self._fuse_conn = None |
11fdf7f2 TL |
344 | self.id = None |
345 | self.inst = None | |
346 | self.addr = None | |
f67539c2 TL |
347 | if cleanup: |
348 | self.cleanup() | |
7c673cae | 349 | |
28e407b8 | 350 | def umount_wait(self, force=False, require_clean=False, timeout=900): |
7c673cae FG |
351 | """ |
352 | :param force: Complete cleanly even if the MDS is offline | |
353 | """ | |
9f95a23c | 354 | if not (self.is_mounted() and self.fuse_daemon): |
f67539c2 TL |
355 | log.debug('ceph-fuse client.{id} is not mounted at {remote} ' |
356 | '{mnt}'.format(id=self.client_id, | |
357 | remote=self.client_remote, | |
358 | mnt=self.hostfs_mntpt)) | |
359 | self.cleanup() | |
9f95a23c TL |
360 | return |
361 | ||
7c673cae FG |
362 | if force: |
363 | assert not require_clean # mutually exclusive | |
364 | ||
365 | # When we expect to be forcing, kill the ceph-fuse process directly. | |
366 | # This should avoid hitting the more aggressive fallback killing | |
367 | # in umount() which can affect other mounts too. | |
368 | self.fuse_daemon.stdin.close() | |
369 | ||
370 | # However, we will still hit the aggressive wait if there is an ongoing | |
371 | # mount -o remount (especially if the remount is stuck because MDSs | |
372 | # are unavailable) | |
373 | ||
f67539c2 TL |
374 | # cleanup is set to to fail since clieanup must happen after umount is |
375 | # complete; otherwise following call to run.wait hangs. | |
376 | self.umount(cleanup=False) | |
7c673cae FG |
377 | |
378 | try: | |
9f95a23c TL |
379 | # Permit a timeout, so that we do not block forever |
380 | run.wait([self.fuse_daemon], timeout) | |
f67539c2 | 381 | |
7c673cae | 382 | except MaxWhileTries: |
11fdf7f2 TL |
383 | log.error("process failed to terminate after unmount. This probably" |
384 | " indicates a bug within ceph-fuse.") | |
7c673cae FG |
385 | raise |
386 | except CommandFailedError: | |
387 | if require_clean: | |
388 | raise | |
389 | ||
f67539c2 | 390 | self.mounted = False |
7c673cae FG |
391 | self.cleanup() |
392 | ||
393 | def teardown(self): | |
394 | """ | |
395 | Whatever the state of the mount, get it gone. | |
396 | """ | |
397 | super(FuseMount, self).teardown() | |
398 | ||
399 | self.umount() | |
400 | ||
401 | if self.fuse_daemon and not self.fuse_daemon.finished: | |
402 | self.fuse_daemon.stdin.close() | |
403 | try: | |
404 | self.fuse_daemon.wait() | |
405 | except CommandFailedError: | |
406 | pass | |
407 | ||
f67539c2 | 408 | self.mounted = False |
7c673cae FG |
409 | |
410 | def _asok_path(self): | |
411 | return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id) | |
412 | ||
413 | @property | |
414 | def _prefix(self): | |
415 | return "" | |
416 | ||
f67539c2 | 417 | def find_admin_socket(self): |
7c673cae FG |
418 | pyscript = """ |
419 | import glob | |
420 | import re | |
421 | import os | |
422 | import subprocess | |
423 | ||
f67539c2 | 424 | def _find_admin_socket(client_name): |
7c673cae FG |
425 | asok_path = "{asok_path}" |
426 | files = glob.glob(asok_path) | |
f67539c2 | 427 | mountpoint = "{mountpoint}" |
7c673cae FG |
428 | |
429 | # Given a non-glob path, it better be there | |
430 | if "*" not in asok_path: | |
431 | assert(len(files) == 1) | |
432 | return files[0] | |
433 | ||
434 | for f in files: | |
435 | pid = re.match(".*\.(\d+)\.asok$", f).group(1) | |
436 | if os.path.exists("/proc/{{0}}".format(pid)): | |
f67539c2 TL |
437 | with open("/proc/{{0}}/cmdline".format(pid), 'r') as proc_f: |
438 | contents = proc_f.read() | |
439 | if mountpoint in contents: | |
440 | return f | |
7c673cae FG |
441 | raise RuntimeError("Client socket {{0}} not found".format(client_name)) |
442 | ||
f67539c2 | 443 | print(_find_admin_socket("{client_name}")) |
7c673cae FG |
444 | """.format( |
445 | asok_path=self._asok_path(), | |
f67539c2 TL |
446 | client_name="client.{0}".format(self.client_id), |
447 | mountpoint=self.mountpoint) | |
7c673cae | 448 | |
f67539c2 | 449 | asok_path = self.run_python(pyscript) |
7c673cae | 450 | log.info("Found client admin socket at {0}".format(asok_path)) |
f67539c2 | 451 | return asok_path |
7c673cae | 452 | |
f67539c2 TL |
453 | def admin_socket(self, args): |
454 | asok_path = self.find_admin_socket() | |
455 | ||
456 | # Query client ID from admin socket, wait 2 seconds | |
457 | # and retry 10 times if it is not ready | |
458 | with safe_while(sleep=2, tries=10) as proceed: | |
459 | while proceed(): | |
460 | try: | |
461 | p = self.client_remote.run(args= | |
462 | ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args, | |
463 | stdout=StringIO(), stderr=StringIO(), wait=False, | |
464 | timeout=(15*60)) | |
465 | p.wait() | |
466 | break | |
467 | except CommandFailedError: | |
468 | if "connection refused" in p.stderr.getvalue().lower(): | |
469 | pass | |
470 | ||
471 | return json.loads(p.stdout.getvalue().strip()) | |
7c673cae FG |
472 | |
473 | def get_global_id(self): | |
474 | """ | |
475 | Look up the CephFS client ID for this mount | |
476 | """ | |
7c673cae FG |
477 | return self.admin_socket(['mds_sessions'])['id'] |
478 | ||
11fdf7f2 TL |
479 | def get_global_inst(self): |
480 | """ | |
481 | Look up the CephFS client instance for this mount | |
482 | """ | |
483 | return self.inst | |
484 | ||
485 | def get_global_addr(self): | |
486 | """ | |
487 | Look up the CephFS client addr for this mount | |
488 | """ | |
489 | return self.addr | |
490 | ||
28e407b8 AA |
491 | def get_client_pid(self): |
492 | """ | |
493 | return pid of ceph-fuse process | |
494 | """ | |
495 | status = self.admin_socket(['status']) | |
496 | return status['metadata']['pid'] | |
497 | ||
7c673cae FG |
498 | def get_osd_epoch(self): |
499 | """ | |
500 | Return 2-tuple of osd_epoch, osd_epoch_barrier | |
501 | """ | |
502 | status = self.admin_socket(['status']) | |
503 | return status['osd_epoch'], status['osd_epoch_barrier'] | |
504 | ||
505 | def get_dentry_count(self): | |
506 | """ | |
507 | Return 2-tuple of dentry_count, dentry_pinned_count | |
508 | """ | |
509 | status = self.admin_socket(['status']) | |
510 | return status['dentry_count'], status['dentry_pinned_count'] | |
511 | ||
512 | def set_cache_size(self, size): | |
513 | return self.admin_socket(['config', 'set', 'client_cache_size', str(size)]) | |
f67539c2 TL |
514 | |
515 | def get_op_read_count(self): | |
516 | return self.admin_socket(['perf', 'dump', 'objecter'])['objecter']['osdop_read'] |