]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | from StringIO import StringIO |
2 | import json | |
3 | import time | |
4 | import logging | |
5 | from textwrap import dedent | |
6 | ||
7 | from teuthology import misc | |
8 | from teuthology.contextutil import MaxWhileTries | |
9 | from teuthology.orchestra import run | |
10 | from teuthology.orchestra.run import CommandFailedError | |
11 | from .mount import CephFSMount | |
12 | ||
13 | log = logging.getLogger(__name__) | |
14 | ||
15 | ||
class FuseMount(CephFSMount):
    """
    A CephFS mount driven by the ceph-fuse userspace client.

    Extends CephFSMount with management of the remote ceph-fuse daemon
    process and of its entry under /sys/fs/fuse/connections, which is
    used to force-abort hung mounts.
    """
    def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
        super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)

        # Per-client test configuration (coverage/valgrind/mount timeouts);
        # normalized to a dict so .get() can be used unconditionally.
        self.client_config = client_config if client_config else {}
        # Handle on the remote ceph-fuse process; set by _mount().
        self.fuse_daemon = None
        # Numeric /sys/fs/fuse/connections entry for this mount; set by
        # _mount(), cleared by umount().
        self._fuse_conn = None
        # Client session identity, populated by gather_mount_info().
        self.id = None
        self.inst = None
        self.addr = None
