"""
-Before running this testsuite, add path to cephfs-shell module to $PATH and
-export $PATH.
+NOTE: For running this tests locally (using vstart_runner.py), export the
+path to src/tools/cephfs/shell/cephfs-shell module to $PATH. Running
+"export PATH=$PATH:$(cd ../src/tools/cephfs/shell && pwd)" from the build dir
+will update the environment without hassles of typing the path correctly.
"""
from io import StringIO
from os import path
from time import sleep
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
+from textwrap import dedent
log = logging.getLogger(__name__)
+
def humansize(nbytes):
suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
i = 0
- while nbytes >= 1024 and i < len(suffixes)-1:
+ while nbytes >= 1024 and i < len(suffixes) - 1:
nbytes /= 1024.
i += 1
nbytes = math.ceil(nbytes)
f = ('%d' % nbytes).rstrip('.')
return '%s%s' % (f, suffixes[i])
+
def ensure_str(s):
if isinstance(s, str):
return s
if isinstance(s, bytes):
return s.decode()
raise TypeError("not expecting type '%s'" % type(s))
-
+
+
class TestCephFSShell(CephFSTestCase):
CLIENTS_REQUIRED = 1
def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
shell_conf_path=None, opts=None,
- stdout=None, stdin=None,check_status=True):
+ stdout=None, stdin=None,
+ check_status=True):
return ensure_str(self.run_cephfs_shell_cmd(
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
opts=opts, stdout=stdout, stdin=stdin,
check_status=check_status).stdout.getvalue().strip())
+class TestGeneric(TestCephFSShell):
+
+ def test_mistyped_cmd(self):
+ with self.assertRaises(CommandFailedError) as cm:
+ self.run_cephfs_shell_cmd('lsx')
+ self.assertEqual(cm.exception.exitstatus, 127)
+
+
class TestMkdir(TestCephFSShell):
def test_mkdir(self):
"""
o = self.mount_a.stat('d1')
log.info("mount_a output:\n{}".format(o))
- def test_mkdir_with_07000_octal_mode(self):
+ def test_mkdir_with_070000_octal_mode(self):
"""
- Test that mkdir fails with octal mode greater than 0777
+ Test that mkdir fails with octal mode greater than 07777
"""
- self.negtest_cephfs_shell_cmd(cmd="mkdir -m 07000 d2")
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m 070000 d2")
try:
self.mount_a.stat('d2')
except CommandFailedError:
# mkdir d4 should pass
o = self.mount_a.stat('d4')
- assert((o['st_mode'] & 0o700) == 0o700)
+ assert ((o['st_mode'] & 0o700) == 0o700)
def test_mkdir_with_bad_non_octal_mode(self):
"""
o = self.mount_a.stat('d5/d6/d7')
log.info("mount_a output:\n{}".format(o))
+
class TestRmdir(TestCephFSShell):
dir_name = "test_dir"
try:
self.mount_a.stat(self.dir_name)
except CommandFailedError as e:
- if e.exitstatus == 2:
+ if e.exitstatus == 2:
return 0
raise
Test that rmdir deletes directory
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
- self.run_cephfs_shell_cmd("rmdir "+ self.dir_name)
+ self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_non_existing_dir(self):
Test that rmdir does not delete directory containing file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
+
self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir" + self.dir_name)
+ # see comment below
+ # with self.assertRaises(CommandFailedError) as cm:
+ with self.assertRaises(CommandFailedError):
+ self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
+ # TODO: we need to check for exit code and error message as well.
+ # skipping it for not since error codes used by cephfs-shell are not
+ # standard and they may change soon.
+ # self.assertEqual(cm.exception.exitcode, 39)
self.mount_a.stat(self.dir_name)
def test_rmdir_existing_file(self):
def test_rmdir_p(self):
"""
- Test that rmdir -p deletes all empty directories in the root directory passed
+ Test that rmdir -p deletes all empty directories in the root
+ directory passed
"""
self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
- self.run_cephfs_shell_cmd("rmdir -p "+ self.dir_name)
+ self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.dir_does_not_exists()
def test_rmdir_p_valid_path(self):
Test that rmdir -p does not delete the directory containing a file
"""
self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
- self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
+ self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
+ stdin="Valid File")
self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
self.mount_a.stat(self.dir_name)
+
+class TestLn(TestCephFSShell):
+ dir1 = 'test_dir1'
+ dir2 = 'test_dir2'
+ dump_id = 11
+ s = 'somedata'
+ dump_file = 'dump11'
+
+ def test_soft_link_without_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_script(script=dedent(f'''
+ cd /{self.dir1}/{self.dir2}
+ ln -s ../{self.dump_file}'''))
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_soft_link_with_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_cmd(f'ln -s /{self.dir1}/{self.dump_file} '
+ f'/{self.dir1}/{self.dir2}/')
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_without_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_script(script=dedent(f'''
+ cd /{self.dir1}/{self.dir2}
+ ln ../{self.dump_file}'''))
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_with_link_name(self):
+ self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+ self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+ data=self.s)
+ self.run_cephfs_shell_cmd(f'ln /{self.dir1}/{self.dump_file} '
+ f'/{self.dir1}/{self.dir2}/')
+ o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+ f'/{self.dump_file}')
+ self.assertEqual(self.s, o)
+
+ def test_hard_link_to_dir_not_allowed(self):
+ self.run_cephfs_shell_cmd(f'mkdir {self.dir1}')
+ self.run_cephfs_shell_cmd(f'mkdir {self.dir2}')
+ r = self.run_cephfs_shell_cmd(f'ln /{self.dir1} /{self.dir2}/',
+ check_status=False)
+ self.assertEqual(r.returncode, 3)
+
+ def test_target_exists_in_dir(self):
+ self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+ r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} {self.dump_file}',
+ check_status=False)
+ self.assertEqual(r.returncode, 1)
+
+ def test_incorrect_dir(self):
+ self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+ r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} /dir1/',
+ check_status=False)
+ self.assertEqual(r.returncode, 5)
+
+
class TestGetAndPut(TestCephFSShell):
def test_get_with_target_name(self):
"""
o = self.get_cephfs_shell_cmd_output("get dump4 ./dump4")
log.info("cephfs-shell output:\n{}".format(o))
- o = self.get_cephfs_shell_cmd_output("!cat dump4")
+ # NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
+ o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue(). \
+ strip()
o_hash = crypt.crypt(o, '.A')
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
- assert(s_hash == o_hash)
+ assert (s_hash == o_hash)
+
+ # cleanup
+ self.mount_a.run_shell("rm dump4", cwd=None, check_status=False)
def test_get_without_target_name(self):
"""
# test that dump7 exists
self.mount_a.run_shell("cat ./dump7", cwd=None)
+ # cleanup
+ self.mount_a.run_shell(args='rm dump7', cwd=None, check_status=False)
+
def test_get_to_console(self):
"""
Test that get passes with target name
# s_hash must be equal to o_hash
log.info("s_hash:{}".format(s_hash))
log.info("o_hash:{}".format(o_hash))
- assert(s_hash == o_hash)
+ assert (s_hash == o_hash)
+
def test_put_without_target_name(self):
"""
log.info("mount_a output:\n{}".format(o))
self.assertNotIn('st_mode', o)
+
class TestCD(TestCephFSShell):
CLIENTS_REQUIRED = 1
output = self.get_cephfs_shell_script_output(script)
self.assertEqual(output, expected_cwd)
+
class TestDU(TestCephFSShell):
CLIENTS_REQUIRED = 1
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
- # XXX: we stat `regfile_abspath` here because ceph du reports a non-empty
+ # XXX: we stat `regfile_abspath` here because ceph du reports
+ # a non-empty
# directory's size as sum of sizes of all files under it.
size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
expected_output = r'{}{}{}'.format(size, " +", dirname)
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
slinkname = 'some_softlink'
slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
- self.mount_a.run_shell_payload(f"ln -s {regfile_abspath} {slink_abspath}")
+ self.mount_a.run_shell_payload(
+ f"ln -s {regfile_abspath} {slink_abspath}")
size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
- expected_output = r'{}{}{}'.format((size), " +", slinkname)
+ expected_output = r'{}{}{}'.format(size, " +", slinkname)
du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
self.assertRegex(du_output, expected_output)
self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
self.mount_a.run_shell_payload(f"touch {regfile_abspath}")
self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
- self.mount_a.run_shell_payload(f"ln -s {regfile_abspath} {slink_abspath}")
+ self.mount_a.run_shell_payload(
+ f"ln -s {regfile_abspath} {slink_abspath}")
self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink2_abspath}")
dir2_name = 'dir2'
self.mount_a.run_shell_payload(f"touch {regfile121_abspath}")
self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
- self.mount_a.client_remote.write_file(regfile121_abspath, 'somemoredata')
+ self.mount_a.client_remote.write_file(regfile121_abspath,
+ 'somemoredata')
# TODO: is there a way to trigger/force update ceph.dir.rbytes?
# wait so that attr ceph.dir.rbytes gets a chance to be updated.
if f == '/':
expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
else:
- expected_patterns.append(r'{}{}{}'.format(size, " +",
+ expected_patterns.append(r'{}{}{}'.format(
+ size, " +",
path_prefix + path.relpath(f, self.mount_a.mountpoint)))
for f in [dir_abspath, regfile_abspath, regfile121_abspath,
hlink_abspath, slink_abspath, slink2_abspath]:
- size = humansize(self.mount_a.stat(f, follow_symlinks=
- False)['st_size'])
+ size = humansize(self.mount_a.stat(
+ f, follow_symlinks=False)['st_size'])
append_expected_output_pattern(f)
# get size for directories containig regfiles within
for f in [dir2_abspath, dir21_abspath]:
size = humansize(self.mount_a.stat(regfile121_abspath,
- follow_symlinks=False)['st_size'])
+ follow_symlinks=False)[
+ 'st_size'])
append_expected_output_pattern(f)
# get size for CephFS root
for p in [dir_abspath, regfile_abspath, dir2_abspath,
dir21_abspath, regfile121_abspath, hlink_abspath,
slink_abspath, slink2_abspath]:
- path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
+ path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
- return (expected_patterns, path_to_files)
+ return expected_patterns, path_to_files
else:
return expected_patterns
self.assertRegex(du_output, expected_output)
def test_du_with_path_in_args(self):
- expected_patterns_in_output, path_to_files = self._setup_files(True,
- path_prefix='')
+ expected_patterns_in_output, path_to_files = self._setup_files(
+ True, path_prefix='')
args = ['du', '/']
for p in path_to_files:
class TestDF(TestCephFSShell):
def validate_df(self, filename):
- df_output = self.get_cephfs_shell_cmd_output('df '+filename)
+ df_output = self.get_cephfs_shell_cmd_output('df ' + filename)
log.info("cephfs-shell df output:\n{}".format(df_output))
shell_df = df_output.splitlines()[1].split()
log.info("cephfs available:{}\n".format(block_size - st_size))
self.assertTupleEqual((block_size, st_size, block_size - st_size),
- (int(shell_df[0]), int(shell_df[1]) , int(shell_df[2])))
+ (int(shell_df[0]), int(shell_df[1]),
+ int(shell_df[2])))
def test_df_with_no_args(self):
expected_output = ''
dir_name = 'testdir'
def create_dir(self):
- mount_output = self.get_cephfs_shell_cmd_output('mkdir ' + self.dir_name)
+ mount_output = self.get_cephfs_shell_cmd_output(
+ 'mkdir ' + self.dir_name)
log.info("cephfs-shell mount output:\n{}".format(mount_output))
def set_and_get_quota_vals(self, input_val, check_status=True):
input_val[0], '--max_files', input_val[1],
self.dir_name], check_status=check_status)
- quota_output = self.get_cephfs_shell_cmd_output(['quota', 'get', self.dir_name],
- check_status=check_status)
+ quota_output = self.get_cephfs_shell_cmd_output(
+ ['quota', 'get', self.dir_name],
+ check_status=check_status)
quota_output = quota_output.split()
return quota_output[1], quota_output[3]
def test_set(self):
self.create_dir()
set_values = ('6', '2')
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+ set_values)
def test_replace_values(self):
self.test_set()
set_values = ('20', '4')
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+ set_values)
def test_set_invalid_dir(self):
set_values = ('5', '5')
try:
self.assertTupleEqual(self.set_and_get_quota_vals(
- set_values, False), set_values)
- raise Exception("Something went wrong!! Values set for non existing directory")
+ set_values, False), set_values)
+ raise Exception(
+ "Something went wrong!! Values set for non existing directory")
except IndexError:
- # Test should pass as values cannot be set for non existing directory
+ # Test should pass as values cannot be set for non
+ # existing directory
pass
def test_set_invalid_values(self):
set_values = ('-6', '-5')
try:
self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
- False), set_values)
+ False),
+ set_values)
raise Exception("Something went wrong!! Invalid values set")
except IndexError:
# Test should pass as invalid values cannot be set
file2 = path.join(dir_abspath, "file2")
try:
self.mount_a.run_shell_payload(f"touch {file2}")
- raise Exception("Something went wrong!! File creation should have failed")
+ raise Exception(
+ "Something went wrong!! File creation should have failed")
except CommandFailedError:
# Test should pass as file quota set to 2
# Additional condition to confirm file creation failure
file_abspath = path.join(dir_abspath, filename)
try:
# Write should fail as bytes quota is set to 6
- self.mount_a.client_remote.write_file(file_abspath, 'Disk raise Exception')
+ self.mount_a.client_remote.write_file(file_abspath,
+ 'Disk raise Exception')
raise Exception("Write should have failed")
except CommandFailedError:
# Test should pass only when write command fails
# Testing with teuthology: No file is created.
return 0
elif path_exists and not path.getsize(file_abspath):
- # Testing on Fedora 30: When write fails, empty file gets created.
+ # Testing on Fedora 30: When write fails, empty
+ # file gets created.
return 0
else:
raise
def test_set(self):
self.create_dir()
set_values = ('user.key', '2')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+ set_values)
def test_reset(self):
self.test_set()
set_values = ('user.key', '4')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+ set_values)
def test_non_existing_dir(self):
input_val = ('user.key', '9')
- self.negtest_cephfs_shell_cmd(cmd=['setxattr', self.dir_name, input_val[0],
- input_val[1]])
- self.negtest_cephfs_shell_cmd(cmd=['getxattr', self.dir_name, input_val[0]])
+ self.negtest_cephfs_shell_cmd(
+ cmd=['setxattr', self.dir_name, input_val[0],
+ input_val[1]])
+ self.negtest_cephfs_shell_cmd(
+ cmd=['getxattr', self.dir_name, input_val[0]])
self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
+
class TestLS(TestCephFSShell):
- dir_name = ('test_dir')
- hidden_dir_name = ('.test_hidden_dir')
+ dir_name = 'test_dir'
+ hidden_dir_name = '.test_hidden_dir'
def test_ls(self):
""" Test that ls prints files in CWD. """
def test_ls_a_prints_non_hidden_dir(self):
""" Test ls -a command prints non hidden directory """
- self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name} {self.dir_name}')
+ self.run_cephfs_shell_cmd(
+ f'mkdir {self.hidden_dir_name} {self.dir_name}')
ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
log.info(f"output of ls -a command:\n{ls_a_output}")
def test_ls_H_prints_human_readable_file_size(self):
""" Test "ls -lH" prints human readable file size."""
- file_sizes = ['1','1K', '1M', '1G']
+ file_sizes = ['1', '1K', '1M', '1G']
file_names = ['dump1', 'dump2', 'dump3', 'dump4']
-
for (file_size, file_name) in zip(file_sizes, file_names):
temp_file = self.mount_a.client_remote.mktemp(file_name)
- self.mount_a.run_shell_payload(f"fallocate -l {file_size} {temp_file}")
+ self.mount_a.run_shell_payload(
+ f"fallocate -l {file_size} {temp_file}")
self.mount_a.run_shell_payload(f'mv {temp_file} ./')
ls_H_output = self.get_cephfs_shell_cmd_output(['ls', '-lH'])
ls_H_file_size.add(line.split()[1])
# test that file sizes are in human readable format
- self.assertEqual({'1B','1K', '1M', '1G'}, ls_H_file_size)
+ self.assertEqual({'1B', '1K', '1M', '1G'}, ls_H_file_size)
def test_ls_s_sort_by_size(self):
""" Test "ls -S" sorts file listing by file_size """
for line in ls_s_output.split('\n'):
file_sizes.append(line.split()[1])
- #test that file size are in ascending order
+ # test that file size are in ascending order
self.assertEqual(file_sizes, sorted(file_sizes))
dirname = 'somedirectory'
self.run_cephfs_shell_cmd(['mkdir', dirname])
- output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']).\
+ output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']). \
strip()
self.assertRegex(output, dirname)
o = self.get_cephfs_shell_cmd_output("help all")
log.info("output:\n{}".format(o))
+
+ def test_chmod(self):
+ """Test chmod is allowed above o0777 """
+
+ test_file1 = "test_file2.txt"
+ file1_content = 'A' * 102
+ self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
+ self.run_cephfs_shell_cmd(f"chmod 01777 {test_file1}")
+
class TestShellOpts(TestCephFSShell):
"""
Contains tests for shell options from conf file and shell prompt.
# editor: '?'
self.editor_val = self.get_cephfs_shell_cmd_output(
'set editor ?, set editor').split('\n')[2]
- self.editor_val = self.editor_val.split(':')[1].\
+ self.editor_val = self.editor_val.split(':')[1]. \
replace("'", "", 2).strip()
def write_tempconf(self, confcontents):
self.tempconfpath = self.mount_a.client_remote.mktemp(
suffix='cephfs-shell.conf')
self.mount_a.client_remote.write_file(self.tempconfpath,
- confcontents)
+ confcontents)
def test_reading_conf(self):
self.write_tempconf("[cephfs-shell]\neditor = ???")
# now: vim
# editor: vim
final_editor_val = self.get_cephfs_shell_cmd_output(
- cmd='set editor %s, set editor' % (self.editor_val),
+ cmd='set editor %s, set editor' % self.editor_val,
shell_conf_path=self.tempconfpath)
final_editor_val = final_editor_val.split('\n')[2]
final_editor_val = final_editor_val.split(': ')[1]