"""
-Before running this testsuite, add path to cephfs-shell module to $PATH and
-export $PATH.
+NOTE: For running this tests locally (using vstart_runner.py), export the
+path to src/tools/cephfs/shell/cephfs-shell module to $PATH. Running
+"export PATH=$PATH:$(cd ../src/tools/cephfs/shell && pwd)" from the build dir
+will update the environment without hassles of typing the path correctly.
"""
-from io import BytesIO
+from io import StringIO
from os import path
import crypt
import logging
from tempfile import mkstemp as tempfile_mkstemp
import math
-from six import ensure_str
-from sys import version_info as sys_version_info
-from re import search as re_search
from time import sleep
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from teuthology.misc import sudo_write_file
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.exceptions import CommandFailedError
+from textwrap import dedent
log = logging.getLogger(__name__)
+
def humansize(nbytes):
suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
i = 0
- while nbytes >= 1024 and i < len(suffixes)-1:
+ while nbytes >= 1024 and i < len(suffixes) - 1:
nbytes /= 1024.
i += 1
nbytes = math.ceil(nbytes)
f = ('%d' % nbytes).rstrip('.')
return '%s%s' % (f, suffixes[i])
-def str_to_bool(val):
- val = val.strip()
- trueval = ['true', 'yes', 'y', '1']
- return True if val == 1 or val.lower() in trueval else False
+
+def ensure_str(s):
+ if isinstance(s, str):
+ return s
+ if isinstance(s, bytes):
+ return s.decode()
+ raise TypeError("not expecting type '%s'" % type(s))
+
class TestCephFSShell(CephFSTestCase):
CLIENTS_REQUIRED = 1
- def run_cephfs_shell_cmd(self, cmd, mount_x=None, opts=None, stdin=None, config_path=None):
+ def setUp(self):
+ super(TestCephFSShell, self).setUp()
+
+ conf_contents = "[cephfs-shell]\ncolors = False\ndebug = True\n"
+ confpath = self.mount_a.client_remote.sh('mktemp').strip()
+ self.mount_a.client_remote.write_file(confpath, conf_contents)
+ self.default_shell_conf_path = confpath
+
+ def run_cephfs_shell_cmd(self, cmd, mount_x=None, shell_conf_path=None,
+ opts=None, stdout=None, stderr=None, stdin=None,
+ check_status=True):
+ stdout = stdout or StringIO()
+ stderr = stderr or StringIO()
if mount_x is None:
mount_x = self.mount_a
- if config_path is None:
- config_path = self.mount_a.config_path
-
if isinstance(cmd, list):
cmd = " ".join(cmd)
+ if not shell_conf_path:
+ shell_conf_path = self.default_shell_conf_path
- args = ["cephfs-shell", "-c", config_path]
-
- if opts is not None:
- args.extend(opts)
-
+ args = ["cephfs-shell", "-c", shell_conf_path]
+ if opts:
+ args += opts
args.extend(("--", cmd))
log.info("Running command: {}".format(" ".join(args)))
- return mount_x.client_remote.run(args=args, stdout=BytesIO(),
- stderr=BytesIO(), stdin=stdin)
+ return mount_x.client_remote.run(args=args, stdout=stdout,
+ stderr=stderr, stdin=stdin,
+ check_status=check_status)
- def get_cephfs_shell_cmd_error(self, cmd, mount_x=None, opts=None,
- stdin=None):
- return ensure_str(self.run_cephfs_shell_cmd(cmd, mount_x, opts, stdin).stderr.\
- getvalue().strip())
+ def negtest_cephfs_shell_cmd(self, **kwargs):
+ """
+ This method verifies that cephfs shell command fails with expected
+ return value and/or error message.
+
+ kwargs is expected to hold the arguments same as
+ run_cephfs_shell_cmd() with the following exceptions -
+ * It should not contain check_status (since commands are expected
+ to fail, check_status is hardcoded to False).
+ * It is optional to set expected error message and return value to
+ dict members 'errmsg' and 'retval' respectively.
- def get_cephfs_shell_cmd_output(self, cmd, mount_x=None, opts=None,
- stdin=None, config_path=None):
- return ensure_str(self.run_cephfs_shell_cmd(cmd, mount_x, opts, stdin,
- config_path).\
- stdout.getvalue().strip())
+ This method servers as shorthand for codeblocks like -
+
+ try:
+ proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+ check_status=False,
+ stdout=stdout)
+ except CommandFailedError as e:
+ self.assertNotIn('some error message',
+ proc.stderr.getvalue.lower())
- def get_cephfs_shell_script_output(self, script, mount_x=None, stdin=None):
- return ensure_str(self.run_cephfs_shell_script(script, mount_x, stdin).stdout.\
- getvalue().strip())
- def run_cephfs_shell_script(self, script, mount_x=None, stdin=None):
+ try:
+ proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+ check_status=False,
+ stdout=stdout)
+ except CommandFailedError as e:
+ self.assertNotEqual(1, proc.returncode)
+ """
+ retval = kwargs.pop('retval', None)
+ errmsg = kwargs.pop('errmsg', None)
+ kwargs['check_status'] = False
+
+ proc = self.run_cephfs_shell_cmd(**kwargs)
+ if retval:
+ self.assertEqual(proc.returncode, retval)
+ else:
+ self.assertNotEqual(proc.returncode, 0)
+ if errmsg:
+ self.assertIn(errmsg, proc.stderr.getvalue().lower())
+
+ return proc
+
+ def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
+ shell_conf_path=None, opts=None,
+ stdout=None, stdin=None,
+ check_status=True):
+ return ensure_str(self.run_cephfs_shell_cmd(
+ cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
+ opts=opts, stdout=stdout, stdin=stdin,
+ check_status=check_status).stdout.getvalue().strip())
+
+ def get_cephfs_shell_cmd_error(self, cmd, mount_x=None,
+ shell_conf_path=None, opts=None,
+ stderr=None, stdin=None, check_status=True):
+ return ensure_str(self.run_cephfs_shell_cmd(
+ cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
+ opts=opts, stderr=stderr, stdin=stdin,
+ check_status=check_status).stderr.getvalue().strip())
+
+ def run_cephfs_shell_script(self, script, mount_x=None,
+ shell_conf_path=None, opts=None, stdout=None,
+ stderr=None, stdin=None, check_status=True):
+ stdout = stdout or StringIO()
+ stderr = stderr or StringIO()
if mount_x is None:
mount_x = self.mount_a
scriptfile.write(script)
# copy script to the machine running cephfs-shell.
mount_x.client_remote.put_file(scriptpath, scriptpath)
- mount_x.run_shell('chmod 755 ' + scriptpath)
+ mount_x.run_shell_payload(f"chmod 755 {scriptpath}")
- args = ["cephfs-shell", "-c", mount_x.config_path, '-b', scriptpath]
+ args = ["cephfs-shell", '-b', scriptpath]
+ if shell_conf_path:
+ args[1:1] = ["-c", shell_conf_path]
log.info('Running script \"' + scriptpath + '\"')
- return mount_x.client_remote.run(args=args, stdout=BytesIO(),
- stderr=BytesIO(), stdin=stdin)
+ return mount_x.client_remote.run(args=args, stdout=stdout,
+ stderr=stderr, stdin=stdin,
+ check_status=True)
+
+ def get_cephfs_shell_script_output(self, script, mount_x=None,
+ shell_conf_path=None, opts=None,
+ stdout=None, stdin=None,
+ check_status=True):
+ return ensure_str(self.run_cephfs_shell_script(
+ script=script, mount_x=mount_x, shell_conf_path=shell_conf_path,
+ opts=opts, stdout=stdout, stdin=stdin,
+ check_status=check_status).stdout.getvalue().strip())
+
+
+class TestGeneric(TestCephFSShell):
+
+ def test_mistyped_cmd(self):
+ with self.assertRaises(CommandFailedError) as cm:
+ self.run_cephfs_shell_cmd('lsx')
+ self.assertEqual(cm.exception.exitstatus, 127)
+
class TestMkdir(TestCephFSShell):
def test_mkdir(self):
o = self.mount_a.stat('d1')
log.info("mount_a output:\n{}".format(o))
- def test_mkdir_with_07000_octal_mode(self):
+ def test_mkdir_with_070000_octal_mode(self):
"""
- Test that mkdir fails with octal mode greater than 0777
+ Test that mkdir fails with octal mode greater than 07777
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m 07000 d2")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d2 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m 070000 d2")
try:
- o = self.mount_a.stat('d2')
- log.info("mount_a output:\n{}".format(o))
- except:
+ self.mount_a.stat('d2')
+ except CommandFailedError:
pass
def test_mkdir_with_negative_octal_mode(self):
"""
Test that mkdir fails with negative octal mode
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m -0755 d3")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d3 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m -0755 d3")
try:
- o = self.mount_a.stat('d3')
- log.info("mount_a output:\n{}".format(o))
- except:
+ self.mount_a.stat('d3')
+ except CommandFailedError:
pass
def test_mkdir_with_non_octal_mode(self):
# mkdir d4 should pass
o = self.mount_a.stat('d4')
- assert((o['st_mode'] & 0o700) == 0o700)
+ assert ((o['st_mode'] & 0o700) == 0o700)
def test_mkdir_with_bad_non_octal_mode(self):
"""
Test that mkdir failes with bad non-octal mode
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m ugx=0755 d5")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d5 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m ugx=0755 d5")
try:
- o = self.mount_a.stat('d5')
- log.info("mount_a output:\n{}".format(o))
- except:
+ self.mount_a.stat('d5')
+ except CommandFailedError:
pass
def test_mkdir_path_without_path_option(self):
"""
Test that mkdir fails without path option for creating path
"""
- o = self.get_cephfs_shell_cmd_output("mkdir d5/d6/d7")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d5/d6/d7 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir d5/d6/d7")
try:
- o = self.mount_a.stat('d5/d6/d7')
- log.info("mount_a output:\n{}".format(o))
- except:
+ self.mount_a.stat('d5/d6/d7')
+ except CommandFailedError:
pass
def test_mkdir_path_with_path_option(self):
o = self.mount_a.stat('d5/d6/d7')
log.info("mount_a output:\n{}".format(o))
+
class TestRmdir(TestCephFSShell):
dir_name = "test_dir"
try:
self.mount_a.stat(self.dir_name)
except CommandFailedError as e:
- if e.exitstatus == 2:
+ if e.exitstatus == 2:
return 0
raise
Test that rmdir deletes directory
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
- self.run_cephfs_shell_cmd("rmdir "+ self.dir_name)
+ self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_non_existing_dir(self):
"""
Test that rmdir does not delete a non existing directory
"""
- rmdir_output = self.get_cephfs_shell_cmd_error("rmdir test_dir")
- log.info("rmdir error output:\n{}".format(rmdir_output))
+ self.negtest_cephfs_shell_cmd(cmd="rmdir test_dir")
self.dir_does_not_exists()
def test_rmdir_dir_with_file(self):
Test that rmdir does not delete directory containing file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
+
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir" + self.dir_name)
+ # see comment below
+ # with self.assertRaises(CommandFailedError) as cm:
+ with self.assertRaises(CommandFailedError):
+ self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
+ # TODO: we need to check for exit code and error message as well.
+ # skipping it for not since error codes used by cephfs-shell are not
+ # standard and they may change soon.
+ # self.assertEqual(cm.exception.exitcode, 39)
self.mount_a.stat(self.dir_name)
def test_rmdir_existing_file(self):
Test that rmdir does not delete a file
"""
self.run_cephfs_shell_cmd("put - dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir dumpfile")
+ self.negtest_cephfs_shell_cmd(cmd="rmdir dumpfile")
self.mount_a.stat("dumpfile")
def test_rmdir_p(self):
"""
- Test that rmdir -p deletes all empty directories in the root directory passed
+ Test that rmdir -p deletes all empty directories in the root
+ directory passed
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
- self.run_cephfs_shell_cmd("rmdir -p "+ self.dir_name)
+ self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_p_valid_path(self):
"""
Test that rmdir -p does not delete an invalid directory
"""
- rmdir_output = self.get_cephfs_shell_cmd_error("rmdir -p test_dir")
- log.info("rmdir error output:\n{}".format(rmdir_output))
+ self.negtest_cephfs_shell_cmd(cmd="rmdir -p test_dir")
self.dir_does_not_exists()
def test_rmdir_p_dir_with_file(self):
Test that rmdir -p does not delete the directory containing a file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
- self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
+ self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
+ stdin="Valid File")
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.mount_a.stat(self.dir_name)
-class TestGetAndPut(TestCephFSShell):
- # the 'put' command gets tested as well with the 'get' comamnd
- def test_put_and_get_without_target_directory(self):
- """
- Test that put fails without target path
- """
- # generate test data in a directory
- self.run_cephfs_shell_cmd("!mkdir p1")
- self.run_cephfs_shell_cmd('!dd if=/dev/urandom of=p1/dump1 bs=1M count=1')
- self.run_cephfs_shell_cmd('!dd if=/dev/urandom of=p1/dump2 bs=2M count=1')
- self.run_cephfs_shell_cmd('!dd if=/dev/urandom of=p1/dump3 bs=3M count=1')
- # copy the whole directory over to the cephfs
- o = self.get_cephfs_shell_cmd_output("put p1")
- log.info("cephfs-shell output:\n{}".format(o))
+class TestLn(TestCephFSShell):
+ dir1 = 'test_dir1'
+ dir2 = 'test_dir2'
+ dump_id = 11
+ s = 'somedata'
+ dump_file = 'dump11'
+
+ def test_soft_link_without_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_script(script=dedent(f'''
+ cd /{self.dir1}/{self.dir2}
+ ln -s ../{self.dump_file}'''))
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_soft_link_with_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_cmd(f'ln -s /{self.dir1}/{self.dump_file} '
+ f'/{self.dir1}/{self.dir2}/')
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_without_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_script(script=dedent(f'''
+ cd /{self.dir1}/{self.dir2}
+ ln ../{self.dump_file}'''))
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_with_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_cmd(f'ln /{self.dir1}/{self.dump_file} '
+ f'/{self.dir1}/{self.dir2}/')
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_to_dir_not_allowed(self):
+ self.run_cephfs_shell_cmd(f'mkdir {self.dir1}')
+ self.run_cephfs_shell_cmd(f'mkdir {self.dir2}')
+ r = self.run_cephfs_shell_cmd(f'ln /{self.dir1} /{self.dir2}/',
+ check_status=False)
+ self.assertEqual(r.returncode, 3)
+
+ def test_target_exists_in_dir(self):
+ self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+ r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} {self.dump_file}',
+ check_status=False)
+ self.assertEqual(r.returncode, 1)
+
+ def test_incorrect_dir(self):
+ self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+ r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} /dir1/',
+ check_status=False)
+ self.assertEqual(r.returncode, 5)
- # put p1 should pass
- o = self.mount_a.stat('p1')
- log.info("mount_a output:\n{}".format(o))
- o = self.mount_a.stat('p1/dump1')
- log.info("mount_a output:\n{}".format(o))
- o = self.mount_a.stat('p1/dump2')
- log.info("mount_a output:\n{}".format(o))
- o = self.mount_a.stat('p1/dump3')
- log.info("mount_a output:\n{}".format(o))
-
- self.run_cephfs_shell_cmd('!rm -rf p1')
- o = self.get_cephfs_shell_cmd_output("get p1")
- o = self.get_cephfs_shell_cmd_output('!stat p1 || echo $?')
- log.info("cephfs-shell output:\n{}".format(o))
- self.validate_stat_output(o)
-
- o = self.get_cephfs_shell_cmd_output('!stat p1/dump1 || echo $?')
- log.info("cephfs-shell output:\n{}".format(o))
- self.validate_stat_output(o)
-
- o = self.get_cephfs_shell_cmd_output('!stat p1/dump2 || echo $?')
- log.info("cephfs-shell output:\n{}".format(o))
- self.validate_stat_output(o)
-
- o = self.get_cephfs_shell_cmd_output('!stat p1/dump3 || echo $?')
- log.info("cephfs-shell output:\n{}".format(o))
- self.validate_stat_output(o)
-
- def validate_stat_output(self, s):
- l = s.split('\n')
- log.info("lines:\n{}".format(l))
- rv = l[-1] # get last line; a failed stat will have "1" as the line
- log.info("rv:{}".format(rv))
- r = 0
- try:
- r = int(rv) # a non-numeric line will cause an exception
- except:
- pass
- assert(r == 0)
+class TestGetAndPut(TestCephFSShell):
def test_get_with_target_name(self):
"""
Test that get passes with target name
o = self.mount_a.stat('dump4')
log.info("mount_a output:\n{}".format(o))
- o = self.get_cephfs_shell_cmd_output("get dump4 .")
+ o = self.get_cephfs_shell_cmd_output("get dump4 ./dump4")
log.info("cephfs-shell output:\n{}".format(o))
- o = self.get_cephfs_shell_cmd_output("!cat dump4")
+ # NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
+ o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue(). \
+ strip()
o_hash = crypt.crypt(o, '.A')
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
- assert(s_hash == o_hash)
+ assert (s_hash == o_hash)
+
+ # cleanup
+ self.mount_a.run_shell("rm dump4", cwd=None, check_status=False)
def test_get_without_target_name(self):
"""
- Test that get passes with target name
+ Test that get should fail when there is no target name
"""
- s = 'D' * 1024
- o = self.get_cephfs_shell_cmd_output("put - dump5", stdin=s)
- log.info("cephfs-shell output:\n{}".format(o))
-
+ s = 'Somedata'
# put - dump5 should pass
- o = self.mount_a.stat('dump5')
- log.info("mount_a output:\n{}".format(o))
-
- # get dump5 should fail
- o = self.get_cephfs_shell_cmd_output("get dump5")
- o = self.get_cephfs_shell_cmd_output("!stat dump5 || echo $?")
- log.info("cephfs-shell output:\n{}".format(o))
- l = o.split('\n')
- try:
- ret = int(l[1])
- # verify that stat dump5 passes
- # if ret == 1, then that implies the stat failed
- # which implies that there was a problem with "get dump5"
- assert(ret != 1)
- except ValueError:
- # we have a valid stat output; so this is good
- # if the int() fails then that means there's a valid stat output
- pass
+ self.get_cephfs_shell_cmd_output("put - dump5", stdin=s)
+
+ self.mount_a.stat('dump5')
+
+ # get dump5 should fail as there is no local_path mentioned
+ with self.assertRaises(CommandFailedError):
+ self.get_cephfs_shell_cmd_output("get dump5")
+
+ # stat dump would return non-zero exit code as get dump failed
+ # cwd=None because we want to run it at CWD, not at cephfs mntpt.
+ r = self.mount_a.run_shell('stat dump5', cwd=None,
+ check_status=False).returncode
+ self.assertEqual(r, 1)
+
+ def test_get_doesnt_create_dir(self):
+ # if get cmd is creating subdirs on its own then dump7 will be
+ # stored as ./dump7/tmp/dump7 and not ./dump7, therefore
+ # if doing `cat ./dump7` returns non-zero exit code(i.e. 1) then
+ # it implies that no such file exists at that location
+ dir_abspath = path.join(self.mount_a.mountpoint, 'tmp')
+ self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+ self.mount_a.client_remote.write_file(path.join(dir_abspath, 'dump7'),
+ 'somedata')
+ self.get_cephfs_shell_cmd_output("get /tmp/dump7 ./dump7")
+ # test that dump7 exists
+ self.mount_a.run_shell("cat ./dump7", cwd=None)
+
+ # cleanup
+ self.mount_a.run_shell(args='rm dump7', cwd=None, check_status=False)
def test_get_to_console(self):
"""
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
- assert(s_hash == o_hash)
+ assert (s_hash == o_hash)
+
+
+ def test_put_without_target_name(self):
+ """
+ put - should fail as the cmd expects both arguments are mandatory.
+ """
+ with self.assertRaises(CommandFailedError):
+ self.get_cephfs_shell_cmd_output("put -")
+
+ def test_put_validate_local_path(self):
+ """
+ This test is intended to make sure local_path is validated before
+ trying to put the file from local fs to cephfs and the command
+ put ./dumpXYZ dump8 would fail as dumpXYX doesn't exist.
+ """
+ with self.assertRaises(CommandFailedError):
+ o = self.get_cephfs_shell_cmd_output("put ./dumpXYZ dump8")
+ log.info("cephfs-shell output:\n{}".format(o))
class TestSnapshots(TestCephFSShell):
def test_snap(self):
self.assertIn('st_mode', o)
# create the same snapshot again - must fail with an error message
- o = self.get_cephfs_shell_cmd_error("snap create snap1 /data_dir")
- log.info("cephfs-shell output:\n{}".format(o))
- self.assertIn("snapshot 'snap1' already exists", o)
+ self.negtest_cephfs_shell_cmd(cmd="snap create snap1 /data_dir",
+ errmsg="snapshot 'snap1' already exists")
o = self.mount_a.stat(sdn)
log.info("mount_a output:\n{}".format(o))
self.assertIn('st_mode', o)
self.assertEqual("", o)
try:
o = self.mount_a.stat(sdn)
- except:
+ except CommandFailedError:
# snap dir should not exist anymore
pass
log.info("mount_a output:\n{}".format(o))
self.assertNotIn('st_mode', o)
# delete the same snapshot again - must fail with an error message
- o = self.get_cephfs_shell_cmd_error("snap delete snap1 /data_dir")
- self.assertIn("'snap1': no such snapshot", o)
+ self.negtest_cephfs_shell_cmd(cmd="snap delete snap1 /data_dir",
+ errmsg="'snap1': no such snapshot")
try:
o = self.mount_a.stat(sdn)
- except:
+ except CommandFailedError:
pass
log.info("mount_a output:\n{}".format(o))
self.assertNotIn('st_mode', o)
+
class TestCD(TestCephFSShell):
CLIENTS_REQUIRED = 1
to root directory.
"""
path = 'dir1/dir2/dir3'
- self.mount_a.run_shell('mkdir -p ' + path)
+ self.mount_a.run_shell_payload(f"mkdir -p {path}")
expected_cwd = '/'
script = 'cd {}\ncd\ncwd\n'.format(path)
to the path passed in the argument.
"""
path = 'dir1/dir2/dir3'
- self.mount_a.run_shell('mkdir -p ' + path)
+ self.mount_a.run_shell_payload(f"mkdir -p {path}")
expected_cwd = '/dir1/dir2/dir3'
script = 'cd {}\ncwd\n'.format(path)
output = self.get_cephfs_shell_script_output(script)
self.assertEqual(output, expected_cwd)
+
class TestDU(TestCephFSShell):
CLIENTS_REQUIRED = 1
def test_du_works_for_regfiles(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
- sudo_write_file(self.mount_a.client_remote, regfile_abspath, 'somedata')
+ self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", regfilename)
du_output = self.get_cephfs_shell_cmd_output('du ' + regfilename)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_works_for_non_empty_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
regfilename = 'some_regfile'
regfile_abspath = path.join(dir_abspath, regfilename)
- self.mount_a.run_shell('mkdir ' + dir_abspath)
- sudo_write_file(self.mount_a.client_remote, regfile_abspath, 'somedata')
+ self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+ self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
- # XXX: we stat `regfile_abspath` here because ceph du reports a non-empty
+ # XXX: we stat `regfile_abspath` here because ceph du reports
+ # a non-empty
# directory's size as sum of sizes of all files under it.
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", dirname)
sleep(10)
du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_works_for_empty_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
- self.mount_a.run_shell('mkdir ' + dir_abspath)
+ self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
size = humansize(self.mount_a.stat(dir_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", dirname)
du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_works_for_hardlinks(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
- sudo_write_file(self.mount_a.client_remote, regfile_abspath,
- 'somedata')
+ self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
hlinkname = 'some_hardlink'
hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
- self.mount_a.run_shell(['sudo', 'ln', regfile_abspath,
- hlink_abspath], omit_sudo=False)
+ self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
size = humansize(self.mount_a.stat(hlink_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", hlinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + hlinkname)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_works_for_softlinks_to_files(self):
regfilename = 'some_regfile'
regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
- sudo_write_file(self.mount_a.client_remote, regfile_abspath, 'somedata')
+ self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
slinkname = 'some_softlink'
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
- self.mount_a.run_shell(['ln', '-s', regfile_abspath, slink_abspath])
+ self.mount_a.run_shell_payload(
+ f"ln -s {regfile_abspath} {slink_abspath}")
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
- expected_output = r'{}{}{}'.format((size), " +", slinkname)
+ expected_output = r'{}{}{}'.format(size, " +", slinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_works_for_softlinks_to_dirs(self):
dirname = 'some_directory'
dir_abspath = path.join(self.mount_a.mountpoint, dirname)
- self.mount_a.run_shell('mkdir ' + dir_abspath)
+ self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
slinkname = 'some_softlink'
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
- self.mount_a.run_shell(['ln', '-s', dir_abspath, slink_abspath])
+ self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink_abspath}")
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", slinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
# NOTE: tests using these are pretty slow since to this methods sleeps for
# 15 seconds
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
slink2_abspath = path.join(self.mount_a.mountpoint, slink2name)
- self.mount_a.run_shell('mkdir ' + dir_abspath)
- self.mount_a.run_shell('touch ' + regfile_abspath)
- self.mount_a.run_shell(['ln', regfile_abspath, hlink_abspath])
- self.mount_a.run_shell(['ln', '-s', regfile_abspath, slink_abspath])
- self.mount_a.run_shell(['ln', '-s', dir_abspath, slink2_abspath])
+ self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+ self.mount_a.run_shell_payload(f"touch {regfile_abspath}")
+ self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
+ self.mount_a.run_shell_payload(
+ f"ln -s {regfile_abspath} {slink_abspath}")
+ self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink2_abspath}")
dir2_name = 'dir2'
dir21_name = 'dir21'
dir2_abspath = path.join(self.mount_a.mountpoint, dir2_name)
dir21_abspath = path.join(dir2_abspath, dir21_name)
regfile121_abspath = path.join(dir21_abspath, regfile121_name)
- self.mount_a.run_shell('mkdir -p ' + dir21_abspath)
- self.mount_a.run_shell('touch ' + regfile121_abspath)
+ self.mount_a.run_shell_payload(f"mkdir -p {dir21_abspath}")
+ self.mount_a.run_shell_payload(f"touch {regfile121_abspath}")
- sudo_write_file(self.mount_a.client_remote, regfile_abspath,
- 'somedata')
- sudo_write_file(self.mount_a.client_remote, regfile121_abspath,
- 'somemoredata')
+ self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+ self.mount_a.client_remote.write_file(regfile121_abspath,
+ 'somemoredata')
# TODO: is there a way to trigger/force update ceph.dir.rbytes?
# wait so that attr ceph.dir.rbytes gets a chance to be updated.
if f == '/':
expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
else:
- expected_patterns.append(r'{}{}{}'.format(size, " +",
+ expected_patterns.append(r'{}{}{}'.format(
+ size, " +",
path_prefix + path.relpath(f, self.mount_a.mountpoint)))
for f in [dir_abspath, regfile_abspath, regfile121_abspath,
hlink_abspath, slink_abspath, slink2_abspath]:
- size = humansize(self.mount_a.stat(f, follow_symlinks=
- False)['st_size'])
+ size = humansize(self.mount_a.stat(
+ f, follow_symlinks=False)['st_size'])
append_expected_output_pattern(f)
# get size for directories containig regfiles within
for f in [dir2_abspath, dir21_abspath]:
size = humansize(self.mount_a.stat(regfile121_abspath,
- follow_symlinks=False)['st_size'])
+ follow_symlinks=False)[
+ 'st_size'])
append_expected_output_pattern(f)
# get size for CephFS root
for p in [dir_abspath, regfile_abspath, dir2_abspath,
dir21_abspath, regfile121_abspath, hlink_abspath,
slink_abspath, slink2_abspath]:
- path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
+ path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
- return (expected_patterns, path_to_files)
+ return expected_patterns, path_to_files
else:
return expected_patterns
du_output = self.get_cephfs_shell_cmd_output('du -r')
for expected_output in expected_patterns_in_output:
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_with_path_in_args(self):
- expected_patterns_in_output, path_to_files = self._setup_files(True,
- path_prefix='')
+ expected_patterns_in_output, path_to_files = self._setup_files(
+ True, path_prefix='')
args = ['du', '/']
for p in path_to_files:
du_output = self.get_cephfs_shell_cmd_output(args)
for expected_output in expected_patterns_in_output:
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" +\
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
def test_du_with_no_args(self):
expected_patterns_in_output = self._setup_files()
# Since CWD is CephFS root and being non-recursive expect only
# CWD in DU report.
if expected_output.find('/') == len(expected_output) - 1:
- if sys_version_info.major >= 3:
- self.assertRegex(du_output, expected_output)
- elif sys_version_info.major < 3:
- assert re_search(expected_output, du_output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- expected_output, du_output)
+ self.assertRegex(du_output, expected_output)
class TestDF(TestCephFSShell):
def validate_df(self, filename):
- df_output = self.get_cephfs_shell_cmd_output('df '+filename)
+ df_output = self.get_cephfs_shell_cmd_output('df ' + filename)
log.info("cephfs-shell df output:\n{}".format(df_output))
shell_df = df_output.splitlines()[1].split()
log.info("cephfs available:{}\n".format(block_size - st_size))
self.assertTupleEqual((block_size, st_size, block_size - st_size),
- (int(shell_df[0]), int(shell_df[1]) , int(shell_df[2])))
+ (int(shell_df[0]), int(shell_df[1]),
+ int(shell_df[2])))
def test_df_with_no_args(self):
expected_output = ''
def test_df_for_valid_directory(self):
dir_name = 'dir1'
- mount_output = self.mount_a.run_shell('mkdir ' + dir_name)
+ mount_output = self.mount_a.run_shell_payload(f"mkdir {dir_name}")
log.info("cephfs-shell mount output:\n{}".format(mount_output))
self.validate_df(dir_name)
def test_df_for_invalid_directory(self):
dir_abspath = path.join(self.mount_a.mountpoint, 'non-existent-dir')
- proc = self.run_cephfs_shell_cmd('df ' + dir_abspath)
- assert proc.stderr.getvalue().find(b'error in stat') != -1
+ self.negtest_cephfs_shell_cmd(cmd='df ' + dir_abspath,
+ errmsg='error in stat')
def test_df_for_valid_file(self):
s = 'df test' * 14145016
dir_name = 'testdir'
def create_dir(self):
- mount_output = self.get_cephfs_shell_cmd_output('mkdir ' + self.dir_name)
+ mount_output = self.get_cephfs_shell_cmd_output(
+ 'mkdir ' + self.dir_name)
log.info("cephfs-shell mount output:\n{}".format(mount_output))
- def set_and_get_quota_vals(self, input_val):
- quota_output = self.run_cephfs_shell_cmd('quota set --max_bytes '
- + input_val[0] + ' --max_files '
- + input_val[1] + ' '
- + self.dir_name)
- log.info("cephfs-shell quota set output:\n{}".format(quota_output))
+ def set_and_get_quota_vals(self, input_val, check_status=True):
+ self.run_cephfs_shell_cmd(['quota', 'set', '--max_bytes',
+ input_val[0], '--max_files', input_val[1],
+ self.dir_name], check_status=check_status)
- quota_output = self.get_cephfs_shell_cmd_output('quota get '+ self.dir_name)
- log.info("cephfs-shell quota get output:\n{}".format(quota_output))
+ quota_output = self.get_cephfs_shell_cmd_output(
+ ['quota', 'get', self.dir_name],
+ check_status=check_status)
quota_output = quota_output.split()
return quota_output[1], quota_output[3]
def test_set(self):
self.create_dir()
set_values = ('6', '2')
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+ set_values)
def test_replace_values(self):
self.test_set()
set_values = ('20', '4')
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+ set_values)
def test_set_invalid_dir(self):
set_values = ('5', '5')
try:
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
- raise Exception("Something went wrong!! Values set for non existing directory")
+ self.assertTupleEqual(self.set_and_get_quota_vals(
+ set_values, False), set_values)
+ raise Exception(
+ "Something went wrong!! Values set for non existing directory")
except IndexError:
- # Test should pass as values cannot be set for non existing directory
+ # Test should pass as values cannot be set for non
+ # existing directory
pass
def test_set_invalid_values(self):
self.create_dir()
set_values = ('-6', '-5')
try:
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
+ False),
+ set_values)
raise Exception("Something went wrong!! Invalid values set")
except IndexError:
# Test should pass as invalid values cannot be set
def test_exceed_file_limit(self):
self.test_set()
dir_abspath = path.join(self.mount_a.mountpoint, self.dir_name)
- self.mount_a.run_shell('touch '+dir_abspath+'/file1')
+ self.mount_a.run_shell_payload(f"touch {dir_abspath}/file1")
file2 = path.join(dir_abspath, "file2")
try:
- self.mount_a.run_shell('touch '+file2)
- raise Exception("Something went wrong!! File creation should have failed")
+ self.mount_a.run_shell_payload(f"touch {file2}")
+ raise Exception(
+ "Something went wrong!! File creation should have failed")
except CommandFailedError:
# Test should pass as file quota set to 2
# Additional condition to confirm file creation failure
file_abspath = path.join(dir_abspath, filename)
try:
# Write should fail as bytes quota is set to 6
- sudo_write_file(self.mount_a.client_remote, file_abspath,
- 'Disk raise Exception')
+ self.mount_a.client_remote.write_file(file_abspath,
+ 'Disk raise Exception')
raise Exception("Write should have failed")
except CommandFailedError:
# Test should pass only when write command fails
# Testing with teuthology: No file is created.
return 0
elif path_exists and not path.getsize(file_abspath):
- # Testing on Fedora 30: When write fails, empty file gets created.
+ # Testing on Fedora 30: When write fails, empty
+ # file gets created.
return 0
else:
raise
def create_dir(self):
self.run_cephfs_shell_cmd('mkdir ' + self.dir_name)
- def set_get_list_xattr_vals(self, input_val):
- setxattr_output = self.get_cephfs_shell_cmd_output('setxattr '
- + self.dir_name
- + ' '
- + input_val[0]
- + ' ' + input_val[1])
+ def set_get_list_xattr_vals(self, input_val, negtest=False):
+ setxattr_output = self.get_cephfs_shell_cmd_output(
+ ['setxattr', self.dir_name, input_val[0], input_val[1]])
log.info("cephfs-shell setxattr output:\n{}".format(setxattr_output))
- getxattr_output = self.get_cephfs_shell_cmd_output('getxattr '
- + self.dir_name
- + ' ' + input_val[0])
+ getxattr_output = self.get_cephfs_shell_cmd_output(
+ ['getxattr', self.dir_name, input_val[0]])
log.info("cephfs-shell getxattr output:\n{}".format(getxattr_output))
- listxattr_output = self.get_cephfs_shell_cmd_output('listxattr '+ self.dir_name)
+ listxattr_output = self.get_cephfs_shell_cmd_output(
+ ['listxattr', self.dir_name])
log.info("cephfs-shell listxattr output:\n{}".format(listxattr_output))
return listxattr_output, getxattr_output
def test_set(self):
self.create_dir()
set_values = ('user.key', '2')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+ set_values)
def test_reset(self):
self.test_set()
set_values = ('user.key', '4')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+ set_values)
def test_non_existing_dir(self):
- set_values = ('user.key', '9')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), (u'', u''))
-
-# def test_ls(self):
-# """
-# Test that ls passes
-# """
-# o = self.get_cephfs_shell_cmd_output("ls")
-# log.info("cephfs-shell output:\n{}".format(o))
-#
-# o = self.mount_a.run_shell(['ls']).stdout.getvalue().strip().replace("\n", " ").split()
-# log.info("mount_a output:\n{}".format(o))
-#
-# # ls should not list hidden files without the -a switch
-# if '.' in o or '..' in o:
-# log.info('ls failed')
-# else:
-# log.info('ls succeeded')
-#
-# def test_ls_a(self):
-# """
-# Test that ls -a passes
-# """
-# o = self.get_cephfs_shell_cmd_output("ls -a")
-# log.info("cephfs-shell output:\n{}".format(o))
-#
-# o = self.mount_a.run_shell(['ls', '-a']).stdout.getvalue().strip().replace("\n", " ").split()
-# log.info("mount_a output:\n{}".format(o))
-#
-# if '.' in o and '..' in o:
-# log.info('ls -a succeeded')
-# else:
-# log.info('ls -a failed')
+ input_val = ('user.key', '9')
+ self.negtest_cephfs_shell_cmd(
+ cmd=['setxattr', self.dir_name, input_val[0],
+ input_val[1]])
+ self.negtest_cephfs_shell_cmd(
+ cmd=['getxattr', self.dir_name, input_val[0]])
+ self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
+
+
+class TestLS(TestCephFSShell):
+ dir_name = 'test_dir'
+ hidden_dir_name = '.test_hidden_dir'
+
+ def test_ls(self):
+ """ Test that ls prints files in CWD. """
+ self.run_cephfs_shell_cmd(f'mkdir {self.dir_name}')
+
+ ls_output = self.get_cephfs_shell_cmd_output("ls")
+ log.info(f"output of ls command:\n{ls_output}")
+
+ self.assertIn(self.dir_name, ls_output)
+
+ def test_ls_a(self):
+ """ Test ls -a prints hidden files in CWD."""
+
+ self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
+
+ ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
+ log.info(f"output of ls -a command:\n{ls_a_output}")
+
+ self.assertIn(self.hidden_dir_name, ls_a_output)
+
+ def test_ls_does_not_print_hidden_dir(self):
+ """ Test ls command does not print hidden directory """
+
+ self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
+
+ ls_output = self.get_cephfs_shell_cmd_output("ls")
+ log.info(f"output of ls command:\n{ls_output}")
+
+ self.assertNotIn(self.hidden_dir_name, ls_output)
+
+ def test_ls_a_prints_non_hidden_dir(self):
+ """ Test ls -a command prints non hidden directory """
+
+ self.run_cephfs_shell_cmd(
+ f'mkdir {self.hidden_dir_name} {self.dir_name}')
+
+ ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
+ log.info(f"output of ls -a command:\n{ls_a_output}")
+
+ self.assertIn(self.dir_name, ls_a_output)
+
+ def test_ls_H_prints_human_readable_file_size(self):
+ """ Test "ls -lH" prints human readable file size."""
+
+ file_sizes = ['1', '1K', '1M', '1G']
+ file_names = ['dump1', 'dump2', 'dump3', 'dump4']
+
+ for (file_size, file_name) in zip(file_sizes, file_names):
+ temp_file = self.mount_a.client_remote.mktemp(file_name)
+ self.mount_a.run_shell_payload(
+ f"fallocate -l {file_size} {temp_file}")
+ self.mount_a.run_shell_payload(f'mv {temp_file} ./')
+
+ ls_H_output = self.get_cephfs_shell_cmd_output(['ls', '-lH'])
+
+ ls_H_file_size = set()
+ for line in ls_H_output.split('\n'):
+ ls_H_file_size.add(line.split()[1])
+
+ # test that file sizes are in human readable format
+ self.assertEqual({'1B', '1K', '1M', '1G'}, ls_H_file_size)
+
+ def test_ls_s_sort_by_size(self):
+ """ Test "ls -S" sorts file listing by file_size """
+ test_file1 = "test_file1.txt"
+ test_file2 = "test_file2.txt"
+ file1_content = 'A' * 102
+ file2_content = 'B' * 10
+
+ self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
+ self.run_cephfs_shell_cmd(f"write {test_file2}", stdin=file2_content)
+
+ ls_s_output = self.get_cephfs_shell_cmd_output(['ls', '-lS'])
+
+ file_sizes = []
+ for line in ls_s_output.split('\n'):
+ file_sizes.append(line.split()[1])
+
+ # test that file size are in ascending order
+ self.assertEqual(file_sizes, sorted(file_sizes))
+
class TestMisc(TestCephFSShell):
def test_issue_cephfs_shell_cmd_at_invocation(self):
dirname = 'somedirectory'
self.run_cephfs_shell_cmd(['mkdir', dirname])
- output = self.mount_a.client_remote.sh([
- 'cephfs-shell', '-c', self.mount_a.config_path, 'ls'
- ]).strip()
+ output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']). \
+ strip()
- if sys_version_info.major >= 3:
- self.assertRegex(output, dirname)
- elif sys_version_info.major < 3:
- assert re_search(dirname, output) != None, "\n" + \
- "expected_output -\n{}\ndu_output -\n{}\n".format(
- dirname, output)
+ self.assertRegex(output, dirname)
def test_help(self):
"""
o = self.get_cephfs_shell_cmd_output("help all")
log.info("output:\n{}".format(o))
-class TestConfReading(TestCephFSShell):
- def test_reading_conf_opt(self):
- """
- Read conf without duplicate sections/options.
- """
- debugval = self.fs.mon_manager.raw_cluster_cmd('config', 'get',
- 'client','debug_shell')
- debugval = str_to_bool(debugval)
- self.fs.mon_manager.raw_cluster_cmd('config', 'set', 'client',
- 'debug_shell', str(not debugval))
- output = self.get_cephfs_shell_cmd_output('set debug')
- new_debug_val = \
- str_to_bool(output[output.find('debug: ') + len('debug: ') : ])
- assert not debugval == new_debug_val
- def test_reading_conf_after_setting_opt_twice(self):
+ def test_chmod(self):
+ """Test chmod is allowed above o0777 """
+
+ test_file1 = "test_file2.txt"
+ file1_content = 'A' * 102
+ self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
+ self.run_cephfs_shell_cmd(f"chmod 01777 {test_file1}")
+
+class TestShellOpts(TestCephFSShell):
+ """
+ Contains tests for shell options from conf file and shell prompt.
+ """
+
+ def setUp(self):
+ super(type(self), self).setUp()
+
+ # output of following command -
+ # editor - was: 'vim'
+ # now: '?'
+ # editor: '?'
+ self.editor_val = self.get_cephfs_shell_cmd_output(
+ 'set editor ?, set editor').split('\n')[2]
+ self.editor_val = self.editor_val.split(':')[1]. \
+ replace("'", "", 2).strip()
+
+ def write_tempconf(self, confcontents):
+ self.tempconfpath = self.mount_a.client_remote.mktemp(
+ suffix='cephfs-shell.conf')
+ self.mount_a.client_remote.write_file(self.tempconfpath,
+ confcontents)
+
+ def test_reading_conf(self):
+ self.write_tempconf("[cephfs-shell]\neditor = ???")
+
+ # output of following command -
+ # CephFS:~/>>> set editor
+ # editor: 'vim'
+ final_editor_val = self.get_cephfs_shell_cmd_output(
+ cmd='set editor', shell_conf_path=self.tempconfpath)
+ final_editor_val = final_editor_val.split(': ')[1]
+ final_editor_val = final_editor_val.replace("'", "", 2)
+
+ self.assertNotEqual(self.editor_val, final_editor_val)
+
+ def test_reading_conf_with_dup_opt(self):
"""
Read conf without duplicate sections/options.
"""
- debugval = self.fs.mon_manager.raw_cluster_cmd('config', 'get',
- 'client','debug_shell')
- debugval = str_to_bool(debugval)
-
- self.fs.mon_manager.raw_cluster_cmd('config', 'set', 'client',
- 'debug_shell', str(not debugval))
- self.fs.mon_manager.raw_cluster_cmd('config', 'set', 'client',
- 'debug_shell', str(not debugval))
- output = self.get_cephfs_shell_cmd_output('set debug')
- new_debug_val = \
- str_to_bool(output[output.find('debug: ') + len('debug: ') : ])
- assert not debugval == new_debug_val
-
- def test_reading_conf_after_resetting_opt(self):
- debugval = self.fs.mon_manager.raw_cluster_cmd('config', 'get',
- 'client','debug_shell')
- debugval = str_to_bool(debugval)
-
- self.fs.mon_manager.raw_cluster_cmd('config', 'set', 'client',
- 'debug_shell', str(not debugval))
- self.fs.mon_manager.raw_cluster_cmd('config', 'rm', 'client',
- 'debug_shell')
- self.fs.mon_manager.raw_cluster_cmd('config', 'set', 'client',
- 'debug_shell', str(not debugval))
- output = self.get_cephfs_shell_cmd_output('set debug')
- new_debug_val = \
- str_to_bool(output[output.find('debug: ') + len('debug: ') : ])
- assert not debugval == new_debug_val
+ self.write_tempconf("[cephfs-shell]\neditor = ???\neditor = " +
+ self.editor_val)
+
+ # output of following command -
+ # CephFS:~/>>> set editor
+ # editor: 'vim'
+ final_editor_val = self.get_cephfs_shell_cmd_output(
+ cmd='set editor', shell_conf_path=self.tempconfpath)
+ final_editor_val = final_editor_val.split(': ')[1]
+ final_editor_val = final_editor_val.replace("'", "", 2)
+
+ self.assertEqual(self.editor_val, final_editor_val)
+
+ def test_setting_opt_after_reading_conf(self):
+ self.write_tempconf("[cephfs-shell]\neditor = ???")
+
+ # output of following command -
+ # editor - was: vim
+ # now: vim
+ # editor: vim
+ final_editor_val = self.get_cephfs_shell_cmd_output(
+ cmd='set editor %s, set editor' % self.editor_val,
+ shell_conf_path=self.tempconfpath)
+ final_editor_val = final_editor_val.split('\n')[2]
+ final_editor_val = final_editor_val.split(': ')[1]
+ final_editor_val = final_editor_val.replace("'", "", 2)
+
+ self.assertEqual(self.editor_val, final_editor_val)