]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | from StringIO import StringIO | |
3 | import json | |
4 | import time | |
5 | import logging | |
6 | from textwrap import dedent | |
7 | ||
8 | from teuthology import misc | |
9 | from teuthology.contextutil import MaxWhileTries | |
10 | from teuthology.orchestra import run | |
11 | from teuthology.orchestra.run import CommandFailedError | |
12 | from .mount import CephFSMount | |
11fdf7f2 | 13 | from tasks.cephfs.filesystem import Filesystem |
7c673cae FG |
14 | |
15 | log = logging.getLogger(__name__) | |
16 | ||
17 | ||
18 | class FuseMount(CephFSMount): | |
11fdf7f2 TL |
19 | def __init__(self, ctx, client_config, test_dir, client_id, client_remote): |
20 | super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote) | |
7c673cae FG |
21 | |
22 | self.client_config = client_config if client_config else {} | |
23 | self.fuse_daemon = None | |
24 | self._fuse_conn = None | |
11fdf7f2 TL |
25 | self.id = None |
26 | self.inst = None | |
27 | self.addr = None | |
7c673cae FG |
28 | |
29 | def mount(self, mount_path=None, mount_fs_name=None): | |
11fdf7f2 TL |
30 | self.setupfs(name=mount_fs_name) |
31 | ||
7c673cae FG |
32 | try: |
33 | return self._mount(mount_path, mount_fs_name) | |
34 | except RuntimeError: | |
35 | # Catch exceptions by the mount() logic (i.e. not remote command | |
36 | # failures) and ensure the mount is not left half-up. | |
37 | # Otherwise we might leave a zombie mount point that causes | |
38 | # anyone traversing cephtest/ to get hung up on. | |
39 | log.warn("Trying to clean up after failed mount") | |
40 | self.umount_wait(force=True) | |
41 | raise | |
42 | ||
    def _mount(self, mount_path, mount_fs_name):
        """
        Launch ceph-fuse on the remote and wait for the kernel to register a
        new fuse connection under /sys/fs/fuse/connections.

        :param mount_path: optional path within CephFS to use as client root
        :param mount_fs_name: optional named filesystem (mds namespace)
        :raises RuntimeError: if no single new fuse connection appears within
            the configured mount_timeout
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        # Under coverage/valgrind the daemon must exit politely (SIGTERM) so
        # reports get flushed; otherwise a plain kill is fine.
        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
            timeout=(15*60)
        )

        # Wrapper command: ulimits + coverage capture + daemon-helper, which
        # signals ceph-fuse with daemon_signal when its stdin is closed.
        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Best-effort mount of fusectl, then list the numeric connection
            # ids; an unreadable directory is reported as "no connections".
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False,
                timeout=(15*60)
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False,
                timeout=(15*60)
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail? Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]

        self.gather_mount_info()
165 | ||
166 | def gather_mount_info(self): | |
167 | status = self.admin_socket(['status']) | |
168 | self.id = status['id'] | |
eafe8130 | 169 | self.client_pid = status['metadata']['pid'] |
11fdf7f2 TL |
170 | try: |
171 | self.inst = status['inst_str'] | |
172 | self.addr = status['addr_str'] | |
173 | except KeyError as e: | |
174 | sessions = self.fs.rank_asok(['session', 'ls']) | |
175 | for s in sessions: | |
176 | if s['id'] == self.id: | |
177 | self.inst = s['inst'] | |
178 | self.addr = self.inst.split()[1] | |
179 | if self.inst is None: | |
180 | raise RuntimeError("cannot find client session") | |
181 | ||
7c673cae FG |
182 | def is_mounted(self): |
183 | proc = self.client_remote.run( | |
184 | args=[ | |
185 | 'stat', | |
186 | '--file-system', | |
187 | '--printf=%T\n', | |
188 | '--', | |
189 | self.mountpoint, | |
190 | ], | |
191 | stdout=StringIO(), | |
192 | stderr=StringIO(), | |
f64942e4 AA |
193 | wait=False, |
194 | timeout=(15*60) | |
7c673cae FG |
195 | ) |
196 | try: | |
197 | proc.wait() | |
198 | except CommandFailedError: | |
199 | if ("endpoint is not connected" in proc.stderr.getvalue() | |
200 | or "Software caused connection abort" in proc.stderr.getvalue()): | |
201 | # This happens is fuse is killed without unmount | |
202 | log.warn("Found stale moutn point at {0}".format(self.mountpoint)) | |
203 | return True | |
204 | else: | |
205 | # This happens if the mount directory doesn't exist | |
206 | log.info('mount point does not exist: %s', self.mountpoint) | |
207 | return False | |
208 | ||
209 | fstype = proc.stdout.getvalue().rstrip('\n') | |
210 | if fstype == 'fuseblk': | |
211 | log.info('ceph-fuse is mounted on %s', self.mountpoint) | |
212 | return True | |
213 | else: | |
214 | log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format( | |
215 | fstype=fstype)) | |
216 | return False | |
217 | ||
218 | def wait_until_mounted(self): | |
219 | """ | |
220 | Check to make sure that fuse is mounted on mountpoint. If not, | |
221 | sleep for 5 seconds and check again. | |
222 | """ | |
223 | ||
224 | while not self.is_mounted(): | |
225 | # Even if it's not mounted, it should at least | |
226 | # be running: catch simple failures where it has terminated. | |
227 | assert not self.fuse_daemon.poll() | |
228 | ||
229 | time.sleep(5) | |
230 | ||
231 | # Now that we're mounted, set permissions so that the rest of the test will have | |
232 | # unrestricted access to the filesystem mount. | |
f64942e4 AA |
233 | try: |
234 | stderr = StringIO() | |
235 | self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), stderr=stderr) | |
236 | except run.CommandFailedError: | |
237 | stderr = stderr.getvalue() | |
238 | if "Read-only file system".lower() in stderr.lower(): | |
239 | pass | |
240 | else: | |
241 | raise | |
7c673cae FG |
242 | |
243 | def _mountpoint_exists(self): | |
f64942e4 | 244 | return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, timeout=(15*60)).exitstatus == 0 |
7c673cae FG |
245 | |
    def umount(self):
        """
        Unmount ceph-fuse on the remote, escalating if the polite path fails.

        Tries ``fusermount -u`` first; on failure it logs process/open-file
        diagnostics, aborts the kernel-side fuse connection (if known) to
        unwedge hung processes, and finally does a lazy forced ``umount``.
        On return the mountpoint is asserted unmounted and the cached client
        identity is cleared.
        """
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Capture open files and the process tree for debugging before
            # we start killing things.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                # Tolerate the failure only if the mount is actually gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        # The identity gathered at mount time is stale once unmounted.
        self.id = None
        self.inst = None
        self.addr = None
7c673cae | 303 | |
28e407b8 | 304 | def umount_wait(self, force=False, require_clean=False, timeout=900): |
7c673cae FG |
305 | """ |
306 | :param force: Complete cleanly even if the MDS is offline | |
307 | """ | |
308 | if force: | |
309 | assert not require_clean # mutually exclusive | |
310 | ||
311 | # When we expect to be forcing, kill the ceph-fuse process directly. | |
312 | # This should avoid hitting the more aggressive fallback killing | |
313 | # in umount() which can affect other mounts too. | |
314 | self.fuse_daemon.stdin.close() | |
315 | ||
316 | # However, we will still hit the aggressive wait if there is an ongoing | |
317 | # mount -o remount (especially if the remount is stuck because MDSs | |
318 | # are unavailable) | |
319 | ||
320 | self.umount() | |
321 | ||
322 | try: | |
323 | if self.fuse_daemon: | |
324 | # Permit a timeout, so that we do not block forever | |
28e407b8 | 325 | run.wait([self.fuse_daemon], timeout) |
7c673cae | 326 | except MaxWhileTries: |
11fdf7f2 TL |
327 | log.error("process failed to terminate after unmount. This probably" |
328 | " indicates a bug within ceph-fuse.") | |
7c673cae FG |
329 | raise |
330 | except CommandFailedError: | |
331 | if require_clean: | |
332 | raise | |
333 | ||
334 | self.cleanup() | |
335 | ||
336 | def cleanup(self): | |
337 | """ | |
338 | Remove the mount point. | |
339 | ||
340 | Prerequisite: the client is not mounted. | |
341 | """ | |
342 | stderr = StringIO() | |
343 | try: | |
344 | self.client_remote.run( | |
345 | args=[ | |
346 | 'rmdir', | |
347 | '--', | |
348 | self.mountpoint, | |
349 | ], | |
f64942e4 AA |
350 | stderr=stderr, |
351 | timeout=(60*5) | |
7c673cae FG |
352 | ) |
353 | except CommandFailedError: | |
354 | if "No such file or directory" in stderr.getvalue(): | |
355 | pass | |
356 | else: | |
357 | raise | |
358 | ||
    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        # daemon-helper reacts to stdin EOF by signalling ceph-fuse.
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            # A killed daemon exits non-zero; that is expected here.
            pass
369 | ||
    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        # The daemon is already dead, so umount() will take its forced
        # path as needed before cleanup() removes the directory.
        self.umount()
        self.cleanup()
377 | ||
    def teardown(self):
        """
        Whatever the state of the mount, get it gone.

        Unmounts, reaps the ceph-fuse process if still running, then removes
        the mountpoint directory unconditionally.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            # Signal via daemon-helper (stdin EOF), then reap.
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                # Non-zero exit is expected for a signalled daemon.
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            timeout=(60*5)
        )
