]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/mount.py
9 from io
import StringIO
10 from contextlib
import contextmanager
11 from textwrap
import dedent
14 from teuthology
.contextutil
import safe_while
15 from teuthology
.misc
import get_file
, write_file
16 from teuthology
.orchestra
import run
17 from teuthology
.orchestra
.run
import CommandFailedError
, ConnectionLostError
, Raw
19 from tasks
.cephfs
.filesystem
import Filesystem
21 log
= logging
.getLogger(__name__
)
23 class CephFSMount(object):
24 def __init__(self
, ctx
, test_dir
, client_id
, client_remote
,
25 client_keyring_path
=None, hostfs_mntpt
=None,
26 cephfs_name
=None, cephfs_mntpt
=None, brxnet
=None):
28 :param test_dir: Global teuthology test dir
29 :param client_id: Client ID, the 'foo' in client.foo
30 :param client_keyring_path: path to keyring for given client_id
31 :param client_remote: Remote instance for the host where client will
33 :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
35 :param cephfs_name: Name of Ceph FS to be mounted
36 :param cephfs_mntpt: Path to directory inside Ceph FS that will be
41 self
.test_dir
= test_dir
43 self
._verify
_attrs
(client_id
=client_id
,
44 client_keyring_path
=client_keyring_path
,
45 hostfs_mntpt
=hostfs_mntpt
, cephfs_name
=cephfs_name
,
46 cephfs_mntpt
=cephfs_mntpt
)
48 self
.client_id
= client_id
49 self
.client_keyring_path
= client_keyring_path
50 self
.client_remote
= client_remote
52 self
.hostfs_mntpt
= hostfs_mntpt
53 self
.hostfs_mntpt_dirname
= os
.path
.basename(self
.hostfs_mntpt
)
55 self
.hostfs_mntpt
= os
.path
.join(self
.test_dir
, f
'mnt.{self.client_id}')
56 self
.cephfs_name
= cephfs_name
57 self
.cephfs_mntpt
= cephfs_mntpt
61 self
._netns
_name
= None
64 self
.ceph_brx_net
= '192.168.0.0/16'
66 self
.ceph_brx_net
= brxnet
68 self
.test_files
= ['a', 'b', 'c']
70 self
.background_procs
= []
72 # This will cleanup the stale netnses, which are from the
73 # last failed test cases.
75 def cleanup_stale_netnses_and_bridge(remote
):
76 p
= remote
.run(args
=['ip', 'netns', 'list'],
77 stdout
=StringIO(), timeout
=(5*60))
78 p
= p
.stdout
.getvalue().strip()
80 # Get the netns name list
81 netns_list
= re
.findall(r
'ceph-ns-[^()\s][-.\w]+[^():\s]', p
)
83 # Remove the stale netnses
85 ns_name
= ns
.split()[0]
86 args
= ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name
)]
88 remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
92 # Remove the stale 'ceph-brx'
94 args
= ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
95 remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
def _parse_netns_name(self):
    """
    Derive the network namespace name from the mountpoint and cache it in
    self._netns_name.

    Every run of one or more '/' characters in self.mountpoint is replaced
    by a single '-', and the result is joined to the 'ceph-ns' prefix with
    another '-'.
    """
    # A leading '/' also becomes '-', so an absolute mountpoint yields a
    # double dash right after the prefix (e.g. 'ceph-ns--tmp-mnt.0').
    flattened = re.sub(r'/+', "-", self.mountpoint)
    self._netns_name = '-'.join(["ceph-ns", flattened])
