]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | from StringIO import StringIO | |
3 | import json | |
4 | import time | |
5 | import logging | |
6 | from textwrap import dedent | |
7 | ||
8 | from teuthology import misc | |
9 | from teuthology.contextutil import MaxWhileTries | |
10 | from teuthology.orchestra import run | |
11 | from teuthology.orchestra.run import CommandFailedError | |
12 | from .mount import CephFSMount | |
13 | ||
14 | log = logging.getLogger(__name__) | |
15 | ||
16 | ||
17 | class FuseMount(CephFSMount): | |
18 | def __init__(self, client_config, test_dir, client_id, client_remote): | |
19 | super(FuseMount, self).__init__(test_dir, client_id, client_remote) | |
20 | ||
21 | self.client_config = client_config if client_config else {} | |
22 | self.fuse_daemon = None | |
23 | self._fuse_conn = None | |
24 | ||
25 | def mount(self, mount_path=None, mount_fs_name=None): | |
26 | try: | |
27 | return self._mount(mount_path, mount_fs_name) | |
28 | except RuntimeError: | |
29 | # Catch exceptions by the mount() logic (i.e. not remote command | |
30 | # failures) and ensure the mount is not left half-up. | |
31 | # Otherwise we might leave a zombie mount point that causes | |
32 | # anyone traversing cephtest/ to get hung up on. | |
33 | log.warn("Trying to clean up after failed mount") | |
34 | self.umount_wait(force=True) | |
35 | raise | |
36 | ||
    def _mount(self, mount_path, mount_fs_name):
        """
        Start ceph-fuse on the remote and wait for its FUSE connection to
        appear under /sys/fs/fuse/connections.

        :param mount_path: optional path inside the filesystem to mount
            (passed as --client_mountpoint); None mounts the root.
        :param mount_fs_name: optional named filesystem (passed as
            --client_mds_namespace); None uses the default.
        :raises RuntimeError: if no new fuse connection appears within the
            configured timeout, or more than one new connection appears.
        """
        log.info("Client client.%s config is %s" % (self.client_id, self.client_config))

        daemon_signal = 'kill'
        if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
            # Under coverage or valgrind the daemon must exit gracefully
            # to flush its data, so daemon-helper sends SIGTERM not SIGKILL.
            daemon_signal = 'term'

        log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))

        # Create the mount point directory on the remote.
        self.client_remote.run(
            args=[
                'mkdir',
                '--',
                self.mountpoint,
            ],
        )

        # Wrapper command: ulimits + coverage capture + signal forwarding.
        run_cmd = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=self.test_dir),
            'daemon-helper',
            daemon_signal,
        ]

        # -f keeps ceph-fuse in the foreground so daemon-helper owns it.
        fuse_cmd = ['ceph-fuse', "-f"]

        if mount_path is not None:
            fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]

        if mount_fs_name is not None:
            fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]

        fuse_cmd += [
            '--name', 'client.{id}'.format(id=self.client_id),
            # TODO ceph-fuse doesn't understand dash dash '--',
            self.mountpoint,
        ]

        if self.client_config.get('valgrind') is not None:
            # Wrap the command in valgrind with the configured options.
            run_cmd = misc.get_valgrind_args(
                self.test_dir,
                'client.{id}'.format(id=self.client_id),
                run_cmd,
                self.client_config.get('valgrind'),
            )

        run_cmd.extend(fuse_cmd)

        def list_connections():
            # Return the numeric fuse connection IDs currently listed in
            # /sys/fs/fuse/connections, or [] if the directory is absent.
            # Mounting fusectl is best-effort: it may already be mounted.
            self.client_remote.run(
                args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                check_status=False
            )
            p = self.client_remote.run(
                args=["ls", "/sys/fs/fuse/connections"],
                stdout=StringIO(),
                check_status=False
            )
            if p.exitstatus != 0:
                return []

            ls_str = p.stdout.getvalue().strip()
            if ls_str:
                return [int(n) for n in ls_str.split("\n")]
            else:
                return []

        # Before starting ceph-fuse process, note the contents of
        # /sys/fs/fuse/connections
        pre_mount_conns = list_connections()
        log.info("Pre-mount connections: {0}".format(pre_mount_conns))

        # Launch ceph-fuse in the background; stdin is a pipe so closing
        # it later signals daemon-helper to stop the daemon.
        proc = self.client_remote.run(
            args=run_cmd,
            logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
            stdin=run.PIPE,
            wait=False,
        )
        self.fuse_daemon = proc

        # Wait for the connection reference to appear in /sys
        mount_wait = self.client_config.get('mount_wait', 0)
        if mount_wait > 0:
            log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
            time.sleep(mount_wait)
        timeout = int(self.client_config.get('mount_timeout', 30))
        waited = 0

        post_mount_conns = list_connections()
        while len(post_mount_conns) <= len(pre_mount_conns):
            if self.fuse_daemon.finished:
                # Did mount fail? Raise the CommandFailedError instead of
                # hitting the "failed to populate /sys/" timeout
                self.fuse_daemon.wait()
            time.sleep(1)
            waited += 1
            if waited > timeout:
                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
                    waited
                ))
            else:
                post_mount_conns = list_connections()

        log.info("Post-mount connections: {0}".format(post_mount_conns))

        # Record our fuse connection number so that we can use it when
        # forcing an unmount
        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
        if len(new_conns) == 0:
            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
        elif len(new_conns) > 1:
            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
        else:
            self._fuse_conn = new_conns[0]
