# ceph/qa/tasks/cephfs/mount.py (ceph.git, quincy 17.2.4)
1 import hashlib
2 import json
3 import logging
4 import datetime
5 import os
6 import re
7 import time
8
9 from io import StringIO
10 from contextlib import contextmanager
11 from textwrap import dedent
12 from IPy import IP
13
14 from teuthology.contextutil import safe_while
15 from teuthology.misc import get_file, write_file
16 from teuthology.orchestra import run
17 from teuthology.orchestra.run import Raw
18 from teuthology.exceptions import CommandFailedError, ConnectionLostError
19
20 from tasks.cephfs.filesystem import Filesystem
21
22 log = logging.getLogger(__name__)
23
24 class CephFSMount(object):
25 def __init__(self, ctx, test_dir, client_id, client_remote,
26 client_keyring_path=None, hostfs_mntpt=None,
27 cephfs_name=None, cephfs_mntpt=None, brxnet=None):
28 """
29 :param test_dir: Global teuthology test dir
30 :param client_id: Client ID, the 'foo' in client.foo
31 :param client_keyring_path: path to keyring for given client_id
32 :param client_remote: Remote instance for the host where client will
33 run
34 :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
35 be mounted
36 :param cephfs_name: Name of Ceph FS to be mounted
37 :param cephfs_mntpt: Path to directory inside Ceph FS that will be
38 mounted as root
39 """
40 self.mounted = False
41 self.ctx = ctx
42 self.test_dir = test_dir
43
44 self._verify_attrs(client_id=client_id,
45 client_keyring_path=client_keyring_path,
46 hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
47 cephfs_mntpt=cephfs_mntpt)
48
49 self.client_id = client_id
50 self.client_keyring_path = client_keyring_path
51 self.client_remote = client_remote
52 if hostfs_mntpt:
53 self.hostfs_mntpt = hostfs_mntpt
54 self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
55 else:
56 self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
57 self.cephfs_name = cephfs_name
58 self.cephfs_mntpt = cephfs_mntpt
59
60 self.cluster_name = 'ceph' # TODO: use config['cluster']
61
62 self.fs = None
63
64 self._netns_name = None
65 self.nsid = -1
66 if brxnet is None:
67 self.ceph_brx_net = '192.168.0.0/16'
68 else:
69 self.ceph_brx_net = brxnet
70
71 self.test_files = ['a', 'b', 'c']
72
73 self.background_procs = []
74
75     # This will clean up the stale netnses left behind by
76     # previously failed test cases.
77 @staticmethod
78 def cleanup_stale_netnses_and_bridge(remote):
79 p = remote.run(args=['ip', 'netns', 'list'],
80 stdout=StringIO(), timeout=(5*60))
81 p = p.stdout.getvalue().strip()
82
83 # Get the netns name list
84 netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p)
85
86 # Remove the stale netnses
87 for ns in netns_list:
88 ns_name = ns.split()[0]
89 args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)]
90 try:
91 remote.run(args=args, timeout=(5*60), omit_sudo=False)
92 except Exception:
93 pass
94
95 # Remove the stale 'ceph-brx'
96 try:
97 args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
98 remote.run(args=args, timeout=(5*60), omit_sudo=False)
99 except Exception:
100 pass
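    # A hypothetical usage sketch (call site not part of this file): run the
    # static helper once per client host before creating any mounts, e.g.
    #   CephFSMount.cleanup_stale_netnses_and_bridge(client_remote)
    # where 'client_remote' is the teuthology Remote for the client node.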
101
102 def _parse_netns_name(self):
103 self._netns_name = '-'.join(["ceph-ns",
104 re.sub(r'/+', "-", self.mountpoint)])
105
106 @property
107 def mountpoint(self):
108         if self.hostfs_mntpt is None:
109 self.hostfs_mntpt = os.path.join(self.test_dir,
110 self.hostfs_mntpt_dirname)
111 return self.hostfs_mntpt
112
113 @mountpoint.setter
114 def mountpoint(self, path):
115 if not isinstance(path, str):
116 raise RuntimeError('path should be of str type.')
117 self._mountpoint = self.hostfs_mntpt = path
118
119 @property
120 def netns_name(self):
121         if self._netns_name is None:
122 self._parse_netns_name()
123 return self._netns_name
124
125 @netns_name.setter
126 def netns_name(self, name):
127 self._netns_name = name
128
129 def assert_that_ceph_fs_exists(self):
130 output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls")
131 if self.cephfs_name:
132 assert self.cephfs_name in output, \
133 'expected ceph fs is not present on the cluster'
134 log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
135 else:
136 assert 'No filesystems enabled' not in output, \
137 'ceph cluster has no ceph fs, not even the default ceph fs'
138 log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
139
140 def assert_and_log_minimum_mount_details(self):
141 """
142 Make sure we have minimum details required for mounting. Ideally, this
143 method should be called at the beginning of the mount method.
144 """
145 if not self.client_id or not self.client_remote or \
146 not self.hostfs_mntpt:
147             errmsg = ('Mounting CephFS requires at least the following '
148 'details to be provided -\n'
149 '1. the client ID,\n2. the mountpoint and\n'
150 '3. the remote machine where CephFS will be mounted.\n')
151 raise RuntimeError(errmsg)
152
153 self.assert_that_ceph_fs_exists()
154
155         log.info('Mounting Ceph FS. The details of the mount follow; note that '
156                  '"None" represents the Python value None -')
157 log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
158 log.info(f'self.client.name = client.{self.client_id}')
159 log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}')
160 log.info(f'self.cephfs_name = {self.cephfs_name}')
161 log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}')
162 log.info(f'self.client_keyring_path = {self.client_keyring_path}')
163 if self.client_keyring_path:
164 log.info('keyring content -\n' +
165 get_file(self.client_remote, self.client_keyring_path,
166 sudo=True).decode())
167
168 def is_mounted(self):
169 return self.mounted
170
171 def setupfs(self, name=None):
172 if name is None and self.fs is not None:
173 # Previous mount existed, reuse the old name
174 name = self.fs.name
175 self.fs = Filesystem(self.ctx, name=name)
176 log.info('Wait for MDS to reach steady state...')
177 self.fs.wait_for_daemons()
178 log.info('Ready to start {}...'.format(type(self).__name__))
179
180 def _create_mntpt(self):
181 self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}',
182 timeout=60)
183 # Use 0000 mode to prevent undesired modifications to the mountpoint on
184 # the local file system.
185 self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}',
186 timeout=60)
187
188 @property
189 def _nsenter_args(self):
190 return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
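    # Assumed usage sketch: these args are meant to be prefixed onto a command
    # so that it runs inside this mount's network namespace, e.g. (illustrative)
    #   self.client_remote.run(args=self._nsenter_args + ['ls', self.hostfs_mntpt])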
191
192 def _set_filemode_on_mntpt(self):
193 stderr = StringIO()
194 try:
195 self.client_remote.run(
196 args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
197 stderr=stderr, timeout=(5*60))
198 except CommandFailedError:
199 # the client does not have write permissions in the caps it holds
200 # for the Ceph FS that was just mounted.
201 if 'permission denied' in stderr.getvalue().lower():
202 pass
203
204 def _setup_brx_and_nat(self):
205         # The ip for ceph-brx should be the last valid address in the subnet ([-1] is the broadcast)
206 ip = IP(self.ceph_brx_net)[-2]
207 mask = self.ceph_brx_net.split('/')[1]
208 brd = IP(self.ceph_brx_net).broadcast()
209
210 brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
211 stdout=StringIO(), timeout=(5*60))
212 brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
213 if brx:
214             # If the 'ceph-brx' already exists, check whether the
215             # new net conflicts with it
216 _ip, _mask = brx[0].split()[1].split('/', 1)
217 if _ip != "{}".format(ip) or _mask != mask:
218 raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))
219
220         # Set up the ceph-brx and always use the last valid IP
221 if not brx:
222 log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask))
223
224 self.run_shell_payload(f"""
225 set -e
226 sudo ip link add name ceph-brx type bridge
227 sudo ip addr flush dev ceph-brx
228 sudo ip link set ceph-brx up
229 sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
230 """, timeout=(5*60), omit_sudo=False, cwd='/')
231
232 args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
233 self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
234
235 # Setup the NAT
236 p = self.client_remote.run(args=['route'], stderr=StringIO(),
237 stdout=StringIO(), timeout=(5*60))
238 p = re.findall(r'default .*', p.stdout.getvalue())
239         if not p:
240             raise RuntimeError("No default gw found")
241         gw = p[0].split()[7]  # last field of the default route is the egress interface
242
243 self.run_shell_payload(f"""
244 set -e
245 sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
246 sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
247 sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
248 """, timeout=(5*60), omit_sudo=False, cwd='/')
249
250 def _setup_netns(self):
251 p = self.client_remote.run(args=['ip', 'netns', 'list'],
252 stderr=StringIO(), stdout=StringIO(),
253 timeout=(5*60)).stdout.getvalue().strip()
254
255 # Get the netns name list
256 netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p)
257
258 out = re.search(r"{0}".format(self.netns_name), p)
259 if out is None:
260             # Get a unique nsid for the new netns
261 nsid = 0
262 p = self.client_remote.run(args=['ip', 'netns', 'list-id'],
263 stderr=StringIO(), stdout=StringIO(),
264 timeout=(5*60)).stdout.getvalue()
265 while True:
266 out = re.search(r"nsid {} ".format(nsid), p)
267 if out is None:
268 break
269
270 nsid += 1
271
272             # Add a new netns and set its id
273 self.run_shell_payload(f"""
274 set -e
275 sudo ip netns add {self.netns_name}
276 sudo ip netns set {self.netns_name} {nsid}
277 """, timeout=(5*60), omit_sudo=False, cwd='/')
278             self.nsid = nsid
279 else:
280             # The netns already exists and may have been suspended by self.kill()
281             self.resume_netns()
282
283             nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1))
284             self.nsid = nsid
285 return
286
287 # Get one ip address for netns
288 ips = IP(self.ceph_brx_net)
289 for ip in ips:
290 found = False
291 if ip == ips[0]:
292 continue
293 if ip == ips[-2]:
294                 raise RuntimeError("we have run out of IP addresses")
295
296 for ns in netns_list:
297 ns_name = ns.split()[0]
298 args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr']
299 try:
300 p = self.client_remote.run(args=args, stderr=StringIO(),
301 stdout=StringIO(), timeout=(5*60),
302 omit_sudo=False)
303 q = re.search("{0}".format(ip), p.stdout.getvalue())
304 if q is not None:
305 found = True
306 break
307                 except CommandFailedError:
308                     # the stale netns may have vanished or be invalid; either
309                     # way it holds no address we care about, so the failure is
310                     # safe to ignore while probing for a free address
311                     pass
312
313             if not found:
314 break
315
316 mask = self.ceph_brx_net.split('/')[1]
317 brd = IP(self.ceph_brx_net).broadcast()
318
319 log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))
320
321 # Setup the veth interfaces
322 brxip = IP(self.ceph_brx_net)[-2]
323 self.run_shell_payload(f"""
324 set -e
325 sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
326 sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
327 sudo ip netns exec {self.netns_name} ip link set veth0 up
328 sudo ip netns exec {self.netns_name} ip link set lo up
329 sudo ip netns exec {self.netns_name} ip route add default via {brxip}
330 """, timeout=(5*60), omit_sudo=False, cwd='/')
331
332 # Bring up the brx interface and join it to 'ceph-brx'
333 self.run_shell_payload(f"""
334 set -e
335 sudo ip link set brx.{nsid} up
336 sudo ip link set dev brx.{nsid} master ceph-brx
337 """, timeout=(5*60), omit_sudo=False, cwd='/')
338
339 def _cleanup_netns(self):
340 if self.nsid == -1:
341 return
342 log.info("Removing the netns '{0}'".format(self.netns_name))
343
344 # Delete the netns and the peer veth interface
345 self.run_shell_payload(f"""
346 set -e
347 sudo ip link set brx.{self.nsid} down
348 sudo ip link delete dev brx.{self.nsid}
349 sudo ip netns delete {self.netns_name}
350 """, timeout=(5*60), omit_sudo=False, cwd='/')
351
352 self.nsid = -1
353
354 def _cleanup_brx_and_nat(self):
355 brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
356 stdout=StringIO(), timeout=(5*60))
357 brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
358 if not brx:
359 return
360
361         # If this was the last netns, delete the ceph-brx
362 args = ['sudo', 'ip', 'link', 'show']
363 p = self.client_remote.run(args=args, stdout=StringIO(),
364 timeout=(5*60), omit_sudo=False)
365 _list = re.findall(r'brx\.', p.stdout.getvalue().strip())
366 if len(_list) != 0:
367 return
368
369 log.info("Removing the 'ceph-brx'")
370
371 self.run_shell_payload("""
372 set -e
373 sudo ip link set ceph-brx down
374 sudo ip link delete ceph-brx
375 """, timeout=(5*60), omit_sudo=False, cwd='/')
376
377 # Drop the iptables NAT rules
378 ip = IP(self.ceph_brx_net)[-2]
379 mask = self.ceph_brx_net.split('/')[1]
380
381 p = self.client_remote.run(args=['route'], stderr=StringIO(),
382 stdout=StringIO(), timeout=(5*60))
383 p = re.findall(r'default .*', p.stdout.getvalue())
384         if not p:
385             raise RuntimeError("No default gw found")
386         gw = p[0].split()[7]  # last field of the default route is the egress interface
387 self.run_shell_payload(f"""
388 set -e
389 sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
390 sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
391 sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
392 """, timeout=(5*60), omit_sudo=False, cwd='/')
393
394 def setup_netns(self):
395 """
396         Set up the netns for the mountpoint.
397 """
398 log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
399 self._setup_brx_and_nat()
400 self._setup_netns()
401
402 def cleanup_netns(self):
403 """
404         Clean up the netns for the mountpoint.
405 """
406         # We will defer cleaning the netnses and bridge until the last
407         # mountpoint is unmounted; this is a temporary workaround
408         # for issue#46282.
409
410 # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
411 # self._cleanup_netns()
412 # self._cleanup_brx_and_nat()
413
414 def suspend_netns(self):
415 """
416 Suspend the netns veth interface.
417 """
418 if self.nsid == -1:
419 return
420
421 log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
422
423 args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down']
424 self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
425
426 def resume_netns(self):
427 """
428 Resume the netns veth interface.
429 """
430 if self.nsid == -1:
431 return
432
433 log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
434
435 args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
436 self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
437
438 def mount(self, mntopts=[], check_status=True, **kwargs):
439 """
440 kwargs expects its members to be same as the arguments accepted by
441 self.update_attrs().
442 """
443 raise NotImplementedError()
444
445 def mount_wait(self, **kwargs):
446 """
447 Accepts arguments same as self.mount().
448 """
449 self.mount(**kwargs)
450 self.wait_until_mounted()
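    # A hedged usage example (argument values are illustrative, not from this
    # file): per the mount() docstring the kwargs mirror update_attrs(), so
    #   mount.mount_wait(cephfs_name='a', cephfs_mntpt='/dir1')
    # would mount file system 'a' with '/dir1' as the mount root and block
    # until wait_until_mounted() succeeds.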
451
452 def umount(self):
453 raise NotImplementedError()
454
455 def umount_wait(self, force=False, require_clean=False, timeout=None):
456 """
457
458 :param force: Expect that the mount will not shutdown cleanly: kill
459 it hard.
460 :param require_clean: Wait for the Ceph client associated with the
461 mount (e.g. ceph-fuse) to terminate, and
462 raise if it doesn't do so cleanly.
463 :param timeout: amount of time to be waited for umount command to finish
464 :return:
465 """
466 raise NotImplementedError()
467
468 def _verify_attrs(self, **kwargs):
469 """
470 Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
471 cephfs_name, cephfs_mntpt are either type str or None.
472 """
473 for k, v in kwargs.items():
474 if v is not None and not isinstance(v, str):
475 raise RuntimeError('value of attributes should be either str '
476 f'or None. {k} - {v}')
477
478 def update_attrs(self, client_id=None, client_keyring_path=None,
479 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
480 cephfs_mntpt=None):
481 if not (client_id or client_keyring_path or client_remote or
482 cephfs_name or cephfs_mntpt or hostfs_mntpt):
483 return
484
485 self._verify_attrs(client_id=client_id,
486 client_keyring_path=client_keyring_path,
487 hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
488 cephfs_mntpt=cephfs_mntpt)
489
490 if client_id:
491 self.client_id = client_id
492 if client_keyring_path:
493 self.client_keyring_path = client_keyring_path
494 if client_remote:
495 self.client_remote = client_remote
496 if hostfs_mntpt:
497 self.hostfs_mntpt = hostfs_mntpt
498 if cephfs_name:
499 self.cephfs_name = cephfs_name
500 if cephfs_mntpt:
501 self.cephfs_mntpt = cephfs_mntpt
502
503 def remount(self, **kwargs):
504 """
505         Update the mount object's attributes and attempt to remount with
506         the new values of these attributes.
507
508 1. Run umount_wait().
509 2. Run update_attrs().
510 3. Run mount().
511
512         Accepts the arguments of self.mount() and self.update_attrs(), with
513         one addition: 'wait', which can be True or False, is also accepted.
514 """
515 self.umount_wait()
516 assert not self.mounted
517
518 mntopts = kwargs.pop('mntopts', [])
519 check_status = kwargs.pop('check_status', True)
520 wait = kwargs.pop('wait', True)
521
522 self.update_attrs(**kwargs)
523
524 retval = self.mount(mntopts=mntopts, check_status=check_status)
525         # avoid this scenario (again): the mount command might have failed
526         # with check_status silencing the exception, yet we would still
527         # attempt to wait, which would then error out.
528 if retval is None and wait:
529 self.wait_until_mounted()
530
531 return retval
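    # A hedged example of the intended flow (the file system name is
    # illustrative):
    #   mount.remount(cephfs_name='other_fs', wait=True)
    # unmounts, updates self.cephfs_name via update_attrs(), mounts again and,
    # because wait is True and mount() returned None, waits for the mount.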
532
533 def kill(self):
534 """
535         Suspend the netns veth interface to disconnect the client
536         from the ceph cluster.
537 """
538 log.info('Killing connection on {0}...'.format(self.client_remote.name))
539 self.suspend_netns()
540
541 def kill_cleanup(self):
542 """
543 Follow up ``kill`` to get to a clean unmounted state.
544 """
545 log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name))
546 self.umount_wait(force=True)
547
548 def cleanup(self):
549 """
550 Remove the mount point.
551
552 Prerequisite: the client is not mounted.
553 """
554 log.info('Cleaning up mount {0}'.format(self.client_remote.name))
555 stderr = StringIO()
556 try:
557 self.client_remote.run(args=['rmdir', '--', self.mountpoint],
558 cwd=self.test_dir, stderr=stderr,
559 timeout=(60*5), check_status=False)
560 except CommandFailedError:
561 if "no such file or directory" not in stderr.getvalue().lower():
562 raise
563
564 self.cleanup_netns()
565
566 def wait_until_mounted(self):
567 raise NotImplementedError()
568
569 def get_keyring_path(self):
570 # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
571 return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
572
573 def get_key_from_keyfile(self):
574 # XXX: don't call run_shell(), since CephFS might be unmounted.
575 keyring = self.client_remote.run(
576 args=['sudo', 'cat', self.client_keyring_path], stdout=StringIO(),
577 omit_sudo=False).stdout.getvalue()
578 for line in keyring.split('\n'):
579 if line.find('key') != -1:
580 return line[line.find('=') + 1 : ].strip()
581
582 @property
583 def config_path(self):
584 """
585         Path to ceph.conf: override this if not using a normal system-wide ceph install
586         :return: string
587 """
588 return "/etc/ceph/ceph.conf"
589
590 @contextmanager
591 def mounted_wait(self):
592 """
593         A context manager: starting from an unmounted state, mount the
594         client, yield, and then unmount and clean up.
595 """
596 self.mount()
597 self.wait_until_mounted()
598 try:
599 yield
600 finally:
601 self.umount_wait()
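    # Sketch of assumed usage of the context manager:
    #   with mount.mounted_wait():
    #       mount.create_files()
    # the mount is established before the body runs and umount_wait() is
    # always called afterwards, even if the body raises.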
602
603 def create_file(self, filename='testfile', dirname=None, user=None,
604 check_status=True):
605 assert(self.is_mounted())
606
607 if not os.path.isabs(filename):
608 if dirname:
609 if os.path.isabs(dirname):
610 path = os.path.join(dirname, filename)
611 else:
612 path = os.path.join(self.hostfs_mntpt, dirname, filename)
613 else:
614 path = os.path.join(self.hostfs_mntpt, filename)
615 else:
616 path = filename
617
618 if user:
619 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', 'touch ' + path]
620 else:
621 args = 'touch ' + path
622
623 return self.client_remote.run(args=args, check_status=check_status)
624
625 def create_files(self):
626 assert(self.is_mounted())
627
628 for suffix in self.test_files:
629 log.info("Creating file {0}".format(suffix))
630 self.client_remote.run(args=[
631 'touch', os.path.join(self.hostfs_mntpt, suffix)
632 ])
633
634 def test_create_file(self, filename='testfile', dirname=None, user=None,
635 check_status=True):
636 return self.create_file(filename=filename, dirname=dirname, user=user,
637 check_status=False)
638
639 def check_files(self):
640 assert(self.is_mounted())
641
642 for suffix in self.test_files:
643 log.info("Checking file {0}".format(suffix))
644 r = self.client_remote.run(args=[
645 'ls', os.path.join(self.hostfs_mntpt, suffix)
646 ], check_status=False)
647 if r.exitstatus != 0:
648 raise RuntimeError("Expected file {0} not found".format(suffix))
649
650 def write_file(self, path, data, perms=None):
651 """
652         Write the given data to the file at the given path and, optionally,
653         set the given perms on that file.
654 """
655 if path.find(self.hostfs_mntpt) == -1:
656 path = os.path.join(self.hostfs_mntpt, path)
657
658 write_file(self.client_remote, path, data)
659
660 if perms:
661 self.run_shell(args=f'chmod {perms} {path}')
662
663 def read_file(self, path):
664 """
665 Return the data from the file on given path.
666 """
667 if path.find(self.hostfs_mntpt) == -1:
668 path = os.path.join(self.hostfs_mntpt, path)
669
670 return self.run_shell(args=['cat', path]).\
671 stdout.getvalue().strip()
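    # Hypothetical round-trip example using the two helpers above (path and
    # data are illustrative):
    #   mount.write_file('dir/file', 'hello', perms='644')
    #   assert mount.read_file('dir/file') == 'hello'
    # paths lacking the mountpoint prefix are resolved under hostfs_mntpt.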
672
673 def create_destroy(self):
674 assert(self.is_mounted())
675
676 filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
677 log.debug("Creating test file {0}".format(filename))
678 self.client_remote.run(args=[
679 'touch', os.path.join(self.hostfs_mntpt, filename)
680 ])
681 log.debug("Deleting test file {0}".format(filename))
682 self.client_remote.run(args=[
683 'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
684 ])
685
686 def _run_python(self, pyscript, py_version='python3', sudo=False):
687 args = []
688 if sudo:
689 args.append('sudo')
690 args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript]
691 return self.client_remote.run(args=args, wait=False, stdin=run.PIPE, stdout=StringIO())
692
693 def run_python(self, pyscript, py_version='python3', sudo=False):
694 p = self._run_python(pyscript, py_version, sudo=sudo)
695 p.wait()
696 return p.stdout.getvalue().strip()
697
698 def run_shell(self, args, timeout=900, **kwargs):
699 args = args.split() if isinstance(args, str) else args
700 omit_sudo = kwargs.pop('omit_sudo', False)
701 sudo = kwargs.pop('sudo', False)
702 cwd = kwargs.pop('cwd', self.mountpoint)
703 stdout = kwargs.pop('stdout', StringIO())
704 stderr = kwargs.pop('stderr', StringIO())
705
706 if sudo:
707 args.insert(0, 'sudo')
708
709 return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
710 stdout=stdout, stderr=stderr,
711 omit_sudo=omit_sudo, **kwargs)
712
713 def run_shell_payload(self, payload, **kwargs):
714 return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
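    # Hedged example: the payload is wrapped as bash -c '<payload>' and run in
    # the mountpoint, so a short multi-command script can be passed directly
    # (commands are illustrative):
    #   mount.run_shell_payload("set -e; mkdir -p d; touch d/f")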
715
716 def run_as_user(self, **kwargs):
717 """
718 Besides the arguments defined for run_shell() this method also
719 accepts argument 'user'.
720 """
721 args = kwargs.pop('args')
722 user = kwargs.pop('user')
723 if isinstance(args, str):
724 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args]
725 elif isinstance(args, list):
726 cmdlist = args
727             # join the list into a single space-separated command string
728             cmd = ' '.join(cmdlist)
732
733 args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd]
734
735 kwargs['args'] = args
736 return self.run_shell(**kwargs)
737
738 def run_as_root(self, **kwargs):
739 """
740 Accepts same arguments as run_shell().
741 """
742 kwargs['user'] = 'root'
743 return self.run_as_user(**kwargs)
744
745 def _verify(self, proc, retval=None, errmsg=None):
746 if retval:
747 msg = ('expected return value: {}\nreceived return value: '
748 '{}\n'.format(retval, proc.returncode))
749 assert proc.returncode == retval, msg
750
751 if errmsg:
752 stderr = proc.stderr.getvalue().lower()
753 msg = ('didn\'t find given string in stderr -\nexpected string: '
754 '{}\nreceived error message: {}\nnote: received error '
755 'message is converted to lowercase'.format(errmsg, stderr))
756 assert errmsg in stderr, msg
757
758 def negtestcmd(self, args, retval=None, errmsg=None, stdin=None,
759 cwd=None, wait=True):
760 """
761 Conduct a negative test for the given command.
762
763 retval and errmsg are parameters to confirm the cause of command
764 failure.
765 """
766 proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
767 check_status=False)
768 self._verify(proc, retval, errmsg)
769 return proc
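    # Hypothetical negative-test example: expect rmdir on a non-empty directory
    # to fail with a matching message (retval and errmsg values are illustrative):
    #   mount.negtestcmd(args=['rmdir', 'dir1'], retval=1, errmsg='not empty')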
770
771 def negtestcmd_as_user(self, args, user, retval=None, errmsg=None,
772 stdin=None, cwd=None, wait=True):
773 proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
774 cwd=cwd, check_status=False)
775 self._verify(proc, retval, errmsg)
776 return proc
777
778 def negtestcmd_as_root(self, args, retval=None, errmsg=None, stdin=None,
779 cwd=None, wait=True):
780 proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
781 check_status=False)
782 self._verify(proc, retval, errmsg)
783 return proc
784
785 def open_no_data(self, basename):
786 """
787 A pure metadata operation
788 """
789 assert(self.is_mounted())
790
791 path = os.path.join(self.hostfs_mntpt, basename)
792
793 p = self._run_python(dedent(
794 """
795 f = open("{path}", 'w')
796 """.format(path=path)
797 ))
798 p.wait()
799
800 def open_background(self, basename="background_file", write=True):
801 """
802 Open a file for writing, then block such that the client
803 will hold a capability.
804
805 Don't return until the remote process has got as far as opening
806 the file, then return the RemoteProcess instance.
807 """
808 assert(self.is_mounted())
809
810 path = os.path.join(self.hostfs_mntpt, basename)
811
812 if write:
813 pyscript = dedent("""
814 import time
815
816 with open("{path}", 'w') as f:
817 f.write('content')
818 f.flush()
819 f.write('content2')
820 while True:
821 time.sleep(1)
822 """).format(path=path)
823 else:
824 pyscript = dedent("""
825 import time
826
827 with open("{path}", 'r') as f:
828 while True:
829 time.sleep(1)
830 """).format(path=path)
831
832 rproc = self._run_python(pyscript)
833 self.background_procs.append(rproc)
834
835 # This wait would not be sufficient if the file had already
836 # existed, but it's simple and in practice users of open_background
837 # are not using it on existing files.
838 self.wait_for_visible(basename)
839
840 return rproc
841
842 def open_dir_background(self, basename):
843 """
844 Create and hold a capability to a directory.
845 """
846 assert(self.is_mounted())
847
848 path = os.path.join(self.hostfs_mntpt, basename)
849
850 pyscript = dedent("""
851 import time
852 import os
853
854 os.mkdir("{path}")
855 fd = os.open("{path}", os.O_RDONLY)
856 while True:
857 time.sleep(1)
858 """).format(path=path)
859
860 rproc = self._run_python(pyscript)
861 self.background_procs.append(rproc)
862
863 self.wait_for_visible(basename)
864
865 return rproc
866
867 def wait_for_dir_empty(self, dirname, timeout=30):
868 dirpath = os.path.join(self.hostfs_mntpt, dirname)
869 with safe_while(sleep=5, tries=(timeout//5)) as proceed:
870 while proceed():
871 p = self.run_shell_payload(f"stat -c %h {dirpath}")
872 nr_links = int(p.stdout.getvalue().strip())
873 if nr_links == 2:
874 return
875
876 def wait_for_visible(self, basename="background_file", timeout=30):
877 i = 0
878 while i < timeout:
879 r = self.client_remote.run(args=[
880 'stat', os.path.join(self.hostfs_mntpt, basename)
881 ], check_status=False)
882 if r.exitstatus == 0:
883 log.debug("File {0} became visible from {1} after {2}s".format(
884 basename, self.client_id, i))
885 return
886 else:
887 time.sleep(1)
888 i += 1
889
890 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
891 i, basename, self.client_id))
892
893 def lock_background(self, basename="background_file", do_flock=True):
894 """
895         Open and lock files for writing, holding the locks in a background process
896 """
897 assert(self.is_mounted())
898
899 path = os.path.join(self.hostfs_mntpt, basename)
900
901 script_builder = """
902 import time
903 import fcntl
904 import struct"""
905 if do_flock:
906 script_builder += """
907 f1 = open("{path}-1", 'w')
908 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
909 script_builder += """
910 f2 = open("{path}-2", 'w')
911 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
912 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
913 while True:
914 time.sleep(1)
915 """
916
917 pyscript = dedent(script_builder).format(path=path)
918
919 log.info("lock_background file {0}".format(basename))
920 rproc = self._run_python(pyscript)
921 self.background_procs.append(rproc)
922 return rproc
923
924 def lock_and_release(self, basename="background_file"):
925 assert(self.is_mounted())
926
927 path = os.path.join(self.hostfs_mntpt, basename)
928
929 script = """
930 import time
931 import fcntl
932 import struct
933 f1 = open("{path}-1", 'w')
934 fcntl.flock(f1, fcntl.LOCK_EX)
935 f2 = open("{path}-2", 'w')
936 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
937 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
938 """
939 pyscript = dedent(script).format(path=path)
940
941 log.info("lock_and_release file {0}".format(basename))
942 return self._run_python(pyscript)
943
944 def check_filelock(self, basename="background_file", do_flock=True):
945 assert(self.is_mounted())
946
947 path = os.path.join(self.hostfs_mntpt, basename)
948
949 script_builder = """
950 import fcntl
951 import errno
952 import struct"""
953 if do_flock:
954 script_builder += """
955 f1 = open("{path}-1", 'r')
956 try:
957 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
958 except IOError as e:
959 if e.errno == errno.EAGAIN:
960 pass
961 else:
962 raise RuntimeError("flock on file {path}-1 not found")"""
963 script_builder += """
964 f2 = open("{path}-2", 'r')
965 try:
966 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
967 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
968 except IOError as e:
969 if e.errno == errno.EAGAIN:
970 pass
971 else:
972 raise RuntimeError("posix lock on file {path}-2 not found")
973 """
974 pyscript = dedent(script_builder).format(path=path)
975
976 log.info("check lock on file {0}".format(basename))
977 self.client_remote.run(args=[
978 'python3', '-c', pyscript
979 ])
980
981 def write_background(self, basename="background_file", loop=False):
982 """
983         Open a file for writing; finish quickly unless loop is True
984 :param basename:
985 :return:
986 """
987 assert(self.is_mounted())
988
989 path = os.path.join(self.hostfs_mntpt, basename)
990
991 pyscript = dedent("""
992 import os
993 import time
994
995 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
996 try:
997 while True:
998 os.write(fd, b'content')
999 time.sleep(1)
1000 if not {loop}:
1001 break
1002 except IOError as e:
1003 pass
1004 os.close(fd)
1005 """).format(path=path, loop=str(loop))
1006
1007 rproc = self._run_python(pyscript)
1008 self.background_procs.append(rproc)
1009 return rproc
1010
1011 def write_n_mb(self, filename, n_mb, seek=0, wait=True):
1012 """
1013 Write the requested number of megabytes to a file
1014 """
1015 assert(self.is_mounted())
1016
1017 return self.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename),
1018 "bs=1M", "conv=fdatasync",
1019 "count={0}".format(int(n_mb)),
1020 "seek={0}".format(int(seek))
1021 ], wait=wait)
1022
1023 def write_test_pattern(self, filename, size):
1024 log.info("Writing {0} bytes to {1}".format(size, filename))
1025 return self.run_python(dedent("""
1026 import zlib
1027 path = "{path}"
1028 with open(path, 'w') as f:
1029 for i in range(0, {size}):
1030 val = zlib.crc32(str(i).encode('utf-8')) & 7
1031 f.write(chr(val))
1032 """.format(
1033 path=os.path.join(self.hostfs_mntpt, filename),
1034 size=size
1035 )))
1036
1037 def validate_test_pattern(self, filename, size):
1038 log.info("Validating {0} bytes from {1}".format(size, filename))
1039 # Use sudo because cephfs-data-scan may recreate the file with owner==root
1040 return self.run_python(dedent("""
1041 import zlib
1042 path = "{path}"
1043 with open(path, 'r') as f:
1044 bytes = f.read()
1045 if len(bytes) != {size}:
1046 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
1047 len(bytes), {size}
1048 ))
1049 for i, b in enumerate(bytes):
1050 val = zlib.crc32(str(i).encode('utf-8')) & 7
1051 if b != chr(val):
1052 raise RuntimeError("Bad data at offset {{0}}".format(i))
1053 """.format(
1054 path=os.path.join(self.hostfs_mntpt, filename),
1055 size=size
1056 )), sudo=True)
1057
1058 def open_n_background(self, fs_path, count):
1059 """
1060 Open N files for writing, hold them open in a background process
1061
1062 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
1063 :return: a RemoteProcess
1064 """
1065 assert(self.is_mounted())
1066
1067 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1068
1069 pyscript = dedent("""
1070 import sys
1071 import time
1072 import os
1073
1074 n = {count}
1075 abs_path = "{abs_path}"
1076
1077 if not os.path.exists(abs_path):
1078 os.makedirs(abs_path)
1079
1080 handles = []
1081 for i in range(0, n):
1082 fname = "file_"+str(i)
1083 path = os.path.join(abs_path, fname)
1084 handles.append(open(path, 'w'))
1085
1086 while True:
1087 time.sleep(1)
1088 """).format(abs_path=abs_path, count=count)
1089
1090 rproc = self._run_python(pyscript)
1091 self.background_procs.append(rproc)
1092 return rproc
1093
1094 def create_n_files(self, fs_path, count, sync=False, dirsync=False, unlink=False, finaldirsync=False):
1095 """
1096 Create n files.
1097
1098 :param sync: sync the file after writing
1099 :param dirsync: sync the containing directory after closing the file
1100 :param unlink: unlink the file after closing
1101 :param finaldirsync: sync the containing directory after closing the last file
1102 """
1103
1104 assert(self.is_mounted())
1105
1106 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1107
1108 pyscript = dedent(f"""
1109 import os
1110
1111 n = {count}
1112 path = "{abs_path}"
1113
1114 dpath = os.path.dirname(path)
1115 fnameprefix = os.path.basename(path)
1116 os.makedirs(dpath, exist_ok=True)
1117
1118 try:
1119 dirfd = os.open(dpath, os.O_DIRECTORY)
1120
1121 for i in range(n):
1122 fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
1123 with open(fpath, 'w') as f:
1124 f.write(f"{{i}}")
1125 if {sync}:
1126 f.flush()
1127 os.fsync(f.fileno())
1128 if {unlink}:
1129 os.unlink(fpath)
1130 if {dirsync}:
1131 os.fsync(dirfd)
1132 if {finaldirsync}:
1133 os.fsync(dirfd)
1134 finally:
1135 os.close(dirfd)
1136 """)
1137
1138 self.run_python(pyscript)
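    # Hedged example (path and count are illustrative): create ten files named
    # dir1/file_0 .. dir1/file_9 under the mountpoint, fsync'ing each file and
    # the containing directory once after the last one:
    #   mount.create_n_files('dir1/file', 10, sync=True, finaldirsync=True)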
1139
1140 def teardown(self):
1141 for p in self.background_procs:
1142 log.info("Terminating background process")
1143 self._kill_background(p)
1144
1145 self.background_procs = []
1146
1147 def _kill_background(self, p):
1148 if p.stdin:
1149 p.stdin.close()
1150 try:
1151 p.wait()
1152 except (CommandFailedError, ConnectionLostError):
1153 pass
1154
1155 def kill_background(self, p):
1156 """
1157 For a process that was returned by one of the _background member functions,
1158 kill it hard.
1159 """
1160 self._kill_background(p)
1161 self.background_procs.remove(p)
1162
1163 def send_signal(self, signal):
1164 signal = signal.lower()
1165         if signal not in ['sigstop', 'sigcont', 'sigterm', 'sigkill']:
1166 raise NotImplementedError
1167
1168 self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
1169 self.client_pid], omit_sudo=False)
1170
1171 def get_global_id(self):
1172 raise NotImplementedError()
1173
1174 def get_global_inst(self):
1175 raise NotImplementedError()
1176
1177 def get_global_addr(self):
1178 raise NotImplementedError()
1179
1180 def get_osd_epoch(self):
1181 raise NotImplementedError()
1182
1183 def get_op_read_count(self):
1184 raise NotImplementedError()
1185
1186 def readlink(self, fs_path):
1187 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1188
1189 pyscript = dedent("""
1190 import os
1191
1192 print(os.readlink("{path}"))
1193 """).format(path=abs_path)
1194
1195 proc = self._run_python(pyscript)
1196 proc.wait()
1197 return str(proc.stdout.getvalue().strip())
1198
1199
1200 def lstat(self, fs_path, follow_symlinks=False, wait=True):
1201         return self.stat(fs_path, follow_symlinks=False, wait=wait)
1202
1203 def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
1204 """
1205 stat a file, and return the result as a dictionary like this:
1206 {
1207 "st_ctime": 1414161137.0,
1208 "st_mtime": 1414161137.0,
1209 "st_nlink": 33,
1210 "st_gid": 0,
1211 "st_dev": 16777218,
1212 "st_size": 1190,
1213 "st_ino": 2,
1214 "st_uid": 0,
1215 "st_mode": 16877,
1216 "st_atime": 1431520593.0
1217 }
1218
1219 Raises exception on absent file.
1220 """
1221 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1222 if follow_symlinks:
1223 stat_call = "os.stat('" + abs_path + "')"
1224 else:
1225 stat_call = "os.lstat('" + abs_path + "')"
1226
1227 pyscript = dedent("""
1228 import os
1229 import stat
1230 import json
1231 import sys
1232
1233 try:
1234 s = {stat_call}
1235 except OSError as e:
1236 sys.exit(e.errno)
1237
1238 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
1239 print(json.dumps(
1240 dict([(a, getattr(s, a)) for a in attrs]),
1241 indent=2))
1242 """).format(stat_call=stat_call)
1243 proc = self._run_python(pyscript, **kwargs)
1244 if wait:
1245 proc.wait()
1246 return json.loads(proc.stdout.getvalue().strip())
1247 else:
1248 return proc
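    # Hypothetical example (the file name is illustrative; values depend on the
    # file being stat'ed):
    #   size = mount.stat('testfile')['st_size']
    # with wait=False the RemoteProcess is returned instead and the caller
    # parses its stdout itself.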
1249
1250 def touch(self, fs_path):
1251 """
1252 Create a dentry if it doesn't already exist. This python
1253 implementation exists because the usual command line tool doesn't
1254 pass through error codes like EIO.
1255
1256 :param fs_path:
1257 :return:
1258 """
1259 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1260 pyscript = dedent("""
1261 import sys
1262 import errno
1263
1264 try:
1265 f = open("{path}", "w")
1266 f.close()
1267 except IOError as e:
1268 sys.exit(errno.EIO)
1269 """).format(path=abs_path)
1270 proc = self._run_python(pyscript)
1271 proc.wait()
1272
1273 def path_to_ino(self, fs_path, follow_symlinks=True):
1274 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1275
1276 if follow_symlinks:
1277 pyscript = dedent("""
1278 import os
1279 import stat
1280
1281 print(os.stat("{path}").st_ino)
1282 """).format(path=abs_path)
1283 else:
1284 pyscript = dedent("""
1285 import os
1286 import stat
1287
1288 print(os.lstat("{path}").st_ino)
1289 """).format(path=abs_path)
1290
1291 proc = self._run_python(pyscript)
1292 proc.wait()
1293 return int(proc.stdout.getvalue().strip())
1294
1295 def path_to_nlink(self, fs_path):
1296 abs_path = os.path.join(self.hostfs_mntpt, fs_path)
1297
1298 pyscript = dedent("""
1299 import os
1300 import stat
1301
1302 print(os.stat("{path}").st_nlink)
1303 """).format(path=abs_path)
1304
1305 proc = self._run_python(pyscript)
1306 proc.wait()
1307 return int(proc.stdout.getvalue().strip())
1308
1309 def ls(self, path=None, **kwargs):
1310 """
1311 Wrap ls: return a list of strings
1312 """
1313 cmd = ["ls"]
1314 if path:
1315 cmd.append(path)
1316
1317 ls_text = self.run_shell(cmd, **kwargs).stdout.getvalue().strip()
1318
1319 if ls_text:
1320 return ls_text.split("\n")
1321 else:
1322 # Special case because otherwise split on empty string
1323 # gives you [''] instead of []
1324 return []
1325
1326 def setfattr(self, path, key, val, **kwargs):
1327 """
1328 Wrap setfattr.
1329
1330 :param path: relative to mount point
1331 :param key: xattr name
1332 :param val: xattr value
1333 :return: None
1334 """
1335 self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
1336
1337 def getfattr(self, path, attr, **kwargs):
1338 """
1339         Wrap getfattr: return the value of a named xattr on one file, or
1340 None if the attribute is not found.
1341
1342 :return: a string
1343 """
1344 p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False, **kwargs)
1345 try:
1346 p.wait()
1347 except CommandFailedError as e:
1348 if e.exitstatus == 1 and "No such attribute" in p.stderr.getvalue():
1349 return None
1350 else:
1351 raise
1352
1353 return str(p.stdout.getvalue())
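    # Hedged example using CephFS directory-layout vxattrs, assuming the named
    # pool exists (directory and pool names are illustrative):
    #   mount.setfattr('dir1', 'ceph.dir.layout.pool', 'cephfs_data')
    #   mount.getfattr('dir1', 'ceph.dir.layout.pool')  # -> 'cephfs_data'
    # note that getfattr() returns None for an absent attribute instead of raising.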
1354
1355 def df(self):
1356 """
1357 Wrap df: return a dict of usage fields in bytes
1358 """
1359
1360 p = self.run_shell(["df", "-B1", "."])
1361 lines = p.stdout.getvalue().strip().split("\n")
1362 fs, total, used, avail = lines[1].split()[:4]
1363 log.warning(lines)
1364
1365 return {
1366 "total": int(total),
1367 "used": int(used),
1368 "available": int(avail)
1369 }
1370
1371 def dir_checksum(self, path=None, follow_symlinks=False):
1372 cmd = ["find"]
1373 if follow_symlinks:
1374 cmd.append("-L")
1375 if path:
1376 cmd.append(path)
1377 cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
1378 checksum_text = self.run_shell(cmd).stdout.getvalue().strip()
1379 checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1])
1380 return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest()
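    # Hypothetical example: verify that two mounts see identical file contents
    # under a directory (mount_a, mount_b and the path are illustrative):
    #   assert mount_a.dir_checksum('subdir') == mount_b.dir_checksum('subdir')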