]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | from StringIO import StringIO | |
3 | import json | |
4 | import time | |
5 | import logging | |
6 | from textwrap import dedent | |
7 | ||
8 | from teuthology import misc | |
9 | from teuthology.contextutil import MaxWhileTries | |
10 | from teuthology.orchestra import run | |
11 | from teuthology.orchestra.run import CommandFailedError | |
12 | from .mount import CephFSMount | |
11fdf7f2 | 13 | from tasks.cephfs.filesystem import Filesystem |
7c673cae FG |
14 | |
15 | log = logging.getLogger(__name__) | |
16 | ||
17 | ||
18 | class FuseMount(CephFSMount): | |
    def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
        """
        A CephFS mount backed by a ceph-fuse client process running on the
        remote host.

        :param ctx: teuthology run context
        :param client_config: per-client config dict (may be None; e.g.
            'valgrind', 'coverage', 'mount_wait', 'mount_timeout' keys)
        :param test_dir: test archive/working directory on the remote
        :param client_id: ceph client id (the X in client.X)
        :param client_remote: teuthology Remote to run commands on
        """
        super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)

        self.client_config = client_config if client_config else {}
        # Handle to the long-running ceph-fuse process (set in _mount()).
        self.fuse_daemon = None
        # /sys/fs/fuse/connections entry number for this mount, used to
        # force-abort the connection in umount().
        self._fuse_conn = None
        # Client identity gathered from the admin socket after mounting
        # (see gather_mount_info()); cleared again on unmount.
        self.id = None
        self.inst = None
        self.addr = None
29 | def mount(self, mount_path=None, mount_fs_name=None): | |
11fdf7f2 TL |
30 | self.setupfs(name=mount_fs_name) |
31 | ||
7c673cae FG |
32 | try: |
33 | return self._mount(mount_path, mount_fs_name) | |
34 | except RuntimeError: | |
35 | # Catch exceptions by the mount() logic (i.e. not remote command | |
36 | # failures) and ensure the mount is not left half-up. | |
37 | # Otherwise we might leave a zombie mount point that causes | |
38 | # anyone traversing cephtest/ to get hung up on. | |
39 | log.warn("Trying to clean up after failed mount") | |
40 | self.umount_wait(force=True) | |
41 | raise | |
42 | ||
    def _mount(self, mount_path, mount_fs_name):
        """
        Start the ceph-fuse process on the remote under daemon-helper and
        wait for a new entry to appear in /sys/fs/fuse/connections, recording
        the new connection number in self._fuse_conn so umount() can abort it
        later.

        :param mount_path: optional client_mountpoint (subtree) or None
        :param mount_fs_name: optional client_mds_namespace or None
        :raises RuntimeError: if no new fuse connection appears within
            client_config['mount_timeout'] (default 30) seconds.
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # daemon-helper forwards this signal to ceph-fuse on teardown; use
        # 'term' so coverage/valgrind runs can shut down cleanly.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
            timeout=(15*60)
        )

        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        # -f keeps ceph-fuse in the foreground so daemon-helper owns it.
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        # Wrap the command in valgrind if requested for this client.
        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the numeric fuse connection ids currently visible in
            # /sys/fs/fuse/connections, or [] if fusectl is unavailable.
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
                timeout=(15*60)
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
                timeout=(15*60)
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail?  Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()
166 | def gather_mount_info(self): | |
167 | status = self.admin_socket(['status']) | |
168 | self.id = status['id'] | |
169 | try: | |
170 | self.inst = status['inst_str'] | |
171 | self.addr = status['addr_str'] | |
172 | except KeyError as e: | |
173 | sessions = self.fs.rank_asok(['session', 'ls']) | |
174 | for s in sessions: | |
175 | if s['id'] == self.id: | |
176 | self.inst = s['inst'] | |
177 | self.addr = self.inst.split()[1] | |
178 | if self.inst is None: | |
179 | raise RuntimeError("cannot find client session") | |
180 | ||
7c673cae FG |
181 | def is_mounted(self): |
182 | proc = self.client_remote.run( | |
183 | args=[ | |
184 | 'stat', | |
185 | '--file-system', | |
186 | '--printf=%T\n', | |
187 | '--', | |
188 | self.mountpoint, | |
189 | ], | |
190 | stdout=StringIO(), | |
191 | stderr=StringIO(), | |
f64942e4 AA |
192 | wait=False, |
193 | timeout=(15*60) | |
7c673cae FG |
194 | ) |
195 | try: | |
196 | proc.wait() | |
197 | except CommandFailedError: | |
198 | if ("endpoint is not connected" in proc.stderr.getvalue() | |
199 | or "Software caused connection abort" in proc.stderr.getvalue()): | |
200 | # This happens is fuse is killed without unmount | |
201 | log.warn("Found stale moutn point at {0}".format(self.mountpoint)) | |
202 | return True | |
203 | else: | |
204 | # This happens if the mount directory doesn't exist | |
205 | log.info('mount point does not exist: %s', self.mountpoint) | |
206 | return False | |
207 | ||
208 | fstype = proc.stdout.getvalue().rstrip('\n') | |
209 | if fstype == 'fuseblk': | |
210 | log.info('ceph-fuse is mounted on %s', self.mountpoint) | |
211 | return True | |
212 | else: | |
213 | log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format( | |
214 | fstype=fstype)) | |
215 | return False | |
216 | ||
217 | def wait_until_mounted(self): | |
218 | """ | |
219 | Check to make sure that fuse is mounted on mountpoint. If not, | |
220 | sleep for 5 seconds and check again. | |
221 | """ | |
222 | ||
223 | while not self.is_mounted(): | |
224 | # Even if it's not mounted, it should at least | |
225 | # be running: catch simple failures where it has terminated. | |
226 | assert not self.fuse_daemon.poll() | |
227 | ||
228 | time.sleep(5) | |
229 | ||
230 | # Now that we're mounted, set permissions so that the rest of the test will have | |
231 | # unrestricted access to the filesystem mount. | |
f64942e4 AA |
232 | try: |
233 | stderr = StringIO() | |
234 | self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr) | |
235 | except run.CommandFailedError: | |
236 | stderr = stderr.getvalue() | |
237 | if "Read-only file system".lower() in stderr.lower(): | |
238 | pass | |
239 | else: | |
240 | raise | |
7c673cae FG |
241 | |
242 | def _mountpoint_exists(self): | |
f64942e4 | 243 | return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0 |
7c673cae FG |
244 | |
    def umount(self):
        """
        Unmount the ceph-fuse client, escalating from a polite
        ``fusermount -u`` to aborting the fuse connection and a lazy/forced
        ``umount`` if the first attempt fails (e.g. because processes are
        hung inside the mount).  Clears the recorded fuse connection and
        client identity on success.
        """
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Log open files and the process tree to help debug what kept
            # the mount busy.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                # Only tolerate the failure if the mount really is gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        # Mount-time identity is no longer valid once unmounted.
        self.id = None
        self.inst = None
        self.addr = None
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        Unmount and wait (up to ``timeout`` seconds) for the ceph-fuse
        process to terminate, then remove the mountpoint directory.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: raise if the daemon exits uncleanly
            (mutually exclusive with force)
        :param timeout: seconds to wait for the daemon to exit
        """
        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            if self.fuse_daemon:
                # Permit a timeout, so that we do not block forever
                run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount.  This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            # An unclean daemon exit is tolerated unless the caller asked
            # for a clean shutdown.
            if require_clean:
                raise

        self.cleanup()