9f95a23c TL |
27 | def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]): |
28 | if mountpoint is not None: | |
29 | self.mountpoint = mountpoint | |
11fdf7f2 TL |
30 | self.setupfs(name=mount_fs_name) |
31 | ||
7c673cae | 32 | try: |
9f95a23c | 33 | return self._mount(mount_path, mount_fs_name, mount_options) |
7c673cae FG |
34 | except RuntimeError: |
35 | # Catch exceptions by the mount() logic (i.e. not remote command | |
36 | # failures) and ensure the mount is not left half-up. | |
37 | # Otherwise we might leave a zombie mount point that causes | |
38 | # anyone traversing cephtest/ to get hung up on. | |
39 | log.warn("Trying to clean up after failed mount") | |
40 | self.umount_wait(force=True) | |
41 | raise | |
42 | ||
9f95a23c | 43 | def _mount(self, mount_path, mount_fs_name, mount_options): |
7c673cae FG |
44 | log.info("Client client.%s config is %s" % (self.client_id, self.client_config)) |
45 | ||
46 | daemon_signal = 'kill' | |
47 | if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None: | |
48 | daemon_signal = 'term' | |
49 | ||
50 | log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format( | |
51 | id=self.client_id, remote=self.client_remote, mnt=self.mountpoint)) | |
52 | ||
9f95a23c TL |
53 | self.client_remote.run(args=['mkdir', '-p', self.mountpoint], |
54 | timeout=(15*60), cwd=self.test_dir) | |
7c673cae FG |
55 | |
56 | run_cmd = [ | |
57 | 'sudo', | |
58 | 'adjust-ulimits', | |
59 | 'ceph-coverage', | |
60 | '{tdir}/archive/coverage'.format(tdir=self.test_dir), | |
61 | 'daemon-helper', | |
62 | daemon_signal, | |
63 | ] | |
64 | ||
65 | fuse_cmd = ['ceph-fuse', "-f"] | |
66 | ||
67 | if mount_path is not None: | |
68 | fuse_cmd += ["--client_mountpoint={0}".format(mount_path)] | |
69 | ||
70 | if mount_fs_name is not None: | |
9f95a23c TL |
71 | fuse_cmd += ["--client_fs={0}".format(mount_fs_name)] |
72 | ||
73 | fuse_cmd += mount_options | |
7c673cae FG |
74 | |
75 | fuse_cmd += [ | |
76 | '--name', 'client.{id}'.format(id=self.client_id), | |
77 | # TODO ceph-fuse doesn't understand dash dash '--', | |
78 | self.mountpoint, | |
79 | ] | |
80 | ||
9f95a23c | 81 | cwd = self.test_dir |
7c673cae FG |
82 | if self.client_config.get('valgrind') is not None: |
83 | run_cmd = misc.get_valgrind_args( | |
84 | self.test_dir, | |
85 | 'client.{id}'.format(id=self.client_id), | |
86 | run_cmd, | |
87 | self.client_config.get('valgrind'), | |
88 | ) | |
9f95a23c | 89 | cwd = None # misc.get_valgrind_args chdir for us |
7c673cae FG |
90 | |
91 | run_cmd.extend(fuse_cmd) | |
92 | ||
93 | def list_connections(): | |
94 | self.client_remote.run( | |
95 | args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"], | |
f64942e4 AA |
96 | check_status=False, |
97 | timeout=(15*60) | |
7c673cae FG |
98 | ) |
99 | p = self.client_remote.run( | |
100 | args=["ls", "/sys/fs/fuse/connections"], | |
101 | stdout=StringIO(), | |
f64942e4 AA |
102 | check_status=False, |
103 | timeout=(15*60) | |
7c673cae FG |
104 | ) |
105 | if p.exitstatus != 0: | |
106 | return [] | |
107 | ||
108 | ls_str = p.stdout.getvalue().strip() | |
109 | if ls_str: | |
110 | return [int(n) for n in ls_str.split("\n")] | |
111 | else: | |
112 | return [] | |
113 | ||
114 | # Before starting ceph-fuse process, note the contents of | |
115 | # /sys/fs/fuse/connections | |
116 | pre_mount_conns = list_connections() | |
117 | log.info("Pre-mount connections: {0}".format(pre_mount_conns)) | |
118 | ||
119 | proc = self.client_remote.run( | |
120 | args=run_cmd, | |
9f95a23c | 121 | cwd=cwd, |
7c673cae FG |
122 | logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)), |
123 | stdin=run.PIPE, | |
124 | wait=False, | |
125 | ) | |
126 | self.fuse_daemon = proc | |
127 | ||
128 | # Wait for the connection reference to appear in /sys | |
129 | mount_wait = self.client_config.get('mount_wait', 0) | |
130 | if mount_wait > 0: | |
131 | log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait)) | |
132 | time.sleep(mount_wait) | |
133 | timeout = int(self.client_config.get('mount_timeout', 30)) | |
134 | waited = 0 | |
135 | ||
136 | post_mount_conns = list_connections() | |
137 | while len(post_mount_conns) <= len(pre_mount_conns): | |
138 | if self.fuse_daemon.finished: | |
139 | # Did mount fail? Raise the CommandFailedError instead of | |
140 | # hitting the "failed to populate /sys/" timeout | |
141 | self.fuse_daemon.wait() | |
142 | time.sleep(1) | |
143 | waited += 1 | |
144 | if waited > timeout: | |
145 | raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format( | |
146 | waited | |
147 | )) | |
148 | else: | |
149 | post_mount_conns = list_connections() | |
150 | ||
151 | log.info("Post-mount connections: {0}".format(post_mount_conns)) | |
152 | ||
153 | # Record our fuse connection number so that we can use it when | |
154 | # forcing an unmount | |
155 | new_conns = list(set(post_mount_conns) - set(pre_mount_conns)) | |
156 | if len(new_conns) == 0: | |
157 | raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns)) | |
158 | elif len(new_conns) > 1: | |
159 | raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns)) | |
160 | else: | |
161 | self._fuse_conn = new_conns[0] | |
162 | ||
11fdf7f2 TL |
163 | self.gather_mount_info() |
164 | ||
    def gather_mount_info(self):
        """
        Populate self.id, self.client_pid, self.inst and self.addr from
        the client's admin socket 'status' output, falling back to the
        MDS session list when inst_str/addr_str are absent.

        :raises RuntimeError: if no matching MDS session can be found.
        """
        status = self.admin_socket(['status'])
        self.id = status['id']
        self.client_pid = status['metadata']['pid']
        try:
            self.inst = status['inst_str']
            self.addr = status['addr_str']
        except KeyError:
            # 'status' lacked inst_str/addr_str (presumably an older
            # client version — TODO confirm); recover the instance from
            # the MDS's view of its sessions instead.
            sessions = self.fs.rank_asok(['session', 'ls'])
            for s in sessions:
                if s['id'] == self.id:
                    self.inst = s['inst']
                    # inst looks like "<name> <addr>"; take the addr part.
                    self.addr = self.inst.split()[1]
            if self.inst is None:
                raise RuntimeError("cannot find client session")
