]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/mount.py
9 from io
import StringIO
10 from contextlib
import contextmanager
11 from textwrap
import dedent
14 from teuthology
.contextutil
import safe_while
15 from teuthology
.misc
import get_file
, write_file
16 from teuthology
.orchestra
import run
17 from teuthology
.orchestra
.run
import Raw
18 from teuthology
.exceptions
import CommandFailedError
, ConnectionLostError
20 from tasks
.cephfs
.filesystem
import Filesystem
22 log
= logging
.getLogger(__name__
)
24 class CephFSMount(object):
25 def __init__(self
, ctx
, test_dir
, client_id
, client_remote
,
26 client_keyring_path
=None, hostfs_mntpt
=None,
27 cephfs_name
=None, cephfs_mntpt
=None, brxnet
=None):
29 :param test_dir: Global teuthology test dir
30 :param client_id: Client ID, the 'foo' in client.foo
31 :param client_keyring_path: path to keyring for given client_id
32 :param client_remote: Remote instance for the host where client will
34 :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
36 :param cephfs_name: Name of Ceph FS to be mounted
37 :param cephfs_mntpt: Path to directory inside Ceph FS that will be
42 self
.test_dir
= test_dir
44 self
._verify
_attrs
(client_id
=client_id
,
45 client_keyring_path
=client_keyring_path
,
46 hostfs_mntpt
=hostfs_mntpt
, cephfs_name
=cephfs_name
,
47 cephfs_mntpt
=cephfs_mntpt
)
49 self
.client_id
= client_id
50 self
.client_keyring_path
= client_keyring_path
51 self
.client_remote
= client_remote
53 self
.hostfs_mntpt
= hostfs_mntpt
54 self
.hostfs_mntpt_dirname
= os
.path
.basename(self
.hostfs_mntpt
)
56 self
.hostfs_mntpt
= os
.path
.join(self
.test_dir
, f
'mnt.{self.client_id}')
57 self
.cephfs_name
= cephfs_name
58 self
.cephfs_mntpt
= cephfs_mntpt
60 self
.cluster_name
= 'ceph' # TODO: use config['cluster']
64 self
._netns
_name
= None
67 self
.ceph_brx_net
= '192.168.0.0/16'
69 self
.ceph_brx_net
= brxnet
71 self
.test_files
= ['a', 'b', 'c']
73 self
.background_procs
= []
75 # This will cleanup the stale netnses, which are from the
76 # last failed test cases.
78 def cleanup_stale_netnses_and_bridge(remote
):
79 p
= remote
.run(args
=['ip', 'netns', 'list'],
80 stdout
=StringIO(), timeout
=(5*60))
81 p
= p
.stdout
.getvalue().strip()
83 # Get the netns name list
84 netns_list
= re
.findall(r
'ceph-ns-[^()\s][-.\w]+[^():\s]', p
)
86 # Remove the stale netnses
88 ns_name
= ns
.split()[0]
89 args
= ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name
)]
91 remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
95 # Remove the stale 'ceph-brx'
97 args
= ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
98 remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
102 def _parse_netns_name(self
):
103 self
._netns
_name
= '-'.join(["ceph-ns",
104 re
.sub(r
'/+', "-", self
.mountpoint
)])
107 def mountpoint(self
):
108 if self
.hostfs_mntpt
== None:
109 self
.hostfs_mntpt
= os
.path
.join(self
.test_dir
,
110 self
.hostfs_mntpt_dirname
)
111 return self
.hostfs_mntpt
114 def mountpoint(self
, path
):
115 if not isinstance(path
, str):
116 raise RuntimeError('path should be of str type.')
117 self
._mountpoint
= self
.hostfs_mntpt
= path
120 def netns_name(self
):
121 if self
._netns
_name
== None:
122 self
._parse
_netns
_name
()
123 return self
._netns
_name
126 def netns_name(self
, name
):
127 self
._netns
_name
= name
129 def assert_that_ceph_fs_exists(self
):
130 output
= self
.ctx
.managers
[self
.cluster_name
].raw_cluster_cmd("fs", "ls")
132 assert self
.cephfs_name
in output
, \
133 'expected ceph fs is not present on the cluster'
134 log
.info(f
'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
136 assert 'No filesystems enabled' not in output
, \
137 'ceph cluster has no ceph fs, not even the default ceph fs'
138 log
.info('Mounting default Ceph FS; just confirmed its presence on cluster')
140 def assert_and_log_minimum_mount_details(self
):
142 Make sure we have minimum details required for mounting. Ideally, this
143 method should be called at the beginning of the mount method.
145 if not self
.client_id
or not self
.client_remote
or \
146 not self
.hostfs_mntpt
:
147 errmsg
= ('Mounting CephFS requires that at least following '
148 'details to be provided -\n'
149 '1. the client ID,\n2. the mountpoint and\n'
150 '3. the remote machine where CephFS will be mounted.\n')
151 raise RuntimeError(errmsg
)
153 self
.assert_that_ceph_fs_exists()
155 log
.info('Mounting Ceph FS. Following are details of mount; remember '
156 '"None" represents Python type None -')
157 log
.info(f
'self.client_remote.hostname = {self.client_remote.hostname}')
158 log
.info(f
'self.client.name = client.{self.client_id}')
159 log
.info(f
'self.hostfs_mntpt = {self.hostfs_mntpt}')
160 log
.info(f
'self.cephfs_name = {self.cephfs_name}')
161 log
.info(f
'self.cephfs_mntpt = {self.cephfs_mntpt}')
162 log
.info(f
'self.client_keyring_path = {self.client_keyring_path}')
163 if self
.client_keyring_path
:
164 log
.info('keyring content -\n' +
165 get_file(self
.client_remote
, self
.client_keyring_path
,
168 def is_mounted(self
):
171 def setupfs(self
, name
=None):
172 if name
is None and self
.fs
is not None:
173 # Previous mount existed, reuse the old name
175 self
.fs
= Filesystem(self
.ctx
, name
=name
)
176 log
.info('Wait for MDS to reach steady state...')
177 self
.fs
.wait_for_daemons()
178 log
.info('Ready to start {}...'.format(type(self
).__name
__))
180 def _create_mntpt(self
):
181 self
.client_remote
.run(args
=f
'mkdir -p -v {self.hostfs_mntpt}',
183 # Use 0000 mode to prevent undesired modifications to the mountpoint on
184 # the local file system.
185 self
.client_remote
.run(args
=f
'chmod 0000 {self.hostfs_mntpt}',
189 def _nsenter_args(self
):
190 return ['nsenter', f
'--net=/var/run/netns/{self.netns_name}']
192 def _set_filemode_on_mntpt(self
):
195 self
.client_remote
.run(
196 args
=['sudo', 'chmod', '1777', self
.hostfs_mntpt
],
197 stderr
=stderr
, timeout
=(5*60))
198 except CommandFailedError
:
199 # the client does not have write permissions in the caps it holds
200 # for the Ceph FS that was just mounted.
201 if 'permission denied' in stderr
.getvalue().lower():
204 def _setup_brx_and_nat(self
):
205 # The ip for ceph-brx should be
206 ip
= IP(self
.ceph_brx_net
)[-2]
207 mask
= self
.ceph_brx_net
.split('/')[1]
208 brd
= IP(self
.ceph_brx_net
).broadcast()
210 brx
= self
.client_remote
.run(args
=['ip', 'addr'], stderr
=StringIO(),
211 stdout
=StringIO(), timeout
=(5*60))
212 brx
= re
.findall(r
'inet .* ceph-brx', brx
.stdout
.getvalue())
214 # If the 'ceph-brx' already exists, then check whether
215 # the new net is conflicting with it
216 _ip
, _mask
= brx
[0].split()[1].split('/', 1)
217 if _ip
!= "{}".format(ip
) or _mask
!= mask
:
218 raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx
[0].split()[1], ip
, mask
))
220 # Setup the ceph-brx and always use the last valid IP
222 log
.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip
, mask
))
224 self
.run_shell_payload(f
"""
226 sudo ip link add name ceph-brx type bridge
227 sudo ip addr flush dev ceph-brx
228 sudo ip link set ceph-brx up
229 sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
230 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
232 args
= "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
233 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
236 p
= self
.client_remote
.run(args
=['route'], stderr
=StringIO(),
237 stdout
=StringIO(), timeout
=(5*60))
238 p
= re
.findall(r
'default .*', p
.stdout
.getvalue())
240 raise RuntimeError("No default gw found")
243 self
.run_shell_payload(f
"""
245 sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
246 sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
247 sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
248 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
250 def _setup_netns(self
):
251 p
= self
.client_remote
.run(args
=['ip', 'netns', 'list'],
252 stderr
=StringIO(), stdout
=StringIO(),
253 timeout
=(5*60)).stdout
.getvalue().strip()
255 # Get the netns name list
256 netns_list
= re
.findall(r
'[^()\s][-.\w]+[^():\s]', p
)
258 out
= re
.search(r
"{0}".format(self
.netns_name
), p
)
260 # Get an uniq nsid for the new netns
262 p
= self
.client_remote
.run(args
=['ip', 'netns', 'list-id'],
263 stderr
=StringIO(), stdout
=StringIO(),
264 timeout
=(5*60)).stdout
.getvalue()
266 out
= re
.search(r
"nsid {} ".format(nsid
), p
)
272 # Add one new netns and set it id
273 self
.run_shell_payload(f
"""
275 sudo ip netns add {self.netns_name}
276 sudo ip netns set {self.netns_name} {nsid}
277 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
280 # The netns already exists and maybe suspended by self.kill()
283 nsid
= int(re
.search(r
"{0} \(id: (\d+)\)".format(self
.netns_name
), p
).group(1))
287 # Get one ip address for netns
288 ips
= IP(self
.ceph_brx_net
)
294 raise RuntimeError("we have ran out of the ip addresses")
296 for ns
in netns_list
:
297 ns_name
= ns
.split()[0]
298 args
= ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name
), 'ip', 'addr']
300 p
= self
.client_remote
.run(args
=args
, stderr
=StringIO(),
301 stdout
=StringIO(), timeout
=(5*60),
303 q
= re
.search("{0}".format(ip
), p
.stdout
.getvalue())
307 except CommandFailedError
:
308 if "No such file or directory" in p
.stderr
.getvalue():
310 if "Invalid argument" in p
.stderr
.getvalue():
316 mask
= self
.ceph_brx_net
.split('/')[1]
317 brd
= IP(self
.ceph_brx_net
).broadcast()
319 log
.info("Setuping the netns '{0}' with {1}/{2}".format(self
.netns_name
, ip
, mask
))
321 # Setup the veth interfaces
322 brxip
= IP(self
.ceph_brx_net
)[-2]
323 self
.run_shell_payload(f
"""
325 sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
326 sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
327 sudo ip netns exec {self.netns_name} ip link set veth0 up
328 sudo ip netns exec {self.netns_name} ip link set lo up
329 sudo ip netns exec {self.netns_name} ip route add default via {brxip}
330 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
332 # Bring up the brx interface and join it to 'ceph-brx'
333 self
.run_shell_payload(f
"""
335 sudo ip link set brx.{nsid} up
336 sudo ip link set dev brx.{nsid} master ceph-brx
337 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
339 def _cleanup_netns(self
):
342 log
.info("Removing the netns '{0}'".format(self
.netns_name
))
344 # Delete the netns and the peer veth interface
345 self
.run_shell_payload(f
"""
347 sudo ip link set brx.{self.nsid} down
348 sudo ip link delete dev brx.{self.nsid}
349 sudo ip netns delete {self.netns_name}
350 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
354 def _cleanup_brx_and_nat(self
):
355 brx
= self
.client_remote
.run(args
=['ip', 'addr'], stderr
=StringIO(),
356 stdout
=StringIO(), timeout
=(5*60))
357 brx
= re
.findall(r
'inet .* ceph-brx', brx
.stdout
.getvalue())
361 # If we are the last netns, will delete the ceph-brx
362 args
= ['sudo', 'ip', 'link', 'show']
363 p
= self
.client_remote
.run(args
=args
, stdout
=StringIO(),
364 timeout
=(5*60), omit_sudo
=False)
365 _list
= re
.findall(r
'brx\.', p
.stdout
.getvalue().strip())
369 log
.info("Removing the 'ceph-brx'")
371 self
.run_shell_payload("""
373 sudo ip link set ceph-brx down
374 sudo ip link delete ceph-brx
375 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
377 # Drop the iptables NAT rules
378 ip
= IP(self
.ceph_brx_net
)[-2]
379 mask
= self
.ceph_brx_net
.split('/')[1]
381 p
= self
.client_remote
.run(args
=['route'], stderr
=StringIO(),
382 stdout
=StringIO(), timeout
=(5*60))
383 p
= re
.findall(r
'default .*', p
.stdout
.getvalue())
385 raise RuntimeError("No default gw found")
387 self
.run_shell_payload(f
"""
389 sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
390 sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
391 sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
392 """, timeout
=(5*60), omit_sudo
=False, cwd
='/')
394 def setup_netns(self
):
396 Setup the netns for the mountpoint.
398 log
.info("Setting the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
399 self
._setup
_brx
_and
_nat
()
402 def cleanup_netns(self
):
404 Cleanup the netns for the mountpoint.
406 # We will defer cleaning the netnses and bridge until the last
407 # mountpoint is unmounted, this will be a temporary work around
410 # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
411 # self._cleanup_netns()
412 # self._cleanup_brx_and_nat()
414 def suspend_netns(self
):
416 Suspend the netns veth interface.
421 log
.info("Suspending the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
423 args
= ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self
.nsid
), 'down']
424 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
426 def resume_netns(self
):
428 Resume the netns veth interface.
433 log
.info("Resuming the '{0}' netns for '{1}'".format(self
._netns
_name
, self
.mountpoint
))
435 args
= ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self
.nsid
), 'up']
436 self
.client_remote
.run(args
=args
, timeout
=(5*60), omit_sudo
=False)
438 def mount(self
, mntopts
=[], check_status
=True, **kwargs
):
440 kwargs expects its members to be same as the arguments accepted by
443 raise NotImplementedError()
445 def mount_wait(self
, **kwargs
):
447 Accepts arguments same as self.mount().
450 self
.wait_until_mounted()
453 raise NotImplementedError()
455 def umount_wait(self
, force
=False, require_clean
=False, timeout
=None):
458 :param force: Expect that the mount will not shutdown cleanly: kill
460 :param require_clean: Wait for the Ceph client associated with the
461 mount (e.g. ceph-fuse) to terminate, and
462 raise if it doesn't do so cleanly.
463 :param timeout: amount of time to be waited for umount command to finish
466 raise NotImplementedError()
468 def _verify_attrs(self
, **kwargs
):
470 Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
471 cephfs_name, cephfs_mntpt are either type str or None.
473 for k
, v
in kwargs
.items():
474 if v
is not None and not isinstance(v
, str):
475 raise RuntimeError('value of attributes should be either str '
476 f
'or None. {k} - {v}')
478 def update_attrs(self
, client_id
=None, client_keyring_path
=None,
479 client_remote
=None, hostfs_mntpt
=None, cephfs_name
=None,
481 if not (client_id
or client_keyring_path
or client_remote
or
482 cephfs_name
or cephfs_mntpt
or hostfs_mntpt
):
485 self
._verify
_attrs
(client_id
=client_id
,
486 client_keyring_path
=client_keyring_path
,
487 hostfs_mntpt
=hostfs_mntpt
, cephfs_name
=cephfs_name
,
488 cephfs_mntpt
=cephfs_mntpt
)
491 self
.client_id
= client_id
492 if client_keyring_path
:
493 self
.client_keyring_path
= client_keyring_path
495 self
.client_remote
= client_remote
497 self
.hostfs_mntpt
= hostfs_mntpt
499 self
.cephfs_name
= cephfs_name
501 self
.cephfs_mntpt
= cephfs_mntpt
503 def remount(self
, **kwargs
):
505 Update mount object's attributes and attempt remount with these
506 new values for these attrbiutes.
508 1. Run umount_wait().
509 2. Run update_attrs().
512 Accepts arguments of self.mount() and self.update_attrs() with 1
513 exception: wait accepted too which can be True or False.
516 assert not self
.mounted
518 mntopts
= kwargs
.pop('mntopts', [])
519 check_status
= kwargs
.pop('check_status', True)
520 wait
= kwargs
.pop('wait', True)
522 self
.update_attrs(**kwargs
)
524 retval
= self
.mount(mntopts
=mntopts
, check_status
=check_status
)
525 # avoid this scenario (again): mount command might've failed and
526 # check_status might have silenced the exception, yet we attempt to
527 # wait which might lead to an error.
528 if retval
is None and wait
:
529 self
.wait_until_mounted()
535 Suspend the netns veth interface to make the client disconnected
536 from the ceph cluster
538 log
.info('Killing connection on {0}...'.format(self
.client_remote
.name
))
541 def kill_cleanup(self
):
543 Follow up ``kill`` to get to a clean unmounted state.
545 log
.info('Cleaning up killed connection on {0}'.format(self
.client_remote
.name
))
546 self
.umount_wait(force
=True)
550 Remove the mount point.
552 Prerequisite: the client is not mounted.
554 log
.info('Cleaning up mount {0}'.format(self
.client_remote
.name
))
557 self
.client_remote
.run(args
=['rmdir', '--', self
.mountpoint
],
558 cwd
=self
.test_dir
, stderr
=stderr
,
559 timeout
=(60*5), check_status
=False)
560 except CommandFailedError
:
561 if "no such file or directory" not in stderr
.getvalue().lower():
def wait_until_mounted(self):
    """
    Abstract hook: concrete mount types override this to wait for the
    mount to become usable. The base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError
def get_keyring_path(self):
    """
    Return the conventional keyring path for this mount's client, i.e.
    /etc/ceph/ceph.client.<client_id>.keyring.
    """
    # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
    return f'/etc/ceph/ceph.client.{self.client_id}.keyring'
def get_key_from_keyfile(self):
    """
    Return the secret stored in the client's keyring file, or None.

    The file at ``self.client_keyring_path`` is read on the client host
    with ``sudo cat``. The first line containing "key" is selected, and
    everything after its first '=' is returned, stripped (the whole
    stripped line if it has no '='). Returns None if no line matches.
    """
    # XXX: don't call run_shell(), since CephFS might be unmounted.
    proc = self.client_remote.run(
        args=['sudo', 'cat', self.client_keyring_path],
        stdout=StringIO(),
        omit_sudo=False)
    contents = proc.stdout.getvalue()
    for entry in contents.split('\n'):
        if 'key' not in entry:
            continue
        eq_pos = entry.find('=')
        return entry[eq_pos + 1:].strip()
    return None
583 def config_path(self
):
585 Path to ceph.conf: override this if you're not a normal systemwide ceph install
588 return "/etc/ceph/ceph.conf"
591 def mounted_wait(self
):
593 A context manager, from an initially unmounted state, to mount
594 this, yield, and then unmount and clean up.
597 self
.wait_until_mounted()
603 def create_file(self
, filename
='testfile', dirname
=None, user
=None,
605 assert(self
.is_mounted())
607 if not os
.path
.isabs(filename
):
609 if os
.path
.isabs(dirname
):
610 path
= os
.path
.join(dirname
, filename
)
612 path
= os
.path
.join(self
.hostfs_mntpt
, dirname
, filename
)
614 path
= os
.path
.join(self
.hostfs_mntpt
, filename
)
619 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', 'touch ' + path
]
621 args
= 'touch ' + path
623 return self
.client_remote
.run(args
=args
, check_status
=check_status
)
625 def create_files(self
):
626 assert(self
.is_mounted())
628 for suffix
in self
.test_files
:
629 log
.info("Creating file {0}".format(suffix
))
630 self
.client_remote
.run(args
=[
631 'touch', os
.path
.join(self
.hostfs_mntpt
, suffix
)
634 def test_create_file(self
, filename
='testfile', dirname
=None, user
=None,
636 return self
.create_file(filename
=filename
, dirname
=dirname
, user
=user
,
def check_files(self):
    """
    Verify that every name in ``self.test_files`` exists under the mount.

    ``ls`` is run on the client for each expected file (relative to
    ``self.hostfs_mntpt``).

    :raises RuntimeError: for the first file whose ``ls`` exits non-zero.
    """
    assert self.is_mounted()

    for suffix in self.test_files:
        log.info("Checking file {0}".format(suffix))
        target = os.path.join(self.hostfs_mntpt, suffix)
        proc = self.client_remote.run(args=['ls', target],
                                      check_status=False)
        if proc.exitstatus != 0:
            raise RuntimeError("Expected file {0} not found".format(suffix))
650 def write_file(self
, path
, data
, perms
=None):
652 Write the given data at the given path and set the given perms to the
655 if path
.find(self
.hostfs_mntpt
) == -1:
656 path
= os
.path
.join(self
.hostfs_mntpt
, path
)
658 write_file(self
.client_remote
, path
, data
)
661 self
.run_shell(args
=f
'chmod {perms} {path}')
def read_file(self, path):
    """
    Return the stripped contents of the file at ``path`` on the client.

    If ``path`` does not contain the mountpoint string, it is interpreted
    relative to ``self.hostfs_mntpt``.
    """
    full_path = path
    if self.hostfs_mntpt not in path:
        full_path = os.path.join(self.hostfs_mntpt, path)
    proc = self.run_shell(args=['cat', full_path])
    return proc.stdout.getvalue().strip()
673 def create_destroy(self
):
674 assert(self
.is_mounted())
676 filename
= "{0} {1}".format(datetime
.datetime
.now(), self
.client_id
)
677 log
.debug("Creating test file {0}".format(filename
))
678 self
.client_remote
.run(args
=[
679 'touch', os
.path
.join(self
.hostfs_mntpt
, filename
)
681 log
.debug("Deleting test file {0}".format(filename
))
682 self
.client_remote
.run(args
=[
683 'rm', '-f', os
.path
.join(self
.hostfs_mntpt
, filename
)
686 def _run_python(self
, pyscript
, py_version
='python3', sudo
=False):
690 args
+= ['adjust-ulimits', 'daemon-helper', 'kill', py_version
, '-c', pyscript
]
691 return self
.client_remote
.run(args
=args
, wait
=False, stdin
=run
.PIPE
, stdout
=StringIO())
693 def run_python(self
, pyscript
, py_version
='python3', sudo
=False):
694 p
= self
._run
_python
(pyscript
, py_version
, sudo
=sudo
)
696 return p
.stdout
.getvalue().strip()
698 def run_shell(self
, args
, timeout
=900, **kwargs
):
699 args
= args
.split() if isinstance(args
, str) else args
700 omit_sudo
= kwargs
.pop('omit_sudo', False)
701 sudo
= kwargs
.pop('sudo', False)
702 cwd
= kwargs
.pop('cwd', self
.mountpoint
)
703 stdout
= kwargs
.pop('stdout', StringIO())
704 stderr
= kwargs
.pop('stderr', StringIO())
707 args
.insert(0, 'sudo')
709 return self
.client_remote
.run(args
=args
, cwd
=cwd
, timeout
=timeout
,
710 stdout
=stdout
, stderr
=stderr
,
711 omit_sudo
=omit_sudo
, **kwargs
)
def run_shell_payload(self, payload, **kwargs):
    """
    Run ``payload`` as a script via ``bash -c`` on the client.

    The payload is passed to bash as one shell-quoted word, so it may
    contain single quotes. All other keyword arguments are forwarded to
    run_shell().
    """
    import shlex
    # The assembled command line is parsed by a shell on the remote side, so
    # the payload must be quoted as a single word. Wrapping it as
    # "'{payload}'" breaks (or silently changes meaning) whenever the payload
    # itself contains a single quote; shlex.quote escapes those correctly.
    return self.run_shell(["bash", "-c", Raw(shlex.quote(payload))], **kwargs)
716 def run_as_user(self
, **kwargs
):
718 Besides the arguments defined for run_shell() this method also
719 accepts argument 'user'.
721 args
= kwargs
.pop('args')
722 user
= kwargs
.pop('user')
723 if isinstance(args
, str):
724 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', args
]
725 elif isinstance(args
, list):
730 # get rid of extra space at the end.
733 args
= ['sudo', '-u', user
, '-s', '/bin/bash', '-c', cmd
]
735 kwargs
['args'] = args
736 return self
.run_shell(**kwargs
)
def run_as_root(self, **kwargs):
    """
    Run a command as root.

    Accepts the same keyword arguments as run_shell(); this delegates to
    run_as_user() with ``user`` forced to 'root' (overriding any ``user``
    the caller supplied).
    """
    return self.run_as_user(**{**kwargs, 'user': 'root'})
745 def _verify(self
, proc
, retval
=None, errmsg
=None):
747 msg
= ('expected return value: {}\nreceived return value: '
748 '{}\n'.format(retval
, proc
.returncode
))
749 assert proc
.returncode
== retval
, msg
752 stderr
= proc
.stderr
.getvalue().lower()
753 msg
= ('didn\'t find given string in stderr -\nexpected string: '
754 '{}\nreceived error message: {}\nnote: received error '
755 'message is converted to lowercase'.format(errmsg
, stderr
))
756 assert errmsg
in stderr
, msg
758 def negtestcmd(self
, args
, retval
=None, errmsg
=None, stdin
=None,
759 cwd
=None, wait
=True):
761 Conduct a negative test for the given command.
763 retval and errmsg are parameters to confirm the cause of command
766 proc
= self
.run_shell(args
=args
, wait
=wait
, stdin
=stdin
, cwd
=cwd
,
768 self
._verify
(proc
, retval
, errmsg
)
771 def negtestcmd_as_user(self
, args
, user
, retval
=None, errmsg
=None,
772 stdin
=None, cwd
=None, wait
=True):
773 proc
= self
.run_as_user(args
=args
, user
=user
, wait
=wait
, stdin
=stdin
,
774 cwd
=cwd
, check_status
=False)
775 self
._verify
(proc
, retval
, errmsg
)
778 def negtestcmd_as_root(self
, args
, retval
=None, errmsg
=None, stdin
=None,
779 cwd
=None, wait
=True):
780 proc
= self
.run_as_root(args
=args
, wait
=wait
, stdin
=stdin
, cwd
=cwd
,
782 self
._verify
(proc
, retval
, errmsg
)
785 def open_no_data(self
, basename
):
787 A pure metadata operation
789 assert(self
.is_mounted())
791 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
793 p
= self
._run
_python
(dedent(
795 f = open("{path}", 'w')
796 """.format(path
=path
)
800 def open_background(self
, basename
="background_file", write
=True):
802 Open a file for writing, then block such that the client
803 will hold a capability.
805 Don't return until the remote process has got as far as opening
806 the file, then return the RemoteProcess instance.
808 assert(self
.is_mounted())
810 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
813 pyscript
= dedent("""
816 with open("{path}", 'w') as f:
822 """).format(path
=path
)
824 pyscript
= dedent("""
827 with open("{path}", 'r') as f:
830 """).format(path
=path
)
832 rproc
= self
._run
_python
(pyscript
)
833 self
.background_procs
.append(rproc
)
835 # This wait would not be sufficient if the file had already
836 # existed, but it's simple and in practice users of open_background
837 # are not using it on existing files.
838 self
.wait_for_visible(basename
)
842 def open_dir_background(self
, basename
):
844 Create and hold a capability to a directory.
846 assert(self
.is_mounted())
848 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
850 pyscript
= dedent("""
855 fd = os.open("{path}", os.O_RDONLY)
858 """).format(path
=path
)
860 rproc
= self
._run
_python
(pyscript
)
861 self
.background_procs
.append(rproc
)
863 self
.wait_for_visible(basename
)
867 def wait_for_dir_empty(self
, dirname
, timeout
=30):
868 dirpath
= os
.path
.join(self
.hostfs_mntpt
, dirname
)
869 with
safe_while(sleep
=5, tries
=(timeout
//5)) as proceed
:
871 p
= self
.run_shell_payload(f
"stat -c %h {dirpath}")
872 nr_links
= int(p
.stdout
.getvalue().strip())
876 def wait_for_visible(self
, basename
="background_file", timeout
=30):
879 r
= self
.client_remote
.run(args
=[
880 'stat', os
.path
.join(self
.hostfs_mntpt
, basename
)
881 ], check_status
=False)
882 if r
.exitstatus
== 0:
883 log
.debug("File {0} became visible from {1} after {2}s".format(
884 basename
, self
.client_id
, i
))
890 raise RuntimeError("Timed out after {0}s waiting for {1} to become visible from {2}".format(
891 i
, basename
, self
.client_id
))
893 def lock_background(self
, basename
="background_file", do_flock
=True):
895 Open and lock a files for writing, hold the lock in a background process
897 assert(self
.is_mounted())
899 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
906 script_builder
+= """
907 f1 = open("{path}-1", 'w')
908 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)"""
909 script_builder
+= """
910 f2 = open("{path}-2", 'w')
911 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
912 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
917 pyscript
= dedent(script_builder
).format(path
=path
)
919 log
.info("lock_background file {0}".format(basename
))
920 rproc
= self
._run
_python
(pyscript
)
921 self
.background_procs
.append(rproc
)
924 def lock_and_release(self
, basename
="background_file"):
925 assert(self
.is_mounted())
927 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
933 f1 = open("{path}-1", 'w')
934 fcntl.flock(f1, fcntl.LOCK_EX)
935 f2 = open("{path}-2", 'w')
936 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
937 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
939 pyscript
= dedent(script
).format(path
=path
)
941 log
.info("lock_and_release file {0}".format(basename
))
942 return self
._run
_python
(pyscript
)
944 def check_filelock(self
, basename
="background_file", do_flock
=True):
945 assert(self
.is_mounted())
947 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
954 script_builder
+= """
955 f1 = open("{path}-1", 'r')
957 fcntl.flock(f1, fcntl.LOCK_EX | fcntl.LOCK_NB)
959 if e.errno == errno.EAGAIN:
962 raise RuntimeError("flock on file {path}-1 not found")"""
963 script_builder
+= """
964 f2 = open("{path}-2", 'r')
966 lockdata = struct.pack('hhllhh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
967 fcntl.fcntl(f2, fcntl.F_SETLK, lockdata)
969 if e.errno == errno.EAGAIN:
972 raise RuntimeError("posix lock on file {path}-2 not found")
974 pyscript
= dedent(script_builder
).format(path
=path
)
976 log
.info("check lock on file {0}".format(basename
))
977 self
.client_remote
.run(args
=[
978 'python3', '-c', pyscript
981 def write_background(self
, basename
="background_file", loop
=False):
983 Open a file for writing, complete as soon as you can
987 assert(self
.is_mounted())
989 path
= os
.path
.join(self
.hostfs_mntpt
, basename
)
991 pyscript
= dedent("""
995 fd = os.open("{path}", os.O_RDWR | os.O_CREAT, 0o644)
998 os.write(fd, b'content')
1002 except IOError as e:
1005 """).format(path
=path
, loop
=str(loop
))
1007 rproc
= self
._run
_python
(pyscript
)
1008 self
.background_procs
.append(rproc
)
1011 def write_n_mb(self
, filename
, n_mb
, seek
=0, wait
=True):
1013 Write the requested number of megabytes to a file
1015 assert(self
.is_mounted())
1017 return self
.run_shell(["dd", "if=/dev/urandom", "of={0}".format(filename
),
1018 "bs=1M", "conv=fdatasync",
1019 "count={0}".format(int(n_mb
)),
1020 "seek={0}".format(int(seek
))
1023 def write_test_pattern(self
, filename
, size
):
1024 log
.info("Writing {0} bytes to {1}".format(size
, filename
))
1025 return self
.run_python(dedent("""
1028 with open(path, 'w') as f:
1029 for i in range(0, {size}):
1030 val = zlib.crc32(str(i).encode('utf-8')) & 7
1033 path
=os
.path
.join(self
.hostfs_mntpt
, filename
),
1037 def validate_test_pattern(self
, filename
, size
):
1038 log
.info("Validating {0} bytes from {1}".format(size
, filename
))
1039 # Use sudo because cephfs-data-scan may recreate the file with owner==root
1040 return self
.run_python(dedent("""
1043 with open(path, 'r') as f:
1045 if len(bytes) != {size}:
1046 raise RuntimeError("Bad length {{0}} vs. expected {{1}}".format(
1049 for i, b in enumerate(bytes):
1050 val = zlib.crc32(str(i).encode('utf-8')) & 7
1052 raise RuntimeError("Bad data at offset {{0}}".format(i))
1054 path
=os
.path
.join(self
.hostfs_mntpt
, filename
),
1058 def open_n_background(self
, fs_path
, count
):
1060 Open N files for writing, hold them open in a background process
1062 :param fs_path: Path relative to CephFS root, e.g. "foo/bar"
1063 :return: a RemoteProcess
1065 assert(self
.is_mounted())
1067 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1069 pyscript
= dedent("""
1075 abs_path = "{abs_path}"
1077 if not os.path.exists(abs_path):
1078 os.makedirs(abs_path)
1081 for i in range(0, n):
1082 fname = "file_"+str(i)
1083 path = os.path.join(abs_path, fname)
1084 handles.append(open(path, 'w'))
1088 """).format(abs_path
=abs_path
, count
=count
)
1090 rproc
= self
._run
_python
(pyscript
)
1091 self
.background_procs
.append(rproc
)
1094 def create_n_files(self
, fs_path
, count
, sync
=False, dirsync
=False, unlink
=False, finaldirsync
=False):
1098 :param sync: sync the file after writing
1099 :param dirsync: sync the containing directory after closing the file
1100 :param unlink: unlink the file after closing
1101 :param finaldirsync: sync the containing directory after closing the last file
1104 assert(self
.is_mounted())
1106 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1108 pyscript
= dedent(f
"""
1114 dpath = os.path.dirname(path)
1115 fnameprefix = os.path.basename(path)
1116 os.makedirs(dpath, exist_ok=True)
1119 dirfd = os.open(dpath, os.O_DIRECTORY)
1122 fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
1123 with open(fpath, 'w') as f:
1127 os.fsync(f.fileno())
1138 self
.run_python(pyscript
)
1141 for p
in self
.background_procs
:
1142 log
.info("Terminating background process")
1143 self
._kill
_background
(p
)
1145 self
.background_procs
= []
1147 def _kill_background(self
, p
):
1152 except (CommandFailedError
, ConnectionLostError
):
def kill_background(self, p):
    """
    Kill ``p`` and stop tracking it.

    :param p: a process previously returned by one of the *_background
              helpers and therefore present in ``self.background_procs``.
    """
    self._kill_background(p)
    tracked = self.background_procs
    tracked.remove(p)
def send_signal(self, signal):
    """
    Send ``signal`` to the client process via ``sudo kill``.

    :param signal: signal name, case-insensitive; one of 'sigstop',
                   'sigcont', 'sigterm' or 'sigkill'.
    :raises NotImplementedError: for any other signal name.
    """
    # Normalise once; the membership test below works on the lowered name.
    signal = signal.lower()
    if signal not in ('sigstop', 'sigcont', 'sigterm', 'sigkill'):
        raise NotImplementedError(f'unsupported signal: {signal}')

    self.client_remote.run(args=['sudo', 'kill', '-{0}'.format(signal),
                                 self.client_pid], omit_sudo=False)
def get_global_id(self):
    """
    Abstract hook overridden by concrete mount types; the base
    implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def get_global_inst(self):
    """
    Abstract hook overridden by concrete mount types; the base
    implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def get_global_addr(self):
    """
    Abstract hook overridden by concrete mount types; the base
    implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def get_osd_epoch(self):
    """
    Abstract hook overridden by concrete mount types; the base
    implementation always raises NotImplementedError.
    """
    raise NotImplementedError
def get_op_read_count(self):
    """
    Abstract hook overridden by concrete mount types; the base
    implementation always raises NotImplementedError.
    """
    raise NotImplementedError
1186 def readlink(self
, fs_path
):
1187 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1189 pyscript
= dedent("""
1192 print(os.readlink("{path}"))
1193 """).format(path
=abs_path
)
1195 proc
= self
._run
_python
(pyscript
)
1197 return str(proc
.stdout
.getvalue().strip())
def lstat(self, fs_path, follow_symlinks=False, wait=True, **kwargs):
    """
    stat ``fs_path`` without following a trailing symlink.

    This is ``self.stat(fs_path, follow_symlinks=False, ...)``.

    :param fs_path: path relative to the mountpoint.
    :param follow_symlinks: accepted only for signature compatibility with
                            stat(); it is ignored because lstat never
                            follows symlinks.
    :param wait: forwarded to stat() unchanged.
    :param kwargs: any extra keyword arguments, forwarded to stat().
    """
    # Forward the caller's ``wait`` instead of hard-coding True, so
    # lstat(..., wait=False) behaves like stat(..., wait=False).
    return self.stat(fs_path, follow_symlinks=False, wait=wait, **kwargs)
1203 def stat(self
, fs_path
, follow_symlinks
=True, wait
=True, **kwargs
):
1205 stat a file, and return the result as a dictionary like this:
1207 "st_ctime": 1414161137.0,
1208 "st_mtime": 1414161137.0,
1216 "st_atime": 1431520593.0
1219 Raises exception on absent file.
1221 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1223 stat_call
= "os.stat('" + abs_path
+ "')"
1225 stat_call
= "os.lstat('" + abs_path
+ "')"
1227 pyscript
= dedent("""
1235 except OSError as e:
1238 attrs = ["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid", "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]
1240 dict([(a, getattr(s, a)) for a in attrs]),
1242 """).format(stat_call
=stat_call
)
1243 proc
= self
._run
_python
(pyscript
, **kwargs
)
1246 return json
.loads(proc
.stdout
.getvalue().strip())
1250 def touch(self
, fs_path
):
1252 Create a dentry if it doesn't already exist. This python
1253 implementation exists because the usual command line tool doesn't
1254 pass through error codes like EIO.
1259 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1260 pyscript
= dedent("""
1265 f = open("{path}", "w")
1267 except IOError as e:
1269 """).format(path
=abs_path
)
1270 proc
= self
._run
_python
(pyscript
)
1273 def path_to_ino(self
, fs_path
, follow_symlinks
=True):
1274 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1277 pyscript
= dedent("""
1281 print(os.stat("{path}").st_ino)
1282 """).format(path
=abs_path
)
1284 pyscript
= dedent("""
1288 print(os.lstat("{path}").st_ino)
1289 """).format(path
=abs_path
)
1291 proc
= self
._run
_python
(pyscript
)
1293 return int(proc
.stdout
.getvalue().strip())
1295 def path_to_nlink(self
, fs_path
):
1296 abs_path
= os
.path
.join(self
.hostfs_mntpt
, fs_path
)
1298 pyscript
= dedent("""
1302 print(os.stat("{path}").st_nlink)
1303 """).format(path
=abs_path
)
1305 proc
= self
._run
_python
(pyscript
)
1307 return int(proc
.stdout
.getvalue().strip())
1309 def ls(self
, path
=None, **kwargs
):
1311 Wrap ls: return a list of strings
1317 ls_text
= self
.run_shell(cmd
, **kwargs
).stdout
.getvalue().strip()
1320 return ls_text
.split("\n")
1322 # Special case because otherwise split on empty string
1323 # gives you [''] instead of []
1326 def setfattr(self
, path
, key
, val
, **kwargs
):
1330 :param path: relative to mount point
1331 :param key: xattr name
1332 :param val: xattr value
1335 self
.run_shell(["setfattr", "-n", key
, "-v", val
, path
], **kwargs
)
1337 def getfattr(self
, path
, attr
, **kwargs
):
1339 Wrap getfattr: return the values of a named xattr on one file, or
1340 None if the attribute is not found.
1344 p
= self
.run_shell(["getfattr", "--only-values", "-n", attr
, path
], wait
=False, **kwargs
)
1347 except CommandFailedError
as e
:
1348 if e
.exitstatus
== 1 and "No such attribute" in p
.stderr
.getvalue():
1353 return str(p
.stdout
.getvalue())
1357 Wrap df: return a dict of usage fields in bytes
1360 p
= self
.run_shell(["df", "-B1", "."])
1361 lines
= p
.stdout
.getvalue().strip().split("\n")
1362 fs
, total
, used
, avail
= lines
[1].split()[:4]
1366 "total": int(total
),
1368 "available": int(avail
)
1371 def dir_checksum(self
, path
=None, follow_symlinks
=False):
1377 cmd
.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
1378 checksum_text
= self
.run_shell(cmd
).stdout
.getvalue().strip()
1379 checksum_sorted
= sorted(checksum_text
.split('\n'), key
=lambda v
: v
.split()[1])
1380 return hashlib
.md5(('\n'.join(checksum_sorted
)).encode('utf-8')).hexdigest()