335 | def cleanup(self): | |
336 | """ | |
337 | Remove the mount point. | |
338 | ||
339 | Prerequisite: the client is not mounted. | |
340 | """ | |
341 | stderr = StringIO() | |
342 | try: | |
343 | self.client_remote.run( | |
344 | args=[ | |
345 | 'rmdir', | |
346 | '--', | |
347 | self.mountpoint, | |
348 | ], | |
f64942e4 AA |
349 | stderr=stderr, |
350 | timeout=(60*5) | |
7c673cae FG |
351 | ) |
352 | except CommandFailedError: | |
353 | if "No such file or directory" in stderr.getvalue(): | |
354 | pass | |
355 | else: | |
356 | raise | |
357 | ||
358 | def kill(self): | |
359 | """ | |
360 | Terminate the client without removing the mount point. | |
361 | """ | |
11fdf7f2 | 362 | log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name)) |
7c673cae FG |
363 | self.fuse_daemon.stdin.close() |
364 | try: | |
365 | self.fuse_daemon.wait() | |
366 | except CommandFailedError: | |
367 | pass | |
368 | ||
369 | def kill_cleanup(self): | |
370 | """ | |
371 | Follow up ``kill`` to get to a clean unmounted state. | |
372 | """ | |
11fdf7f2 | 373 | log.info('Cleaning up killed ceph-fuse connection') |
7c673cae FG |
374 | self.umount() |
375 | self.cleanup() | |
376 | ||
    def teardown(self):
        """
        Whatever the state of the mount, get it gone.

        Unmounts, ensures the ceph-fuse process is terminated, and
        forcibly removes the mountpoint directory and its contents.
        """
        super(FuseMount, self).teardown()

        self.umount()

        # The daemon may still be running if umount() only detached the
        # filesystem; close stdin so daemon-helper signals it, then reap.
        if self.fuse_daemon and not self.fuse_daemon.finished:
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                # Non-zero exit from the signalled daemon is expected.
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            timeout=(60*5)
        )
402 | def _asok_path(self): | |
403 | return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id) | |
404 | ||
405 | @property | |
406 | def _prefix(self): | |
407 | return "" | |
408 | ||
    def admin_socket(self, args):
        """
        Run an admin-socket command against this client's ceph-fuse daemon
        and return the parsed JSON response.

        Locates the socket by running a small Python 2 script on the remote
        that globs /var/run/ceph and, where the path is ambiguous, picks the
        socket whose embedded pid corresponds to a live process.

        :param args: admin socket command, e.g. ['status']
        :return: decoded JSON response
        """
        # NOTE: this script runs under python2 on the remote ('print' stmt).
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
449 | def get_global_id(self): | |
450 | """ | |
451 | Look up the CephFS client ID for this mount | |
452 | """ | |
7c673cae FG |
453 | return self.admin_socket(['mds_sessions'])['id'] |
454 | ||
11fdf7f2 TL |
455 | def get_global_inst(self): |
456 | """ | |
457 | Look up the CephFS client instance for this mount | |
458 | """ | |
459 | return self.inst | |
460 | ||
461 | def get_global_addr(self): | |
462 | """ | |
463 | Look up the CephFS client addr for this mount | |
464 | """ | |
465 | return self.addr | |
466 | ||
28e407b8 AA |
467 | def get_client_pid(self): |
468 | """ | |
469 | return pid of ceph-fuse process | |
470 | """ | |
471 | status = self.admin_socket(['status']) | |
472 | return status['metadata']['pid'] | |
473 | ||
7c673cae FG |
474 | def get_osd_epoch(self): |
475 | """ | |
476 | Return 2-tuple of osd_epoch, osd_epoch_barrier | |
477 | """ | |
478 | status = self.admin_socket(['status']) | |
479 | return status['osd_epoch'], status['osd_epoch_barrier'] | |
480 | ||
481 | def get_dentry_count(self): | |
482 | """ | |
483 | Return 2-tuple of dentry_count, dentry_pinned_count | |
484 | """ | |
485 | status = self.admin_socket(['status']) | |
486 | return status['dentry_count'], status['dentry_pinned_count'] | |
487 | ||
488 | def set_cache_size(self, size): | |
489 | return self.admin_socket(['config', 'set', 'client_cache_size', str(size)]) |