180 | ||
7c673cae FG |
181 | def is_mounted(self): |
182 | proc = self.client_remote.run( | |
183 | args=[ | |
184 | 'stat', | |
185 | '--file-system', | |
186 | '--printf=%T\n', | |
187 | '--', | |
188 | self.mountpoint, | |
189 | ], | |
9f95a23c | 190 | cwd=self.test_dir, |
7c673cae FG |
191 | stdout=StringIO(), |
192 | stderr=StringIO(), | |
f64942e4 AA |
193 | wait=False, |
194 | timeout=(15*60) | |
7c673cae FG |
195 | ) |
196 | try: | |
197 | proc.wait() | |
198 | except CommandFailedError: | |
199 | if ("endpoint is not connected" in proc.stderr.getvalue() | |
200 | or "Software caused connection abort" in proc.stderr.getvalue()): | |
201 | # This happens is fuse is killed without unmount | |
202 | log.warn("Found stale moutn point at {0}".format(self.mountpoint)) | |
203 | return True | |
204 | else: | |
205 | # This happens if the mount directory doesn't exist | |
206 | log.info('mount point does not exist: %s', self.mountpoint) | |
207 | return False | |
208 | ||
209 | fstype = proc.stdout.getvalue().rstrip('\n') | |
210 | if fstype == 'fuseblk': | |
211 | log.info('ceph-fuse is mounted on %s', self.mountpoint) | |
212 | return True | |
213 | else: | |
214 | log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format( | |
215 | fstype=fstype)) | |
216 | return False | |
217 | ||
    def wait_until_mounted(self):
        """
        Check to make sure that fuse is mounted on mountpoint. If not,
        sleep for 5 seconds and check again.

        :raises AssertionError: if the ceph-fuse daemon terminates while
            we are still waiting for the mount to appear.
        """

        while not self.is_mounted():
            # Even if it's not mounted, it should at least
            # be running: catch simple failures where it has terminated.
            assert not self.fuse_daemon.poll()

            time.sleep(5)

        # Now that we're mounted, set permissions so that the rest of the test will have
        # unrestricted access to the filesystem mount.
        try:
            stderr = StringIO()
            self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), cwd=self.test_dir, stderr=stderr)
        except run.CommandFailedError:
            stderr = stderr.getvalue()
            # A read-only filesystem is tolerated: chmod cannot succeed
            # there, and tests that mount read-only expect this.
            if "Read-only file system".lower() in stderr.lower():
                pass
            else:
                raise
7c673cae FG |
242 | |
243 | def _mountpoint_exists(self): | |
9f95a23c | 244 | return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, cwd=self.test_dir, timeout=(15*60)).exitstatus == 0 |
7c673cae FG |
245 | |
    def umount(self):
        """
        Unmount the ceph-fuse filesystem, escalating from a polite
        'fusermount -u' to aborting the fuse connection and a lazy,
        forced 'umount -l -f' if the client is hung.

        Clears the cached connection/session identity on success.
        """
        if not self.is_mounted():
            return

        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
                cwd=self.test_dir,
                timeout=(30*60),
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Capture process/file state for post-mortem debugging of the
            # stuck unmount before we start killing things.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ], timeout=(60*15))

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr,
                    timeout=(60*15)
                )
            except CommandFailedError:
                # Tolerate the failure as long as nothing is mounted any
                # more; otherwise propagate it.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
        self.id = None
        self.inst = None
        self.addr = None
7c673cae | 307 | |
    def umount_wait(self, force=False, require_clean=False, timeout=900):
        """
        Unmount and wait for the ceph-fuse daemon process to exit.

        :param force: Complete cleanly even if the MDS is offline
        :param require_clean: re-raise a daemon failure instead of
            tolerating a non-zero exit.
        :param timeout: seconds to wait for the daemon to terminate
            before giving up (raises MaxWhileTries).
        """
        if not (self.is_mounted() and self.fuse_daemon):
            log.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self.client_id,
                                                                                      remote=self.client_remote,
                                                                                      mnt=self.mountpoint))
            return

        if force:
            assert not require_clean  # mutually exclusive

            # When we expect to be forcing, kill the ceph-fuse process directly.
            # This should avoid hitting the more aggressive fallback killing
            # in umount() which can affect other mounts too.
            self.fuse_daemon.stdin.close()

            # However, we will still hit the aggressive wait if there is an ongoing
            # mount -o remount (especially if the remount is stuck because MDSs
            # are unavailable)

        self.umount()

        try:
            # Permit a timeout, so that we do not block forever
            run.wait([self.fuse_daemon], timeout)
        except MaxWhileTries:
            log.error("process failed to terminate after unmount. This probably"
                      " indicates a bug within ceph-fuse.")
            raise
        except CommandFailedError:
            if require_clean:
                raise

        self.cleanup()