154 | ||
155 | def is_mounted(self): | |
156 | proc = self.client_remote.run( | |
157 | args=[ | |
158 | 'stat', | |
159 | '--file-system', | |
160 | '--printf=%T\n', | |
161 | '--', | |
162 | self.mountpoint, | |
163 | ], | |
164 | stdout=StringIO(), | |
165 | stderr=StringIO(), | |
166 | wait=False | |
167 | ) | |
168 | try: | |
169 | proc.wait() | |
170 | except CommandFailedError: | |
171 | if ("endpoint is not connected" in proc.stderr.getvalue() | |
172 | or "Software caused connection abort" in proc.stderr.getvalue()): | |
173 | # This happens is fuse is killed without unmount | |
174 | log.warn("Found stale moutn point at {0}".format(self.mountpoint)) | |
175 | return True | |
176 | else: | |
177 | # This happens if the mount directory doesn't exist | |
178 | log.info('mount point does not exist: %s', self.mountpoint) | |
179 | return False | |
180 | ||
181 | fstype = proc.stdout.getvalue().rstrip('\n') | |
182 | if fstype == 'fuseblk': | |
183 | log.info('ceph-fuse is mounted on %s', self.mountpoint) | |
184 | return True | |
185 | else: | |
186 | log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format( | |
187 | fstype=fstype)) | |
188 | return False | |
189 | ||
190 | def wait_until_mounted(self): | |
191 | """ | |
192 | Check to make sure that fuse is mounted on mountpoint. If not, | |
193 | sleep for 5 seconds and check again. | |
194 | """ | |
195 | ||
196 | while not self.is_mounted(): | |
197 | # Even if it's not mounted, it should at least | |
198 | # be running: catch simple failures where it has terminated. | |
199 | assert not self.fuse_daemon.poll() | |
200 | ||
201 | time.sleep(5) | |
202 | ||
203 | # Now that we're mounted, set permissions so that the rest of the test will have | |
204 | # unrestricted access to the filesystem mount. | |
205 | self.client_remote.run( | |
206 | args=['sudo', 'chmod', '1777', self.mountpoint]) | |
207 | ||
208 | def _mountpoint_exists(self): | |
209 | return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0 | |
210 | ||
    def umount(self):
        """
        Unmount this client.

        Try a polite ``fusermount -u`` first; if that fails, dump
        diagnostics, abort the fuse connection via /sys (killing any hung
        processes blocked on it), and fall back to a lazy, forced umount.
        Asserts the mount is really gone before returning.
        """
        try:
            log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
            self.client_remote.run(
                args=[
                    'sudo',
                    'fusermount',
                    '-u',
                    self.mountpoint,
                ],
            )
        except run.CommandFailedError:
            log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))

            # Debugging aid: capture open files and the process tree so a
            # stuck unmount can be diagnosed from the logs.
            self.client_remote.run(args=[
                'sudo',
                run.Raw('PATH=/usr/sbin:$PATH'),
                'lsof',
                run.Raw(';'),
                'ps',
                'auxf',
            ])

            # abort the fuse mount, killing all hung processes
            if self._fuse_conn:
                self.run_python(dedent("""
                import os
                path = "/sys/fs/fuse/connections/{0}/abort"
                if os.path.exists(path):
                    open(path, "w").write("1")
                """).format(self._fuse_conn))
                self._fuse_conn = None

            stderr = StringIO()
            try:
                # make sure its unmounted
                self.client_remote.run(
                    args=[
                        'sudo',
                        'umount',
                        '-l',
                        '-f',
                        self.mountpoint,
                    ],
                    stderr=stderr
                )
            except CommandFailedError:
                # Tolerate the failure only if the mount is actually gone.
                if self.is_mounted():
                    raise

        assert not self.is_mounted()
        self._fuse_conn = None