402 | ||
403 | def _asok_path(self): | |
404 | return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id) | |
405 | ||
    @property
    def _prefix(self):
        # Prefix prepended to remote 'ceph' invocations; ceph-fuse mounts
        # use binaries from the standard PATH, hence empty.
        return ""
409 | ||
    def admin_socket(self, args):
        """
        Run a ceph admin-socket command against this client's asok.

        :param args: admin command argv, e.g. ``['status']``
        :return: the command output parsed from JSON
        """
        # Remote helper: resolve the asok glob to the socket belonging to a
        # live process.  NB: this is Python 2 source (print statement),
        # executed with 'python2' below; keep the two styles in sync.
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python2', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
449 | ||
450 | def get_global_id(self): | |
451 | """ | |
452 | Look up the CephFS client ID for this mount | |
453 | """ | |
7c673cae FG |
454 | return self.admin_socket(['mds_sessions'])['id'] |
455 | ||
11fdf7f2 TL |
456 | def get_global_inst(self): |
457 | """ | |
458 | Look up the CephFS client instance for this mount | |
459 | """ | |
460 | return self.inst | |
461 | ||
462 | def get_global_addr(self): | |
463 | """ | |
464 | Look up the CephFS client addr for this mount | |
465 | """ | |
466 | return self.addr | |
467 | ||
28e407b8 AA |
468 | def get_client_pid(self): |
469 | """ | |
470 | return pid of ceph-fuse process | |
471 | """ | |
472 | status = self.admin_socket(['status']) | |
473 | return status['metadata']['pid'] | |
474 | ||
7c673cae FG |
475 | def get_osd_epoch(self): |
476 | """ | |
477 | Return 2-tuple of osd_epoch, osd_epoch_barrier | |
478 | """ | |
479 | status = self.admin_socket(['status']) | |
480 | return status['osd_epoch'], status['osd_epoch_barrier'] | |
481 | ||
482 | def get_dentry_count(self): | |
483 | """ | |
484 | Return 2-tuple of dentry_count, dentry_pinned_count | |
485 | """ | |
486 | status = self.admin_socket(['status']) | |
487 | return status['dentry_count'], status['dentry_pinned_count'] | |
488 | ||
489 | def set_cache_size(self, size): | |
490 | return self.admin_socket(['config', 'set', 'client_cache_size', str(size)]) |