104 def mountpoint(self
):
105 if self
.hostfs_mntpt
== None:
106 self
.hostfs_mntpt
= os
.path
.join(self
.test_dir
,
107 self
.hostfs_mntpt_dirname
)
108 return self
.hostfs_mntpt
111 def mountpoint(self
, path
):
112 if not isinstance(path
, str):
113 raise RuntimeError('path should be of str type.')
114 self
._mountpoint
= self
.hostfs_mntpt
= path
117 def netns_name(self
):
118 if self
._netns
_name
== None:
119 self
._parse
_netns
_name
()
120 return self
._netns
_name
123 def netns_name(self
, name
):
124 self
._netns
_name
= name
126 def assert_and_log_minimum_mount_details(self
):
128 Make sure we have minimum details required for mounting. Ideally, this
129 method should be called at the beginning of the mount method.
131 if not self
.client_id
or not self
.client_remote
or \
132 not self
.hostfs_mntpt
:
133 errmsg
= ('Mounting CephFS requires that at least following '
134 'details to be provided -\n'
135 '1. the client ID,\n2. the mountpoint and\n'
136 '3. the remote machine where CephFS will be mounted.\n')
137 raise RuntimeError(errmsg
)
139 log
.info('Mounting Ceph FS. Following are details of mount; remember '
140 '"None" represents Python type None -')
141 log
.info(f
'self.client_remote.hostname = {self.client_remote.hostname}')
142 log
.info(f
'self.client.name = client.{self.client_id}')
143 log
.info(f
'self.hostfs_mntpt = {self.hostfs_mntpt}')
144 log
.info(f
'self.cephfs_name = {self.cephfs_name}')
145 log
.info(f
'self.cephfs_mntpt = {self.cephfs_mntpt}')
146 log
.info(f
'self.client_keyring_path = {self.client_keyring_path}')
147 if self
.client_keyring_path
:
148 log
.info('keyring content -\n' +
149 get_file(self
.client_remote
, self
.client_keyring_path
,
152 def is_mounted(self
):
155 def setupfs(self
, name
=None):
156 if name
is None and self
.fs
is not None:
157 # Previous mount existed, reuse the old name
159 self
.fs
= Filesystem(self
.ctx
, name
=name
)
160 log
.info('Wait for MDS to reach steady state...')
161 self
.fs
.wait_for_daemons()
162 log
.info('Ready to start {}...'.format(type(self
).__name
__))
164 def _setup_brx_and_nat(self
):
165 # The ip for ceph-brx should be
166 ip
= IP(self
.ceph_brx_net
)[-2]
167 mask
= self
.ceph_brx_net
.split('/')[1]
168 brd
= IP(self
.ceph_brx_net
).broadcast()
170 brx
= self
.client_remote
.run(args
=['ip', 'addr'], stderr
=StringIO(),
171 stdout
=StringIO(), timeout
=(5*60))
172 brx
= re
.findall(r
'inet .* ceph-brx', brx
.stdout
.getvalue())
174 # If the 'ceph-brx' already exists, then check whether
175 # the new net is conflicting with it
176 _ip
, _mask
= brx
[0].split()[1].split('/', 1)
177 if _ip
!= "{}".format(ip
) or _mask
!= mask
:
178 raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx
[0].split()[1], ip
, mask
))
180 # Setup the ceph-brx and always use the last valid IP
182 log
.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip
, mask
))
184 self
.run_shell_payload(f
"""
186 sudo ip link add name ceph-brx type bridge
187 sudo ip addr flush dev ceph-brx
188 sudo ip link set ceph-brx up
189 sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
190 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
192 args
= "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
193 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
196 p
= self
.client_remote
.run(args
=['route'], stderr
=StringIO(),
197 stdout
=StringIO(), timeout
=(5*60))
198 p
= re
.findall(r
'default .*', p
.stdout
.getvalue())
200 raise RuntimeError("No default gw found")
203 self
.run_shell_payload(f
"""
205 sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
206 sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
207 sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
208 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
210 def _setup_netns(self
):
211 p
= self
.client_remote
.run(args
=['ip', 'netns', 'list'],
212 stderr
=StringIO(), stdout
=StringIO(),
213 timeout
=(5*60)).stdout
.getvalue().strip()
215 # Get the netns name list
216 netns_list
= re
.findall(r
'[^()\s][-.\w]+[^():\s]', p
)
218 out
= re
.search(r
"{0}".format(self
.netns_name
), p
)
220 # Get an uniq nsid for the new netns
222 p
= self
.client_remote
.run(args
=['ip', 'netns', 'list-id'],
223 stderr
=StringIO(), stdout
=StringIO(),
224 timeout
=(5*60)).stdout
.getvalue()
226 out
= re
.search(r
"nsid {} ".format(nsid
), p
)
232 # Add one new netns and set it id
233 self
.run_shell_payload(f
"""
235 sudo ip netns add {self.netns_name}
236 sudo ip netns set {self.netns_name} {nsid}
237 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
240 # The netns already exists and maybe suspended by self.kill()
243 nsid
= int(re
.search(r
"{0} \(id: (\d+)\)".format(self
.netns_name
), p
).group(1))
247 # Get one ip address for netns
248 ips
= IP(self
.ceph_brx_net
)
254 raise RuntimeError("we have ran out of the ip addresses")
256 for ns
in netns_list
:
257 ns_name
= ns
.split()[0]
258 args
= ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name
), 'ip', 'addr']
260 p
= self
.client_remote
.run(args
=args
, stderr
=StringIO(),
261 stdout
=StringIO(), timeout
=(5*60),
263 q
= re
.search("{0}".format(ip
), p
.stdout
.getvalue())
267 except CommandFailedError
:
268 if "No such file or directory" in p
.stderr
.getvalue():
270 if "Invalid argument" in p
.stderr
.getvalue():
276 mask
= self
.ceph_brx_net
.split('/')[1]
277 brd
= IP(self
.ceph_brx_net
).broadcast()
279 log
.info("Setuping the netns '{0}' with {1}/{2}".format(self
.netns_name
, ip
, mask
))
281 # Setup the veth interfaces
282 brxip
= IP(self
.ceph_brx_net
)[-2]
283 self
.run_shell_payload(f
"""
285 sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
286 sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
287 sudo ip netns exec {self.netns_name} ip link set veth0 up
288 sudo ip netns exec {self.netns_name} ip link set lo up
289 sudo ip netns exec {self.netns_name} ip route add default via {brxip}
290 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
292 # Bring up the brx interface and join it to 'ceph-brx'
293 self
.run_shell_payload(f
"""
295 sudo ip link set brx.{nsid} up
296 sudo ip link set dev brx.{nsid} master ceph-brx
297 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
299 def _cleanup_netns(self
):
302 log
.info("Removing the netns '{0}'".format(self
.netns_name
))
304 # Delete the netns and the peer veth interface
305 self
.run_shell_payload(f
"""
307 sudo ip link set brx.{self.nsid} down
308 sudo ip link delete dev brx.{self.nsid}
309 sudo ip netns delete {self.netns_name}
310 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
314 def _cleanup_brx_and_nat(self
):
315 brx
= self
.client_remote
.run(args
=['ip', 'addr'], stderr
=StringIO(),
316 stdout
=StringIO(), timeout
=(5*60))
317 brx
= re
.findall(r
'inet .* ceph-brx', brx
.stdout
.getvalue())
321 # If we are the last netns, will delete the ceph-brx
322 args
= ['sudo', 'ip', 'link', 'show']
323 p
= self
.client_remote
.run(args
=args
, stdout
=StringIO(),
324 timeout
=(5*60), omit_sudo
=False)
325 _list
= re
.findall(r
'brx\.', p
.stdout
.getvalue().strip())
329 log
.info("Removing the 'ceph-brx'")
331 self
.run_shell_payload("""
333 sudo ip link set ceph-brx down
334 sudo ip link delete ceph-brx
335 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
337 # Drop the iptables NAT rules
338 ip
= IP(self
.ceph_brx_net
)[-2]
339 mask
= self
.ceph_brx_net
.split('/')[1]
341 p
= self
.client_remote
.run(args
=['route'], stderr
=StringIO(),
342 stdout
=StringIO(), timeout
=(5*60))
343 p
= re
.findall(r
'default .*', p
.stdout
.getvalue())
345 raise RuntimeError("No default gw found")
347 self
.run_shell_payload(f
"""
349 sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
350 sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
351 sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
352 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
354 def setup_netns(self
):
356 Setup the netns for the mountpoint.
358 log
.info("Setting the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
359 self
._setup
_brx
_and
_nat
()
362 def cleanup_netns(self
):
364 Cleanup the netns for the mountpoint.
366 # We will defer cleaning the netnses and bridge until the last
367 # mountpoint is unmounted, this will be a temporary work around
370 # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
371 # self._cleanup_netns()
372 # self._cleanup_brx_and_nat()
374 def suspend_netns(self
):
376 Suspend the netns veth interface.
381 log
.info("Suspending the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
383 args
= ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self
.nsid
), 'down']
384 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
386 def resume_netns(self
):
388 Resume the netns veth interface.
393 log
.info("Resuming the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
395 args
= ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self
.nsid
), 'up']
396 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
398 def mount(self
, mntopts
=[], createfs
=True, check_status
=True, **kwargs
):
400 kwargs expects its members to be same as the arguments accepted by
403 raise NotImplementedError()
405 def mount_wait(self
, **kwargs
):
407 Accepts arguments same as self.mount().
410 self
.wait_until_mounted()
413 raise NotImplementedError()
415 def umount_wait(self
, force
=False, require_clean
=False, timeout
=None):
418 :param force: Expect that the mount will not shutdown cleanly: kill
420 :param require_clean: Wait for the Ceph client associated with the
421 mount (e.g. ceph-fuse) to terminate, and
422 raise if it doesn't do so cleanly.
423 :param timeout: amount of time to be waited for umount command to finish
426 raise NotImplementedError()
def _verify_attrs(self, **kwargs):
    """
    Verify that every attribute value passed in is either of type str or
    None.

    :param kwargs: attribute name -> value pairs to be validated
    :raises RuntimeError: for the first value that is neither a str nor
                          None; the message names the offending attribute
                          and its value
    """
    for attr_name, attr_value in kwargs.items():
        # None means "not provided", which is acceptable.
        if attr_value is None or isinstance(attr_value, str):
            continue
        raise RuntimeError('value of attributes should be either str '
                           f'or None. {attr_name} - {attr_value}')
438 def update_attrs(self
, client_id
=None, client_keyring_path
=None,
439 client_remote
=None, hostfs_mntpt
=None, cephfs_name
=None,
441 if not (client_id
or client_keyring_path
or client_remote
or
442 cephfs_name
or cephfs_mntpt
or hostfs_mntpt
):
445 self
._verify
_attrs
(client_id
=client_id
,
446 client_keyring_path
=client_keyring_path
,
447 hostfs_mntpt
=hostfs_mntpt
, cephfs_name
=cephfs_name
,
448 cephfs_mntpt
=cephfs_mntpt
)
451 self
.client_id
= client_id
452 if client_keyring_path
:
453 self
.client_keyring_path
= client_keyring_path
455 self
.client_remote
= client_remote
457 self
.hostfs_mntpt
= hostfs_mntpt
459 self
.cephfs_name
= cephfs_name
461 self
.cephfs_mntpt
= cephfs_mntpt
463 def remount(self
, **kwargs
):
465 Update mount object's attributes and attempt remount with these
466 new values for these attrbiutes.
468 1. Run umount_wait().
469 2. Run update_attrs().
472 Accepts arguments of self.mount() and self.update_attrs() with 2 exceptions -
473 1. Accepts wait too which can be True or False.
474 2. The default value of createfs is False.
477 assert not self
.mounted
479 mntopts
= kwargs
.pop('mntopts', [])
480 createfs
= kwargs
.pop('createfs', False)
481 check_status
= kwargs
.pop('check_status', True)
482 wait
= kwargs
.pop('wait', True)
484 self
.update_attrs(**kwargs
)
486 retval
= self
.mount(mntopts
=mntopts
, createfs
=createfs
,
487 check_status
=check_status
)
488 # avoid this scenario (again): mount command might've failed and
489 # check_status might have silenced the exception, yet we attempt to
490 # wait which might lead to an error.
491 if retval
is None and wait
:
492 self
.wait_until_mounted()
498 Suspend the netns veth interface to make the client disconnected
499 from the ceph cluster
501 log
.info('Killing connection on {0}...'.format(self
.client_remote
.name
))
def kill_cleanup(self):
    """
    Follow up ``kill`` to get to a clean unmounted state.

    Logs the name of the client remote and then calls umount_wait() with
    force=True.
    """
    remote_name = self.client_remote.name
    log.info('Cleaning up killed connection on {0}'.format(remote_name))
    self.umount_wait(force=True)
513 Remove the mount point.
515 Prerequisite: the client is not mounted.
517 log
.info('Cleaning up mount {0}'.format(self
.client_remote
.name
))
520 self
.client_remote
.run(args
=['rmdir', '--', self
.mountpoint
],
521 cwd
=self
.test_dir
, stderr
=stderr
,
522 timeout
=(60*5), check_status
=False)
523 except CommandFailedError
:
524 if "no such file or directory" not in stderr
.getvalue().lower():
def wait_until_mounted(self):
    """
    Not implemented here; always raises NotImplementedError.

    NOTE(review): presumably meant to be overridden by a concrete mount
    class -- confirm against the subclasses.
    """
    raise NotImplementedError()
def get_keyring_path(self):
    """
    Return the keyring path under /etc/ceph for this mount's client ID.

    :return: '/etc/ceph/ceph.client.<client_id>.keyring'
    """
    return f'/etc/ceph/ceph.client.{self.client_id}.keyring'
def get_key_from_keyfile(self):
    """
    Read the client keyring on the remote host and return the secret key.

    The keyring at self.client_keyring_path is read with `sudo cat` and
    scanned line by line for an entry named exactly 'key'.

    :return: the text after the first '=' of the 'key' entry, stripped of
             surrounding whitespace; None if the keyring has no such entry
    """
    # XXX: don't call run_shell(), since CephFS might be unmounted.
    keyring = self.client_remote.run(
        args=['sudo', 'cat', self.client_keyring_path], stdout=StringIO(),
        omit_sudo=False).stdout.getvalue()

    for line in keyring.splitlines():
        # Split on the first '=' only: the key itself is base64 and may end
        # with '=' padding. Compare the entry name exactly so that lines
        # which merely contain the substring "key" (a section header such
        # as "[client.keymaster]", for instance) are not mistaken for it.
        name, sep, value = line.partition('=')
        if sep and name.strip() == 'key':
            return value.strip()
    return None
def config_path(self):
    """
    Path to ceph.conf: override this if you're not a normal systemwide
    ceph install.

    :return: the fixed path '/etc/ceph/ceph.conf'
    """
    return "/etc/ceph/ceph.conf"
553 def mounted_wait(self
):
555 A context manager, from an initially unmounted state, to mount
556 this, yield, and then unmount and clean up.
559 self
.wait_until_mounted()
565 def create_file(self
, filename
='testfile', dirname
=None, user
=None,
567 assert(self
.is_mounted())
569 if not os
.path
.isabs(filename
):
571 if os
.path
.isabs(dirname
):
572 path
= os
.path
.join(dirname
, filename
)
574 path
= os
.path
.join(self
.hostfs_mntpt
, dirname
, filename
)
576 path
= os
.path
.join(self
.hostfs_mntpt
, filename
)
581 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', 'touch ' + path
]
583 args
= 'touch ' + path
585 return self
.client_remote
.run(args
=args
, check_status
=check_status
)
587 def create_files(self
):
588 assert(self
.is_mounted())
590 for suffix
in self
.test_files
:
591 log
.info("Creating file {0}".format(suffix
))
592 self
.client_remote
.run(args
=[
593 'touch', os
.path
.join(self
.hostfs_mntpt
, suffix
)
596 def test_create_file(self
, filename
='testfile', dirname
=None, user
=None,
598 return self
.create_file(filename
=filename
, dirname
=dirname
, user
=user
,
def check_files(self):
    """
    Check that every name in self.test_files exists directly under the
    host mountpoint.

    Each file is probed by running `ls` on the client remote with
    check_status=False, so that the exit status can be inspected here.

    :raises RuntimeError: if `ls` exits with a non-zero status for a file
    """
    assert(self.is_mounted())

    for suffix in self.test_files:
        log.info("Checking file {0}".format(suffix))
        target = os.path.join(self.hostfs_mntpt, suffix)
        result = self.client_remote.run(args=['ls', target],
                                        check_status=False)
        if result.exitstatus != 0:
            raise RuntimeError("Expected file {0} not found".format(suffix))
612 def write_file(self
, path
, data
, perms
=None):
614 Write the given data at the given path and set the given perms to the
617 if path
.find(self
.hostfs_mntpt
) == -1:
618 path
= os
.path
.join(self
.hostfs_mntpt
, path
)
620 write_file(self
.client_remote
, path
, data
)
623 self
.run_shell(args
=f
'chmod {perms} {path}')
def read_file(self, path):
    """
    Return the data from the file on given path.

    :param path: path of the file; when it does not contain the host
                 mountpoint it is joined onto self.hostfs_mntpt first
    :return: stdout of `cat <path>` run through run_shell(), stripped of
             leading and trailing whitespace
    """
    # Substring test (not a prefix test): any path that mentions the
    # mountpoint anywhere is used unchanged.
    if self.hostfs_mntpt not in path:
        path = os.path.join(self.hostfs_mntpt, path)

    proc = self.run_shell(args=['cat', path])
    return proc.stdout.getvalue().strip()
635 def create_destroy(self
):
636 assert(self
.is_mounted())
638 filename
= "{0} {1}".format(datetime
.datetime
.now(), self
.client_id
)
639 log
.debug("Creating test file {0}".format(filename
))
640 self
.client_remote
.run(args
=[
641 'touch', os
.path
.join(self
.hostfs_mntpt
, filename
)
643 log
.debug("Deleting test file {0}".format(filename
))
644 self
.client_remote
.run(args
=[
645 'rm', '-f', os
.path
.join(self
.hostfs_mntpt
, filename
)
648 def _run_python(self
, pyscript
, py_version
='python3', sudo
=False):
652 args
+= ['adjust-ulimits', 'daemon-helper', 'kill', py_version
, '-c', pyscript
]
653 return self
.client_remote
.run(args
=args
, wait
=False, stdin
=run
.PIPE
, stdout
=StringIO())
655 def run_python(self
, pyscript
, py_version
='python3', sudo
=False):
656 p
= self
._run
_python
(pyscript
, py_version
, sudo
=sudo
)
658 return p
.stdout
.getvalue().strip()
660 def run_shell(self
, args
, timeout
=900, **kwargs
):
661 args
= args
.split() if isinstance(args
, str) else args
662 kwargs
.pop('omit_sudo', False)
663 sudo
= kwargs
.pop('sudo', False)
664 cwd
= kwargs
.pop('cwd', self
.mountpoint
)
665 stdout
= kwargs
.pop('stdout', StringIO())
666 stderr
= kwargs
.pop('stderr', StringIO())
669 args
.insert(0, 'sudo')
671 return self
.client_remote
.run(args
=args
, cwd
=cwd
, timeout
=timeout
, stdout
=stdout
, stderr
=stderr
, **kwargs
)
def run_shell_payload(self, payload, **kwargs):
    """
    Run a shell snippet through `bash -c` by way of run_shell().

    :param payload: shell text to execute; it is wrapped in single quotes
                    and passed as a Raw argument
    :param kwargs: forwarded unchanged to run_shell()
    :return: whatever run_shell() returns
    """
    # NOTE: the payload is only wrapped in single quotes, not escaped, so
    # a payload that itself contains a single quote will break the quoting.
    quoted = Raw(f"'{payload}'")
    return self.run_shell(["bash", "-c", quoted], **kwargs)
676 def run_as_user(self
, **kwargs
):
678 Besides the arguments defined for run_shell() this method also
679 accepts argument 'user'.
681 args
= kwargs
.pop('args')
682 user
= kwargs
.pop('user')
683 if isinstance(args
, str):
684 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', args
]
685 elif isinstance(args
, list):
690 # get rid of extra space at the end.
693 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', cmd
]
695 kwargs
['args'] = args
696 return self
.run_shell(**kwargs
)
def run_as_root(self, **kwargs):
    """
    Accepts same arguments as run_shell().

    Delegates to run_as_user() with user forced to 'root'; any 'user'
    passed by the caller is overwritten.
    """
    kwargs['user'] = 'root'
    return self.run_as_user(**kwargs)
705 def _verify(self
, proc
, retval
=None, errmsg
=None):
707 msg
= ('expected return value: {}\nreceived return value: '
708 '{}\n'.format(retval
, proc
.returncode
))
709 assert proc
.returncode
== retval
, msg
712 stderr
= proc
.stderr
.getvalue().lower()
713 msg
= ('didn\'t find given string in stderr -\nexpected string: '
714 '{}\nreceived error message: {}\nnote: received error '
715 'message is converted to lowercase'.format(errmsg
, stderr
))
716 assert errmsg
in stderr
, msg
718 def negtestcmd(self
, args
, retval
=None, errmsg
=None, stdin
=None,
719 cwd
=None, wait
=True):
721 Conduct a negative test for the given command.
723 retval and errmsg are parameters to confirm the cause of command
726 proc
= self
.run_shell(args
=args
, wait
=wait
, stdin
=stdin
, cwd
=cwd
,
728 self
._verify
(proc
, retval
, errmsg
)
731 def negtestcmd_as_user(self
, args
, user
, retval
=None, errmsg
=None,
732 stdin
=None, cwd
=None, wait
=True):
733 proc
= self
.run_as_user(args
=args
, user
=user
, wait
=wait
, stdin
=stdin
,
734 cwd
=cwd
, check_status
=False)
735 self
._verify
(proc
, retval
, errmsg
)
738 def negtestcmd_as_root(self
, args
, retval
=None, errmsg
=None, stdin
=None,
739 cwd
=None, wait
=True):
740 proc
= self
.run_as_root(args
=args
, wait
=wait
, stdin
=stdin
, cwd
=cwd
,
742 self
._verify
(proc
, retval
, errmsg
)
745 def open_no_data(self
, basename
):
747 A pure metadata operation
749 assert(self
.is_mounted())
751 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
753 p
= self
._run
_python
(dedent(
755 f = open("{path}", 'w')
756 """.format(path
=path
)
760 def open_background(self
, basename
="background_file", write
=True):
762 Open a file for writing, then block such that the client
763 will hold a capability.
765 Don't return until the remote process has got as far as opening
766 the file, then return the RemoteProcess instance.
768 assert(self
.is_mounted())
770 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
773 pyscript
= dedent("""
776 with open("{path}", 'w') as f:
782 """).format(path
=path
)
784 pyscript
= dedent("""
787 with open("{path}", 'r') as f:
790 """).format(path
=path
)
792 rproc
= self
._run
_python
(pyscript
)
793 self
.background_procs
.append(rproc
)
795 # This wait would not be sufficient if the file had already
796 # existed, but it's simple and in practice users of open_background
797 # are not using it on existing files.
798 self
.wait_for_visible(basename
)
802 def wait_for_dir_empty(self
, dirname
, timeout
=30):
803 dirpath
= os
.path
.join(self
.hostfs_mntpt
, dirname
)
804 with
safe_while(sleep
=5, tries
=(timeout
//5)) as proceed
:
806 p
= self
.run_shell_payload(f
"stat -c %h {dirpath}")
807 nr_links
= int(p
.stdout
.getvalue().strip())
811 def wait_for_visible(self
, basename
="background_file", timeout
=30):
814 r
= self
.client_remote
.run(args
=[
815 'stat', os
.path
.join(self
.hostfs_mntpt
, basename
)
816 ], check_status
=False)
817 if r
.exitstatus
== 0:
818 log
.debug("File {0} became visible from {1} after {2}s".format(
819 basename
, self
.client_id
, i
))
825 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
826 i
, basename
, self
.client_id
))
828 def lock_background(self
, basename
="background_file", do_flock
=True):
830 Open and lock a files for writing, hold the lock in a background process
832 assert(self
.is_mounted())
834 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
841 script_builder
+= """
842 f1 = open("{path}-1", 'w')
843 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
844 script_builder
+= """
845 f2 = open("{path}-2", 'w')
846 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
847 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
852 pyscript
= dedent(script_builder
).format(path
=path
)
854 log
.info("lock_background file {0}".format(basename
))
855 rproc
= self
._run
_python
(pyscript
)
856 self
.background_procs
.append(rproc
)
859 def lock_and_release(self
, basename
="background_file"):
860 assert(self
.is_mounted())
862 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
868 f1 = open("{path}-1", 'w')
869 fcntl.flock(f1, fcntl.LOCK_EX)
870 f2 = open("{path}-2", 'w')
871 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
872 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
874 pyscript
= dedent(script
).format(path
=path
)
876 log
.info("lock_and_release file {0}".format(basename
))
877 return self
._run
_python
(pyscript
)
879 def check_filelock(self
, basename
="background_file", do_flock
=True):
880 assert(self
.is_mounted())
882 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
889 script_builder
+= """
890 f1 = open("{path}-1", 'r')
892 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
894 if e.errno == errno.EAGAIN:
897 raise RuntimeError("flock on file {path}-1 not found")"""
898 script_builder
+= """
899 f2 = open("{path}-2", 'r')
901 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
902 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
904 if e.errno == errno.EAGAIN:
907 raise RuntimeError("posix lock on file {path}-2 not found")
909 pyscript
= dedent(script_builder
).format(path
=path
)
911 log
.info("check lock on file {0}".format(basename
))
912 self
.client_remote
.run(args
=[
913 'python3', '-c', pyscript
916 def write_background(self
, basename
="background_file", loop
=False):
918 Open a file for writing, complete as soon as you can
922 assert(self
.is_mounted())
924 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
926 pyscript
= dedent("""
930 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
933 os.write(fd, b'content')
940 """).format(path
=path
, loop
=str(loop
))
942 rproc
= self
._run
_python
(pyscript
)
943 self
.background_procs
.append(rproc
)
946 def write_n_mb(self
, filename
, n_mb
, seek
=0, wait
=True):
948 Write the requested number of megabytes to a file
950 assert(self
.is_mounted())
952 return self
.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename
),
953 "bs=1M", "conv=fdatasync",
954 "count={0}".format(int(n_mb
)),
955 "seek={0}".format(int(seek
))
958 def write_test_pattern(self
, filename
, size
):
959 log
.info("Writing {0} bytes to {1}".format(size
, filename
))
960 return self
.run_python(dedent("""
963 with open(path, 'w') as f:
964 for i in range(0, {size}):
965 val = zlib.crc32(str(i).encode('utf-8')) & 7
968 path
=os
.path
.join(self
.hostfs_mntpt
, filename
),
972 def validate_test_pattern(self
, filename
, size
):
973 log
.info("Validating {0} bytes from {1}".format(size
, filename
))
974 # Use sudo because cephfs-data-scan may recreate the file with owner==root
975 return self
.run_python(dedent("""
978 with open(path, 'r') as f:
980 if len(bytes) != {size}:
981 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
984 for i, b in enumerate(bytes):
985 val = zlib.crc32(str(i).encode('utf-8')) & 7
987 raise RuntimeError("Bad data at offset {{0}}".format(i))
989 path
=os
.path
.join(self
.hostfs_mntpt
, filename
),
993 def open_n_background(self
, fs_path
, count
):
995 Open N files for writing, hold them open in a background process
997 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
998 :return: a RemoteProcess
1000 assert(self
.is_mounted())
1002 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1004 pyscript
= dedent("""
1010 abs_path = "{abs_path}"
1012 if not os.path.exists(abs_path):
1013 os.makedirs(abs_path)
1016 for i in range(0, n):
1017 fname = "file_"+str(i)
1018 path = os.path.join(abs_path, fname)
1019 handles.append(open(path, 'w'))
1023 """).format(abs_path
=abs_path
, count
=count
)
1025 rproc
= self
._run
_python
(pyscript
)
1026 self
.background_procs
.append(rproc
)
1029 def create_n_files(self
, fs_path
, count
, sync
=False):
1030 assert(self
.is_mounted())
1032 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1034 pyscript
= dedent("""
1040 abs_path = "{abs_path}"
1042 if not os.path.exists(os.path.dirname(abs_path)):
1043 os.makedirs(os.path.dirname(abs_path))
1045 for i in range(0, n):
1046 fname = "{{0}}_{{1}}".format(abs_path, i)
1047 with open(fname, 'w') as f:
1051 os.fsync(f.fileno())
1052 """).format(abs_path
=abs_path
, count
=count
, sync
=str(sync
))
1054 self
.run_python(pyscript
)
1057 for p
in self
.background_procs
:
1058 log
.info("Terminating background process")
1059 self
._kill
_background
(p
)
1061 self
.background_procs
= []
1063 def _kill_background(self
, p
):
1068 except (CommandFailedError
, ConnectionLostError
):
def kill_background(self, p):
    """
    For a process that was returned by one of the _background member
    functions, kill it and stop tracking it.

    :param p: the process to kill; it is handed to _kill_background() and
              then removed from self.background_procs
    """
    self._kill_background(p)
    self.background_procs.remove(p)
def send_signal(self, signal):
    """
    Send a signal to the client process (self.client_pid) on the remote
    using `sudo kill`.

    :param signal: signal name, case-insensitive; one of 'sigstop',
                   'sigcont', 'sigterm' or 'sigkill'
    :raises NotImplementedError: for any other signal name
    """
    # Normalise once; the lower-cased name is also what is passed to kill.
    signal = signal.lower()
    if signal not in ('sigstop', 'sigcont', 'sigterm', 'sigkill'):
        raise NotImplementedError

    self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
                                 self.client_pid], omit_sudo=False)
def get_global_id(self):
    """
    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_global_inst(self):
    """
    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_global_addr(self):
    """
    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_osd_epoch(self):
    """
    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_op_read_count(self):
    """
    Not implemented here; always raises NotImplementedError.
    """
    raise NotImplementedError()
def lstat(self, fs_path, follow_symlinks=False, wait=True):
    """
    stat a path without following symlinks; thin wrapper around stat().

    :param fs_path: path handed on to stat()
    :param follow_symlinks: accepted for signature parity with stat() but
                            not honoured; stat() is always called with
                            follow_symlinks=False
    :param wait: forwarded to stat()
    :return: whatever stat() returns
    """
    # Forward the caller's `wait` rather than a hard-coded True, so that
    # lstat(..., wait=False) behaves like stat(..., wait=False).
    return self.stat(fs_path, follow_symlinks=False, wait=wait)
1105 def stat(self
, fs_path
, follow_symlinks
=True, wait
=True, **kwargs
):
1107 stat a file, and return the result as a dictionary like this:
1109 "st_ctime": 1414161137.0,
1110 "st_mtime": 1414161137.0,
1118 "st_atime": 1431520593.0
1121 Raises exception on absent file.
1123 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1125 stat_call
= "os.stat('" + abs_path
+ "')"
1127 stat_call
= "os.lstat('" + abs_path
+ "')"
1129 pyscript
= dedent("""
1137 except OSError as e:
1140 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
1142 dict([(a, getattr(s, a)) for a in attrs]),
1144 """).format(stat_call
=stat_call
)
1145 proc
= self
._run
_python
(pyscript
, **kwargs
)
1148 return json
.loads(proc
.stdout
.getvalue().strip())
1152 def touch(self
, fs_path
):
1154 Create a dentry if it doesn't already exist. This python
1155 implementation exists because the usual command line tool doesn't
1156 pass through error codes like EIO.
1161 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1162 pyscript
= dedent("""
1167 f = open("{path}", "w")
1169 except IOError as e:
1171 """).format(path
=abs_path
)
1172 proc
= self
._run
_python
(pyscript
)
1175 def path_to_ino(self
, fs_path
, follow_symlinks
=True):
1176 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1179 pyscript
= dedent("""
1183 print(os.stat("{path}").st_ino)
1184 """).format(path
=abs_path
)
1186 pyscript
= dedent("""
1190 print(os.lstat("{path}").st_ino)
1191 """).format(path
=abs_path
)
1193 proc
= self
._run
_python
(pyscript
)
1195 return int(proc
.stdout
.getvalue().strip())
1197 def path_to_nlink(self
, fs_path
):
1198 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1200 pyscript
= dedent("""
1204 print(os.stat("{path}").st_nlink)
1205 """).format(path
=abs_path
)
1207 proc
= self
._run
_python
(pyscript
)
1209 return int(proc
.stdout
.getvalue().strip())
1211 def ls(self
, path
=None, **kwargs
):
1213 Wrap ls: return a list of strings
1219 ls_text
= self
.run_shell(cmd
, **kwargs
).stdout
.getvalue().strip()
1222 return ls_text
.split("\n")
1224 # Special case because otherwise split on empty string
1225 # gives you [''] instead of []
def setfattr(self, path, key, val, **kwargs):
    """
    Wrap setfattr: set one extended attribute on a file.

    :param path: relative to mount point
    :param key: xattr name
    :param val: xattr value
    :param kwargs: forwarded unchanged to run_shell()
    :return: None
    """
    self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
1239 def getfattr(self
, path
, attr
, **kwargs
):
1241 Wrap getfattr: return the values of a named xattr on one file, or
1242 None if the attribute is not found.
1246 p
= self
.run_shell(["getfattr", "--only-values", "-n", attr
, path
], wait
=False, **kwargs
)
1249 except CommandFailedError
as e
:
1250 if e
.exitstatus
== 1 and "No such attribute" in p
.stderr
.getvalue():
1255 return str(p
.stdout
.getvalue())
1259 Wrap df: return a dict of usage fields in bytes
1262 p
= self
.run_shell(["df", "-B1", "."])
1263 lines
= p
.stdout
.getvalue().strip().split("\n")
1264 fs
, total
, used
, avail
= lines
[1].split()[:4]
1268 "total": int(total
),
1270 "available": int(avail
)
1273 def dir_checksum(self
, path
=None, follow_symlinks
=False):
1279 cmd
.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
1280 checksum_text
= self
.run_shell(cmd
).stdout
.getvalue().strip()
1281 checksum_sorted
= sorted(checksum_text
.split('\n'), key
=lambda v
: v
.split()[1])
1282 return hashlib
.md5(('\n'.join(checksum_sorted
)).encode('utf-8')).hexdigest()