263 | ||
264 | def umount_wait(self, force=False, require_clean=False): | |
265 | """ | |
266 | :param force: Complete cleanly even if the MDS is offline | |
267 | """ | |
268 | if force: | |
269 | assert not require_clean # mutually exclusive | |
270 | ||
271 | # When we expect to be forcing, kill the ceph-fuse process directly. | |
272 | # This should avoid hitting the more aggressive fallback killing | |
273 | # in umount() which can affect other mounts too. | |
274 | self.fuse_daemon.stdin.close() | |
275 | ||
276 | # However, we will still hit the aggressive wait if there is an ongoing | |
277 | # mount -o remount (especially if the remount is stuck because MDSs | |
278 | # are unavailable) | |
279 | ||
280 | self.umount() | |
281 | ||
282 | try: | |
283 | if self.fuse_daemon: | |
284 | # Permit a timeout, so that we do not block forever | |
285 | run.wait([self.fuse_daemon], 900) | |
286 | except MaxWhileTries: | |
287 | log.error("process failed to terminate after unmount. This probably" | |
288 | "indicates a bug within ceph-fuse.") | |
289 | raise | |
290 | except CommandFailedError: | |
291 | if require_clean: | |
292 | raise | |
293 | ||
294 | self.cleanup() | |
295 | ||
296 | def cleanup(self): | |
297 | """ | |
298 | Remove the mount point. | |
299 | ||
300 | Prerequisite: the client is not mounted. | |
301 | """ | |
302 | stderr = StringIO() | |
303 | try: | |
304 | self.client_remote.run( | |
305 | args=[ | |
306 | 'rmdir', | |
307 | '--', | |
308 | self.mountpoint, | |
309 | ], | |
310 | stderr=stderr | |
311 | ) | |
312 | except CommandFailedError: | |
313 | if "No such file or directory" in stderr.getvalue(): | |
314 | pass | |
315 | else: | |
316 | raise | |
317 | ||
318 | def kill(self): | |
319 | """ | |
320 | Terminate the client without removing the mount point. | |
321 | """ | |
322 | self.fuse_daemon.stdin.close() | |
323 | try: | |
324 | self.fuse_daemon.wait() | |
325 | except CommandFailedError: | |
326 | pass | |
327 | ||
328 | def kill_cleanup(self): | |
329 | """ | |
330 | Follow up ``kill`` to get to a clean unmounted state. | |
331 | """ | |
332 | self.umount() | |
333 | self.cleanup() | |
334 | ||
335 | def teardown(self): | |
336 | """ | |
337 | Whatever the state of the mount, get it gone. | |
338 | """ | |
339 | super(FuseMount, self).teardown() | |
340 | ||
341 | self.umount() | |
342 | ||
343 | if self.fuse_daemon and not self.fuse_daemon.finished: | |
344 | self.fuse_daemon.stdin.close() | |
345 | try: | |
346 | self.fuse_daemon.wait() | |
347 | except CommandFailedError: | |
348 | pass | |
349 | ||
350 | # Indiscriminate, unlike the touchier cleanup() | |
351 | self.client_remote.run( | |
352 | args=[ | |
353 | 'rm', | |
354 | '-rf', | |
355 | self.mountpoint, | |
356 | ], | |
357 | ) | |
358 | ||
359 | def _asok_path(self): | |
360 | return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id) | |
361 | ||
362 | @property | |
363 | def _prefix(self): | |
364 | return "" | |
365 | ||
    def admin_socket(self, args):
        """
        Run a ceph admin-socket command against this client's asok and
        return the parsed JSON result.

        :param args: list of command words, e.g. ['status']
        :return: decoded JSON output of the admin-socket command
        """
        # Remote snippet (py2 `print` syntax, run by the remote's
        # `python`): resolve the asok glob to the socket belonging to a
        # still-running PID, since stale sockets from dead processes may
        # remain.  Double braces {{0}} survive the outer .format() call.
        pyscript = """
import glob
import re
import os
import subprocess

def find_socket(client_name):
    asok_path = "{asok_path}"
    files = glob.glob(asok_path)

    # Given a non-glob path, it better be there
    if "*" not in asok_path:
        assert(len(files) == 1)
        return files[0]

    for f in files:
        pid = re.match(".*\.(\d+)\.asok$", f).group(1)
        if os.path.exists("/proc/{{0}}".format(pid)):
            return f
    raise RuntimeError("Client socket {{0}} not found".format(client_name))

print find_socket("{client_name}")
""".format(
            asok_path=self._asok_path(),
            client_name="client.{0}".format(self.client_id))

        # Find the admin socket
        p = self.client_remote.run(args=[
            'python', '-c', pyscript
        ], stdout=StringIO())
        asok_path = p.stdout.getvalue().strip()
        log.info("Found client admin socket at {0}".format(asok_path))

        # Query client ID from admin socket
        p = self.client_remote.run(
            args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
            stdout=StringIO())
        return json.loads(p.stdout.getvalue())
405 | ||
406 | def get_global_id(self): | |
407 | """ | |
408 | Look up the CephFS client ID for this mount | |
409 | """ | |
410 | ||
411 | return self.admin_socket(['mds_sessions'])['id'] | |
412 | ||
413 | def get_osd_epoch(self): | |
414 | """ | |
415 | Return 2-tuple of osd_epoch, osd_epoch_barrier | |
416 | """ | |
417 | status = self.admin_socket(['status']) | |
418 | return status['osd_epoch'], status['osd_epoch_barrier'] | |
419 | ||
420 | def get_dentry_count(self): | |
421 | """ | |
422 | Return 2-tuple of dentry_count, dentry_pinned_count | |
423 | """ | |
424 | status = self.admin_socket(['status']) | |
425 | return status['dentry_count'], status['dentry_pinned_count'] | |
426 | ||
427 | def set_cache_size(self, size): | |
428 | return self.admin_socket(['config', 'set', 'client_cache_size', str(size)]) |