344 | ||
345 | def cleanup(self): | |
346 | """ | |
347 | Remove the mount point. | |
348 | ||
349 | Prerequisite: the client is not mounted. | |
350 | """ | |
351 | stderr = StringIO() | |
352 | try: | |
353 | self.client_remote.run( | |
354 | args=[ | |
355 | 'rmdir', | |
356 | '--', | |
357 | self.mountpoint, | |
358 | ], | |
9f95a23c | 359 | cwd=self.test_dir, |
f64942e4 | 360 | stderr=stderr, |
9f95a23c TL |
361 | timeout=(60*5), |
362 | check_status=False, | |
7c673cae FG |
363 | ) |
364 | except CommandFailedError: | |
365 | if "No such file or directory" in stderr.getvalue(): | |
366 | pass | |
367 | else: | |
368 | raise | |
369 | ||
    def kill(self):
        """
        Terminate the client without removing the mount point.
        """
        log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
        # Closing stdin prompts daemon-helper to deliver its configured
        # signal to ceph-fuse (see daemon_signal in _mount).
        self.fuse_daemon.stdin.close()
        try:
            self.fuse_daemon.wait()
        except CommandFailedError:
            # A non-zero exit status is expected from a killed daemon.
            pass
380 | ||
    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed ceph-fuse connection')
        # umount() handles the stale mount left behind by kill();
        # cleanup() then removes the mount point directory.
        self.umount()
        self.cleanup()
388 | ||
    def teardown(self):
        """
        Whatever the state of the mount, get it gone.

        Unlike umount_wait(), this also force-removes the mount point
        directory and tolerates a daemon that exits uncleanly.
        """
        super(FuseMount, self).teardown()

        self.umount()

        if self.fuse_daemon and not self.fuse_daemon.finished:
            # Ask daemon-helper to signal ceph-fuse, then reap it,
            # ignoring a non-zero exit.
            self.fuse_daemon.stdin.close()
            try:
                self.fuse_daemon.wait()
            except CommandFailedError:
                pass

        # Indiscriminate, unlike the touchier cleanup()
        self.client_remote.run(
            args=[
                'rm',
                '-rf',
                self.mountpoint,
            ],
            cwd=self.test_dir,
            timeout=(60*5)
        )
414 | ||
    def _asok_path(self):
        # Glob pattern: the asok filename embeds the daemon's PID, which
        # we do not know here; admin_socket()'s remote script picks the
        # file whose PID is alive in /proc.
        return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
417 | ||
    @property
    def _prefix(self):
        # Prefix prepended to the 'ceph' binary name in admin_socket();
        # empty for this mount flavour (subclasses may override).
        return ""
421 | ||
    def admin_socket(self, args):
        """
        Run an admin-socket command against this client's ceph-fuse
        daemon and return the parsed JSON result.

        :param args: command list, e.g. ['status'] or ['mds_sessions'].
        :return: decoded JSON object from the daemon's response.
        """
        # Remote script: resolve the asok glob to the socket belonging to
        # a live process (PID parsed from the filename, checked in /proc).
        # NOTE: the {{0}} escapes survive the outer .format() below.
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print(find_socket("{client_name}"))
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'sudo', 'python3', '-c', pyscript
        ], stdout=StringIO(), timeout=(15*60))
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO(), timeout=(15*60))
        return json.loads(p.stdout.getvalue())
461 | ||
462 | def get_global_id(self): | |
463 | """ | |
464 | Look up the CephFS client ID for this mount | |
465 | """ | |
7c673cae FG |
466 | return self.admin_socket(['mds_sessions'])['id'] |
467 | ||
11fdf7f2 TL |
    def get_global_inst(self):
        """
        Look up the CephFS client instance for this mount

        Returns the value cached by gather_mount_info() (None before the
        mount has been established).
        """
        return self.inst
473 | ||
    def get_global_addr(self):
        """
        Look up the CephFS client addr for this mount

        Returns the value cached by gather_mount_info() (None before the
        mount has been established).
        """
        return self.addr
479 | ||
28e407b8 AA |
480 | def get_client_pid(self): |
481 | """ | |
482 | return pid of ceph-fuse process | |
483 | """ | |
484 | status = self.admin_socket(['status']) | |
485 | return status['metadata']['pid'] | |
486 | ||
7c673cae FG |
487 | def get_osd_epoch(self): |
488 | """ | |
489 | Return 2-tuple of osd_epoch, osd_epoch_barrier | |
490 | """ | |
491 | status = self.admin_socket(['status']) | |
492 | return status['osd_epoch'], status['osd_epoch_barrier'] | |
493 | ||
494 | def get_dentry_count(self): | |
495 | """ | |
496 | Return 2-tuple of dentry_count, dentry_pinned_count | |
497 | """ | |
498 | status = self.admin_socket(['status']) | |
499 | return status['dentry_count'], status['dentry_pinned_count'] | |
500 | ||
501 | def set_cache_size(self, size): | |
502 | return self.admin_socket(['config', 'set', 'client_cache_size', str(size)]) |