import hashlib
import json
import logging
import datetime
import os
import re
import time

from io import StringIO
from contextlib import contextmanager
from textwrap import dedent
from IPy import IP

from teuthology.contextutil import safe_while
from teuthology.misc import get_file, write_file
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError, ConnectionLostError, Raw

from tasks.cephfs.filesystem import Filesystem

log = logging.getLogger(__name__)

class CephFSMount(object):
    def __init__(self, ctx, test_dir, client_id, client_remote,
                 client_keyring_path=None, hostfs_mntpt=None,
                 cephfs_name=None, cephfs_mntpt=None, brxnet=None):
        """
        :param test_dir: Global teuthology test dir
        :param client_id: Client ID, the 'foo' in client.foo
        :param client_keyring_path: path to keyring for given client_id
        :param client_remote: Remote instance for the host where client will
                              run
        :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
                             be mounted
        :param cephfs_name: Name of Ceph FS to be mounted
        :param cephfs_mntpt: Path to directory inside Ceph FS that will be
                             mounted as root
        :param brxnet: Network (in CIDR notation) to use for the 'ceph-brx'
                       bridge and the client network namespaces
        """
        self.mounted = False
        self.ctx = ctx
        self.test_dir = test_dir

        self._verify_attrs(client_id=client_id,
                           client_keyring_path=client_keyring_path,
                           hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
                           cephfs_mntpt=cephfs_mntpt)

        self.client_id = client_id
        self.client_keyring_path = client_keyring_path
        self.client_remote = client_remote
        if hostfs_mntpt:
            self.hostfs_mntpt = hostfs_mntpt
            self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
        else:
            self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
        self.cephfs_name = cephfs_name
        self.cephfs_mntpt = cephfs_mntpt

        self.fs = None

        self._netns_name = None
        self.nsid = -1
        if brxnet is None:
            self.ceph_brx_net = '192.168.0.0/16'
        else:
            self.ceph_brx_net = brxnet

        self.test_files = ['a', 'b', 'c']

        self.background_procs = []

    # Clean up stale network namespaces and the bridge left behind by
    # previously failed test cases.
    @staticmethod
    def cleanup_stale_netnses_and_bridge(remote):
        p = remote.run(args=['ip', 'netns', 'list'],
                       stdout=StringIO(), timeout=(5*60))
        p = p.stdout.getvalue().strip()

        # Get the netns name list
        netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p)
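        # Note: _parse_netns_name() below builds these names from the mount
        # point, so a mount at /tmp/mnt.0 would (for example) yield a netns
        # named 'ceph-ns--tmp-mnt.0', which is what this pattern matches.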

        # Remove the stale netnses
        for ns in netns_list:
            ns_name = ns.split()[0]
            args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)]
            try:
                remote.run(args=args, timeout=(5*60), omit_sudo=False)
            except Exception:
                pass

        # Remove the stale 'ceph-brx'
        try:
            args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
            remote.run(args=args, timeout=(5*60), omit_sudo=False)
        except Exception:
            pass

    def _parse_netns_name(self):
        self._netns_name = '-'.join(["ceph-ns",
                                     re.sub(r'/+', "-", self.mountpoint)])

    @property
    def mountpoint(self):
        if self.hostfs_mntpt is None:
            self.hostfs_mntpt = os.path.join(self.test_dir,
                                             self.hostfs_mntpt_dirname)
        return self.hostfs_mntpt

    @mountpoint.setter
    def mountpoint(self, path):
        if not isinstance(path, str):
            raise RuntimeError('path should be of str type.')
        self._mountpoint = self.hostfs_mntpt = path

    @property
    def netns_name(self):
        if self._netns_name is None:
            self._parse_netns_name()
        return self._netns_name

    @netns_name.setter
    def netns_name(self, name):
        self._netns_name = name
    def assert_and_log_minimum_mount_details(self):
        """
        Make sure we have the minimum details required for mounting. Ideally,
        this method should be called at the beginning of the mount method.
        """
        if not self.client_id or not self.client_remote or \
           not self.hostfs_mntpt:
            errmsg = ('Mounting CephFS requires that at least the following '
                      'details be provided:\n'
                      '1. the client ID,\n2. the mountpoint and\n'
                      '3. the remote machine where CephFS will be mounted.\n')
            raise RuntimeError(errmsg)

        log.info('Mounting Ceph FS. Details of the mount follow; note that '
                 '"None" represents the Python type None -')
        log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
        log.info(f'self.client.name = client.{self.client_id}')
        log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}')
        log.info(f'self.cephfs_name = {self.cephfs_name}')
        log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}')
        log.info(f'self.client_keyring_path = {self.client_keyring_path}')
        if self.client_keyring_path:
            log.info('keyring content -\n' +
                     get_file(self.client_remote, self.client_keyring_path,
                              sudo=True).decode())

    def is_mounted(self):
        return self.mounted

    def setupfs(self, name=None):
        if name is None and self.fs is not None:
            # Previous mount existed, reuse the old name
            name = self.fs.name
        self.fs = Filesystem(self.ctx, name=name)
        log.info('Wait for MDS to reach steady state...')
        self.fs.wait_for_daemons()
        log.info('Ready to start {}...'.format(type(self).__name__))

    def _setup_brx_and_nat(self):
        # The IP for ceph-brx should be the last valid address in the
        # bridge network
        ip = IP(self.ceph_brx_net)[-2]
        mask = self.ceph_brx_net.split('/')[1]
        brd = IP(self.ceph_brx_net).broadcast()
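        # For the default brxnet of 192.168.0.0/16, for example, this works
        # out to ip=192.168.255.254, mask='16' and brd=192.168.255.255.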

        brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
                                     stdout=StringIO(), timeout=(5*60))
        brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
        if brx:
            # If the 'ceph-brx' already exists, then check whether
            # the new net conflicts with it
            _ip, _mask = brx[0].split()[1].split('/', 1)
            if _ip != "{}".format(ip) or _mask != mask:
                raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))

        # Setup the ceph-brx and always use the last valid IP
        if not brx:
            log.info("Setting up the 'ceph-brx' with {0}/{1}".format(ip, mask))

            self.run_shell_payload(f"""
                set -e
                sudo ip link add name ceph-brx type bridge
                sudo ip addr flush dev ceph-brx
                sudo ip link set ceph-brx up
                sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
            """, timeout=(5*60), omit_sudo=False, cwd='/')

        args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)

        # Setup the NAT
        p = self.client_remote.run(args=['route'], stderr=StringIO(),
                                   stdout=StringIO(), timeout=(5*60))
        p = re.findall(r'default .*', p.stdout.getvalue())
        if not p:
            raise RuntimeError("No default gw found")
        # split()[7] is the Iface column of the 'route' output, i.e. the
        # egress interface of the default route
        gw = p[0].split()[7]

        self.run_shell_payload(f"""
            set -e
            sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
            sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
            sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
        """, timeout=(5*60), omit_sudo=False, cwd='/')

    def _setup_netns(self):
        p = self.client_remote.run(args=['ip', 'netns', 'list'],
                                   stderr=StringIO(), stdout=StringIO(),
                                   timeout=(5*60)).stdout.getvalue().strip()

        # Get the netns name list
        netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p)

        out = re.search(r"{0}".format(self.netns_name), p)
        if out is None:
            # Get a unique nsid for the new netns
            nsid = 0
            p = self.client_remote.run(args=['ip', 'netns', 'list-id'],
                                       stderr=StringIO(), stdout=StringIO(),
                                       timeout=(5*60)).stdout.getvalue()
            while True:
                out = re.search(r"nsid {} ".format(nsid), p)
                if out is None:
                    break

                nsid += 1

            # Add a new netns and set its id
            self.run_shell_payload(f"""
                set -e
                sudo ip netns add {self.netns_name}
                sudo ip netns set {self.netns_name} {nsid}
            """, timeout=(5*60), omit_sudo=False, cwd='/')
            self.nsid = nsid
        else:
            # The netns already exists and may have been suspended by
            # self.kill()
            self.resume_netns()

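            # 'ip netns list' prints entries such as
            # 'ceph-ns--tmp-mnt.0 (id: 0)' (name shown here for illustration
            # only), so recover the nsid of the existing netns from that line.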
            nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1))
            self.nsid = nsid
            return

        # Get one IP address for the netns
        ips = IP(self.ceph_brx_net)
        for ip in ips:
            found = False
            if ip == ips[0]:
                continue
            if ip == ips[-2]:
                raise RuntimeError("we have run out of IP addresses")

            for ns in netns_list:
                ns_name = ns.split()[0]
                args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr']
                try:
                    p = self.client_remote.run(args=args, stderr=StringIO(),
                                               stdout=StringIO(), timeout=(5*60),
                                               omit_sudo=False)
                    q = re.search("{0}".format(ip), p.stdout.getvalue())
                    if q is not None:
                        found = True
                        break
                except CommandFailedError:
                    if "No such file or directory" in p.stderr.getvalue():
                        pass
                    if "Invalid argument" in p.stderr.getvalue():
                        pass

            if not found:
                break

        mask = self.ceph_brx_net.split('/')[1]
        brd = IP(self.ceph_brx_net).broadcast()

        log.info("Setting up the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))

        # Setup the veth interfaces
        brxip = IP(self.ceph_brx_net)[-2]
        self.run_shell_payload(f"""
            set -e
            sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
            sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
            sudo ip netns exec {self.netns_name} ip link set veth0 up
            sudo ip netns exec {self.netns_name} ip link set lo up
            sudo ip netns exec {self.netns_name} ip route add default via {brxip}
        """, timeout=(5*60), omit_sudo=False, cwd='/')

        # Bring up the brx interface and join it to 'ceph-brx'
        self.run_shell_payload(f"""
            set -e
            sudo ip link set brx.{nsid} up
            sudo ip link set dev brx.{nsid} master ceph-brx
        """, timeout=(5*60), omit_sudo=False, cwd='/')

    def _cleanup_netns(self):
        if self.nsid == -1:
            return
        log.info("Removing the netns '{0}'".format(self.netns_name))

        # Delete the netns and the peer veth interface
        self.run_shell_payload(f"""
            set -e
            sudo ip link set brx.{self.nsid} down
            sudo ip link delete dev brx.{self.nsid}
            sudo ip netns delete {self.netns_name}
        """, timeout=(5*60), omit_sudo=False, cwd='/')

        self.nsid = -1

    def _cleanup_brx_and_nat(self):
        brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
                                     stdout=StringIO(), timeout=(5*60))
        brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
        if not brx:
            return

        # If this is the last netns, delete the ceph-brx
        args = ['sudo', 'ip', 'link', 'show']
        p = self.client_remote.run(args=args, stdout=StringIO(),
                                   timeout=(5*60), omit_sudo=False)
        _list = re.findall(r'brx\.', p.stdout.getvalue().strip())
        if len(_list) != 0:
            return

        log.info("Removing the 'ceph-brx'")

        self.run_shell_payload("""
            set -e
            sudo ip link set ceph-brx down
            sudo ip link delete ceph-brx
        """, timeout=(5*60), omit_sudo=False, cwd='/')

        # Drop the iptables NAT rules
        ip = IP(self.ceph_brx_net)[-2]
        mask = self.ceph_brx_net.split('/')[1]

        p = self.client_remote.run(args=['route'], stderr=StringIO(),
                                   stdout=StringIO(), timeout=(5*60))
        p = re.findall(r'default .*', p.stdout.getvalue())
        if not p:
            raise RuntimeError("No default gw found")
        gw = p[0].split()[7]
        self.run_shell_payload(f"""
            set -e
            sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
            sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
            sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
        """, timeout=(5*60), omit_sudo=False, cwd='/')

    def setup_netns(self):
        """
        Setup the netns for the mountpoint.
        """
        log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
        self._setup_brx_and_nat()
        self._setup_netns()

    def cleanup_netns(self):
        """
        Cleanup the netns for the mountpoint.
        """
        # We defer cleaning up the netnses and the bridge until the last
        # mountpoint is unmounted; this is a temporary workaround for
        # issue #46282.

        # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
        # self._cleanup_netns()
        # self._cleanup_brx_and_nat()

    def suspend_netns(self):
        """
        Suspend the netns veth interface.
        """
        if self.nsid == -1:
            return

        log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))

        args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down']
        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)

    def resume_netns(self):
        """
        Resume the netns veth interface.
        """
        if self.nsid == -1:
            return

        log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))

        args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)

    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
        """
        kwargs expects its members to be the same as the arguments accepted by
        self.update_attrs().
        """
        raise NotImplementedError()

    def mount_wait(self, **kwargs):
        """
        Accepts the same arguments as self.mount().
        """
        self.mount(**kwargs)
        self.wait_until_mounted()

    def umount(self):
        raise NotImplementedError()

    def umount_wait(self, force=False, require_clean=False, timeout=None):
        """
        :param force: Expect that the mount will not shutdown cleanly: kill
                      it hard.
        :param require_clean: Wait for the Ceph client associated with the
                              mount (e.g. ceph-fuse) to terminate, and
                              raise if it doesn't do so cleanly.
        :param timeout: amount of time to wait for the umount command to finish
        :return:
        """
        raise NotImplementedError()

    def _verify_attrs(self, **kwargs):
        """
        Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
        cephfs_name and cephfs_mntpt are either of type str or None.
        """
        for k, v in kwargs.items():
            if v is not None and not isinstance(v, str):
                raise RuntimeError('value of attributes should be either str '
                                   f'or None. {k} - {v}')

    def update_attrs(self, client_id=None, client_keyring_path=None,
                     client_remote=None, hostfs_mntpt=None, cephfs_name=None,
                     cephfs_mntpt=None):
        if not (client_id or client_keyring_path or client_remote or
                cephfs_name or cephfs_mntpt or hostfs_mntpt):
            return

        self._verify_attrs(client_id=client_id,
                           client_keyring_path=client_keyring_path,
                           hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
                           cephfs_mntpt=cephfs_mntpt)

        if client_id:
            self.client_id = client_id
        if client_keyring_path:
            self.client_keyring_path = client_keyring_path
        if client_remote:
            self.client_remote = client_remote
        if hostfs_mntpt:
            self.hostfs_mntpt = hostfs_mntpt
        if cephfs_name:
            self.cephfs_name = cephfs_name
        if cephfs_mntpt:
            self.cephfs_mntpt = cephfs_mntpt

    def remount(self, **kwargs):
        """
        Update the mount object's attributes and attempt a remount with these
        new values for these attributes.

        1. Run umount_wait().
        2. Run update_attrs().
        3. Run mount().

        Accepts the arguments of self.mount() and self.update_attrs() with 2
        exceptions -
        1. Also accepts wait, which can be True or False.
        2. The default value of createfs is False.
        """
        self.umount_wait()
        assert not self.mounted

        mntopts = kwargs.pop('mntopts', [])
        createfs = kwargs.pop('createfs', False)
        check_status = kwargs.pop('check_status', True)
        wait = kwargs.pop('wait', True)

        self.update_attrs(**kwargs)

        retval = self.mount(mntopts=mntopts, createfs=createfs,
                            check_status=check_status)
        # avoid this scenario (again): mount command might've failed and
        # check_status might have silenced the exception, yet we attempt to
        # wait which might lead to an error.
        if retval is None and wait:
            self.wait_until_mounted()

        return retval

    def kill(self):
        """
        Suspend the netns veth interface to disconnect the client from the
        Ceph cluster.
        """
        log.info('Killing connection on {0}...'.format(self.client_remote.name))
        self.suspend_netns()

    def kill_cleanup(self):
        """
        Follow up ``kill`` to get to a clean unmounted state.
        """
        log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name))
        self.umount_wait(force=True)

    def cleanup(self):
        """
        Remove the mount point.

        Prerequisite: the client is not mounted.
        """
        log.info('Cleaning up mount {0}'.format(self.client_remote.name))
        stderr = StringIO()
        try:
            self.client_remote.run(args=['rmdir', '--', self.mountpoint],
                                   cwd=self.test_dir, stderr=stderr,
                                   timeout=(60*5), check_status=False)
        except CommandFailedError:
            if "no such file or directory" not in stderr.getvalue().lower():
                raise

        self.cleanup_netns()

    def wait_until_mounted(self):
        raise NotImplementedError()

    def get_keyring_path(self):
        return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)

    def get_key_from_keyfile(self):
        # XXX: don't call run_shell(), since CephFS might be unmounted.
        keyring = self.client_remote.run(
            args=['sudo', 'cat', self.client_keyring_path], stdout=StringIO(),
            omit_sudo=False).stdout.getvalue()
        for line in keyring.split('\n'):
            if line.find('key') != -1:
                return line[line.find('=') + 1:].strip()

    @property
    def config_path(self):
        """
        Path to ceph.conf: override this if you're not a normal systemwide ceph install
        :return: str
        """
        return "/etc/ceph/ceph.conf"

    @contextmanager
    def mounted_wait(self):
        """
        A context manager, from an initially unmounted state, to mount
        this, yield, and then unmount and clean up.
        """
        self.mount()
        self.wait_until_mounted()
        try:
            yield
        finally:
            self.umount_wait()

    def create_file(self, filename='testfile', dirname=None, user=None,
                    check_status=True):
        assert(self.is_mounted())

        if not os.path.isabs(filename):
            if dirname:
                if os.path.isabs(dirname):
                    path = os.path.join(dirname, filename)
                else:
                    path = os.path.join(self.hostfs_mntpt, dirname, filename)
            else:
                path = os.path.join(self.hostfs_mntpt, filename)
        else:
            path = filename

        if user:
            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', 'touch ' + path]
        else:
            args = 'touch ' + path

        return self.client_remote.run(args=args, check_status=check_status)

    def create_files(self):
        assert(self.is_mounted())

        for suffix in self.test_files:
            log.info("Creating file {0}".format(suffix))
            self.client_remote.run(args=[
                'touch', os.path.join(self.hostfs_mntpt, suffix)
            ])

    def test_create_file(self, filename='testfile', dirname=None, user=None,
                         check_status=True):
        return self.create_file(filename=filename, dirname=dirname, user=user,
                                check_status=False)

    def check_files(self):
        assert(self.is_mounted())

        for suffix in self.test_files:
            log.info("Checking file {0}".format(suffix))
            r = self.client_remote.run(args=[
                'ls', os.path.join(self.hostfs_mntpt, suffix)
            ], check_status=False)
            if r.exitstatus != 0:
                raise RuntimeError("Expected file {0} not found".format(suffix))

    def write_file(self, path, data, perms=None):
        """
        Write the given data at the given path and set the given perms on the
        file at that path.
        """
        if path.find(self.hostfs_mntpt) == -1:
            path = os.path.join(self.hostfs_mntpt, path)

        write_file(self.client_remote, path, data)

        if perms:
            self.run_shell(args=f'chmod {perms} {path}')

    def read_file(self, path):
        """
        Return the data from the file at the given path.
        """
        if path.find(self.hostfs_mntpt) == -1:
            path = os.path.join(self.hostfs_mntpt, path)

        return self.run_shell(args=['cat', path]).\
            stdout.getvalue().strip()

    def create_destroy(self):
        assert(self.is_mounted())

        filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
        log.debug("Creating test file {0}".format(filename))
        self.client_remote.run(args=[
            'touch', os.path.join(self.hostfs_mntpt, filename)
        ])
        log.debug("Deleting test file {0}".format(filename))
        self.client_remote.run(args=[
            'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
        ])

    def _run_python(self, pyscript, py_version='python3', sudo=False):
        args = []
        if sudo:
            args.append('sudo')
        args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript]
        return self.client_remote.run(args=args, wait=False, stdin=run.PIPE, stdout=StringIO())

    def run_python(self, pyscript, py_version='python3', sudo=False):
        p = self._run_python(pyscript, py_version, sudo=sudo)
        p.wait()
        return p.stdout.getvalue().strip()

    def run_shell(self, args, timeout=900, **kwargs):
        args = args.split() if isinstance(args, str) else args
        kwargs.pop('omit_sudo', False)
        sudo = kwargs.pop('sudo', False)
        cwd = kwargs.pop('cwd', self.mountpoint)
        stdout = kwargs.pop('stdout', StringIO())
        stderr = kwargs.pop('stderr', StringIO())

        if sudo:
            args.insert(0, 'sudo')

        return self.client_remote.run(args=args, cwd=cwd, timeout=timeout, stdout=stdout, stderr=stderr, **kwargs)

    def run_shell_payload(self, payload, **kwargs):
        return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
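        # Example usage (illustrative only): run a small script relative to
        # the mountpoint, e.g.
        #   self.run_shell_payload("set -e\nmkdir -p dir1\ntouch dir1/file")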

    def run_as_user(self, **kwargs):
        """
        Besides the arguments defined for run_shell() this method also
        accepts the argument 'user'.
        """
        args = kwargs.pop('args')
        user = kwargs.pop('user')
        if isinstance(args, str):
            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args]
        elif isinstance(args, list):
            cmdlist = args
            cmd = ''
            for i in cmdlist:
                cmd = cmd + i + ' '
            # get rid of extra space at the end.
            cmd = cmd[:-1]

            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd]

        kwargs['args'] = args
        return self.run_shell(**kwargs)

    def run_as_root(self, **kwargs):
        """
        Accepts the same arguments as run_shell().
        """
        kwargs['user'] = 'root'
        return self.run_as_user(**kwargs)

    def _verify(self, proc, retval=None, errmsg=None):
        if retval:
            msg = ('expected return value: {}\nreceived return value: '
                   '{}\n'.format(retval, proc.returncode))
            assert proc.returncode == retval, msg

        if errmsg:
            stderr = proc.stderr.getvalue().lower()
            msg = ('didn\'t find given string in stderr -\nexpected string: '
                   '{}\nreceived error message: {}\nnote: received error '
                   'message is converted to lowercase'.format(errmsg, stderr))
            assert errmsg in stderr, msg

    def negtestcmd(self, args, retval=None, errmsg=None, stdin=None,
                   cwd=None, wait=True):
        """
        Conduct a negative test for the given command.

        retval and errmsg are parameters to confirm the cause of command
        failure.
        """
        proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
                              check_status=False)
        self._verify(proc, retval, errmsg)
        return proc
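        # Example (illustrative only): expect 'rmdir' on a missing directory
        # to fail, e.g.
        #   self.negtestcmd(args=['rmdir', 'doesnotexist'],
        #                   retval=1, errmsg='no such file')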

    def negtestcmd_as_user(self, args, user, retval=None, errmsg=None,
                           stdin=None, cwd=None, wait=True):
        proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
                                cwd=cwd, check_status=False)
        self._verify(proc, retval, errmsg)
        return proc

    def negtestcmd_as_root(self, args, retval=None, errmsg=None, stdin=None,
                           cwd=None, wait=True):
        proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
                                check_status=False)
        self._verify(proc, retval, errmsg)
        return proc

    def open_no_data(self, basename):
        """
        A pure metadata operation
        """
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        p = self._run_python(dedent(
            """
            f = open("{path}", 'w')
            """.format(path=path)
        ))
        p.wait()

    def open_background(self, basename="background_file", write=True):
        """
        Open a file for writing, then block such that the client
        will hold a capability.

        Don't return until the remote process has got as far as opening
        the file, then return the RemoteProcess instance.
        """
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        if write:
            pyscript = dedent("""
                import time

                with open("{path}", 'w') as f:
                    f.write('content')
                    f.flush()
                    f.write('content2')
                    while True:
                        time.sleep(1)
                """).format(path=path)
        else:
            pyscript = dedent("""
                import time

                with open("{path}", 'r') as f:
                    while True:
                        time.sleep(1)
                """).format(path=path)

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)

        # This wait would not be sufficient if the file had already
        # existed, but it's simple and in practice users of open_background
        # are not using it on existing files.
        self.wait_for_visible(basename)

        return rproc

    def wait_for_dir_empty(self, dirname, timeout=30):
        dirpath = os.path.join(self.hostfs_mntpt, dirname)
        with safe_while(sleep=5, tries=(timeout//5)) as proceed:
            while proceed():
                p = self.run_shell_payload(f"stat -c %h {dirpath}")
                nr_links = int(p.stdout.getvalue().strip())
                if nr_links == 2:
                    return

    def wait_for_visible(self, basename="background_file", timeout=30):
        i = 0
        while i < timeout:
            r = self.client_remote.run(args=[
                'stat', os.path.join(self.hostfs_mntpt, basename)
            ], check_status=False)
            if r.exitstatus == 0:
                log.debug("File {0} became visible from {1} after {2}s".format(
                    basename, self.client_id, i))
                return
            else:
                time.sleep(1)
                i += 1

        raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
            i, basename, self.client_id))

    def lock_background(self, basename="background_file", do_flock=True):
        """
        Open and lock files for writing, and hold the locks in a background
        process.
        """
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        script_builder = """
            import time
            import fcntl
            import struct"""
        if do_flock:
            script_builder += """
            f1 = open("{path}-1", 'w')
            fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
        script_builder += """
            f2 = open("{path}-2", 'w')
            lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
            fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            while True:
                time.sleep(1)
            """

        pyscript = dedent(script_builder).format(path=path)

        log.info("lock_background file {0}".format(basename))
        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc

    def lock_and_release(self, basename="background_file"):
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        script = """
            import time
            import fcntl
            import struct
            f1 = open("{path}-1", 'w')
            fcntl.flock(f1, fcntl.LOCK_EX)
            f2 = open("{path}-2", 'w')
            lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
            fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            """
        pyscript = dedent(script).format(path=path)

        log.info("lock_and_release file {0}".format(basename))
        return self._run_python(pyscript)

    def check_filelock(self, basename="background_file", do_flock=True):
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        script_builder = """
            import fcntl
            import errno
            import struct"""
        if do_flock:
            script_builder += """
            f1 = open("{path}-1", 'r')
            try:
                fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError as e:
                if e.errno == errno.EAGAIN:
                    pass
            else:
                raise RuntimeError("flock on file {path}-1 not found")"""
        script_builder += """
            f2 = open("{path}-2", 'r')
            try:
                lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
                fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
            except IOError as e:
                if e.errno == errno.EAGAIN:
                    pass
            else:
                raise RuntimeError("posix lock on file {path}-2 not found")
            """
        pyscript = dedent(script_builder).format(path=path)

        log.info("check lock on file {0}".format(basename))
        self.client_remote.run(args=[
            'python3', '-c', pyscript
        ])

    def write_background(self, basename="background_file", loop=False):
        """
        Open a file and write to it in a background process. If loop is
        True, keep writing until the process is killed.
        :param basename:
        :return:
        """
        assert(self.is_mounted())

        path = os.path.join(self.hostfs_mntpt, basename)

        pyscript = dedent("""
            import os
            import time

            fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
            try:
                while True:
                    os.write(fd, b'content')
                    time.sleep(1)
                    if not {loop}:
                        break
            except IOError as e:
                pass
            os.close(fd)
            """).format(path=path, loop=str(loop))

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc

    def write_n_mb(self, filename, n_mb, seek=0, wait=True):
        """
        Write the requested number of megabytes to a file
        """
        assert(self.is_mounted())

        return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
                               "bs=1M", "conv=fdatasync",
                               "count={0}".format(int(n_mb)),
                               "seek={0}".format(int(seek))
                               ], wait=wait)

    def write_test_pattern(self, filename, size):
        log.info("Writing {0} bytes to {1}".format(size, filename))
        return self.run_python(dedent("""
            import zlib
            path = "{path}"
            with open(path, 'w') as f:
                for i in range(0, {size}):
                    val = zlib.crc32(str(i).encode('utf-8')) & 7
                    f.write(chr(val))
            """.format(
            path=os.path.join(self.hostfs_mntpt, filename),
            size=size
        )))

    def validate_test_pattern(self, filename, size):
        log.info("Validating {0} bytes from {1}".format(size, filename))
        # Use sudo because cephfs-data-scan may recreate the file with owner==root
        return self.run_python(dedent("""
            import zlib
            path = "{path}"
            with open(path, 'r') as f:
                bytes = f.read()
            if len(bytes) != {size}:
                raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
                    len(bytes), {size}
                ))
            for i, b in enumerate(bytes):
                val = zlib.crc32(str(i).encode('utf-8')) & 7
                if b != chr(val):
                    raise RuntimeError("Bad data at offset {{0}}".format(i))
            """.format(
            path=os.path.join(self.hostfs_mntpt, filename),
            size=size
        )), sudo=True)

    def open_n_background(self, fs_path, count):
        """
        Open N files for writing, hold them open in a background process

        :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
        :return: a RemoteProcess
        """
        assert(self.is_mounted())

        abs_path = os.path.join(self.hostfs_mntpt, fs_path)

        pyscript = dedent("""
            import sys
            import time
            import os

            n = {count}
            abs_path = "{abs_path}"

            if not os.path.exists(abs_path):
                os.makedirs(abs_path)

            handles = []
            for i in range(0, n):
                fname = "file_"+str(i)
                path = os.path.join(abs_path, fname)
                handles.append(open(path, 'w'))

            while True:
                time.sleep(1)
            """).format(abs_path=abs_path, count=count)

        rproc = self._run_python(pyscript)
        self.background_procs.append(rproc)
        return rproc

    def create_n_files(self, fs_path, count, sync=False):
        assert(self.is_mounted())

        abs_path = os.path.join(self.hostfs_mntpt, fs_path)

        pyscript = dedent("""
            import sys
            import time
            import os

            n = {count}
            abs_path = "{abs_path}"

            if not os.path.exists(os.path.dirname(abs_path)):
                os.makedirs(os.path.dirname(abs_path))

            for i in range(0, n):
                fname = "{{0}}_{{1}}".format(abs_path, i)
                with open(fname, 'w') as f:
                    f.write('content')
                    if {sync}:
                        f.flush()
                        os.fsync(f.fileno())
            """).format(abs_path=abs_path, count=count, sync=str(sync))

        self.run_python(pyscript)

    def teardown(self):
        for p in self.background_procs:
            log.info("Terminating background process")
            self._kill_background(p)

        self.background_procs = []

    def _kill_background(self, p):
        if p.stdin:
            p.stdin.close()
            try:
                p.wait()
            except (CommandFailedError, ConnectionLostError):
                pass

    def kill_background(self, p):
        """
        For a process that was returned by one of the _background member functions,
        kill it hard.
        """
        self._kill_background(p)
        self.background_procs.remove(p)

    def send_signal(self, signal):
        signal = signal.lower()
        if signal not in ['sigstop', 'sigcont', 'sigterm', 'sigkill']:
            raise NotImplementedError

        self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
                                     self.client_pid], omit_sudo=False)

    def get_global_id(self):
        raise NotImplementedError()

    def get_global_inst(self):
        raise NotImplementedError()

    def get_global_addr(self):
        raise NotImplementedError()

    def get_osd_epoch(self):
        raise NotImplementedError()

    def get_op_read_count(self):
        raise NotImplementedError()

    def lstat(self, fs_path, follow_symlinks=False, wait=True):
        return self.stat(fs_path, follow_symlinks=False, wait=wait)

    def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
        """
        stat a file, and return the result as a dictionary like this:
        {
          "st_ctime": 1414161137.0,
          "st_mtime": 1414161137.0,
          "st_nlink": 33,
          "st_gid": 0,
          "st_dev": 16777218,
          "st_size": 1190,
          "st_ino": 2,
          "st_uid": 0,
          "st_mode": 16877,
          "st_atime": 1431520593.0
        }

        Raises exception on absent file.
        """
        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
        if follow_symlinks:
            stat_call = "os.stat('" + abs_path + "')"
        else:
            stat_call = "os.lstat('" + abs_path + "')"

        pyscript = dedent("""
            import os
            import stat
            import json
            import sys

            try:
                s = {stat_call}
            except OSError as e:
                sys.exit(e.errno)

            attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
            print(json.dumps(
                dict([(a, getattr(s, a)) for a in attrs]),
                indent=2))
            """).format(stat_call=stat_call)
        proc = self._run_python(pyscript, **kwargs)
        if wait:
            proc.wait()
            return json.loads(proc.stdout.getvalue().strip())
        else:
            return proc

    def touch(self, fs_path):
        """
        Create a dentry if it doesn't already exist. This python
        implementation exists because the usual command line tool doesn't
        pass through error codes like EIO.

        :param fs_path:
        :return:
        """
        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
        pyscript = dedent("""
            import sys
            import errno

            try:
                f = open("{path}", "w")
                f.close()
            except IOError as e:
                sys.exit(errno.EIO)
            """).format(path=abs_path)
        proc = self._run_python(pyscript)
        proc.wait()

    def path_to_ino(self, fs_path, follow_symlinks=True):
        abs_path = os.path.join(self.hostfs_mntpt, fs_path)

        if follow_symlinks:
            pyscript = dedent("""
                import os
                import stat

                print(os.stat("{path}").st_ino)
                """).format(path=abs_path)
        else:
            pyscript = dedent("""
                import os
                import stat

                print(os.lstat("{path}").st_ino)
                """).format(path=abs_path)

        proc = self._run_python(pyscript)
        proc.wait()
        return int(proc.stdout.getvalue().strip())

    def path_to_nlink(self, fs_path):
        abs_path = os.path.join(self.hostfs_mntpt, fs_path)

        pyscript = dedent("""
            import os
            import stat

            print(os.stat("{path}").st_nlink)
            """).format(path=abs_path)

        proc = self._run_python(pyscript)
        proc.wait()
        return int(proc.stdout.getvalue().strip())

    def ls(self, path=None, **kwargs):
        """
        Wrap ls: return a list of strings
        """
        cmd = ["ls"]
        if path:
            cmd.append(path)

        ls_text = self.run_shell(cmd, **kwargs).stdout.getvalue().strip()

        if ls_text:
            return ls_text.split("\n")
        else:
            # Special case because otherwise split on empty string
            # gives you [''] instead of []
            return []

    def setfattr(self, path, key, val, **kwargs):
        """
        Wrap setfattr.

        :param path: relative to mount point
        :param key: xattr name
        :param val: xattr value
        :return: None
        """
        self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
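        # Example (illustrative only): pin a directory to MDS rank 0 via its
        # CephFS vxattr, e.g.
        #   self.setfattr('dir1', 'ceph.dir.pin', '0')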

    def getfattr(self, path, attr, **kwargs):
        """
        Wrap getfattr: return the value of a named xattr on one file, or
        None if the attribute is not found.

        :return: a string
        """
        p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False, **kwargs)
        try:
            p.wait()
        except CommandFailedError as e:
            if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
                return None
            else:
                raise

        return str(p.stdout.getvalue())

    def df(self):
        """
        Wrap df: return a dict of usage fields in bytes
        """

        p = self.run_shell(["df", "-B1", "."])
        lines = p.stdout.getvalue().strip().split("\n")
        fs, total, used, avail = lines[1].split()[:4]
        log.warning(lines)

        return {
            "total": int(total),
            "used": int(used),
            "available": int(avail)
        }

    def dir_checksum(self, path=None, follow_symlinks=False):
        cmd = ["find"]
        if follow_symlinks:
            cmd.append("-L")
        if path:
            cmd.append(path)
        cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
        checksum_text = self.run_shell(cmd).stdout.getvalue().strip()
        checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1])
        return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest()