]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/test_cephfs_shell.py
update ceph source to reef 18.1.2
[ceph.git] / ceph / qa / tasks / cephfs / test_cephfs_shell.py
index 31f16b44a257057b4e9db8e01f2c80a4fc617074..9f743476270b8de066c90f50edd2edaeb8899266 100644 (file)
-import os
+"""
+NOTE: For running this tests locally (using vstart_runner.py), export the
+path to src/tools/cephfs/shell/cephfs-shell module to $PATH. Running
+"export PATH=$PATH:$(cd ../src/tools/cephfs/shell && pwd)" from the build dir
+will update the environment without hassles of typing the path correctly.
+"""
+from io import StringIO
+from os import path
 import crypt
 import logging
-from StringIO import StringIO
+from tempfile import mkstemp as tempfile_mkstemp
+import math
+from time import sleep
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from tasks.cephfs.fuse_mount import FuseMount
 from teuthology.exceptions import CommandFailedError
+from textwrap import dedent
 
 log = logging.getLogger(__name__)
 
 
+def humansize(nbytes):
+    suffixes = ['B', 'K', 'M', 'G', 'T', 'P']
+    i = 0
+    while nbytes >= 1024 and i < len(suffixes) - 1:
+        nbytes /= 1024.
+        i += 1
+    nbytes = math.ceil(nbytes)
+    f = ('%d' % nbytes).rstrip('.')
+    return '%s%s' % (f, suffixes[i])
+
+
+def ensure_str(s):
+    if isinstance(s, str):
+        return s
+    if isinstance(s, bytes):
+        return s.decode()
+    raise TypeError("not expecting type '%s'" % type(s))
+
+
 class TestCephFSShell(CephFSTestCase):
     CLIENTS_REQUIRED = 1
 
-    def _cephfs_shell(self, cmd, opts=None, stdin=None):
-        args = ["cephfs-shell", "-c", self.mount_a.config_path]
-        if opts is not None:
-            args.extend(opts)
+    def setUp(self):
+        super(TestCephFSShell, self).setUp()
+
+        conf_contents = "[cephfs-shell]\ncolors = False\ndebug = True\n"
+        confpath = self.mount_a.client_remote.sh('mktemp').strip()
+        self.mount_a.client_remote.write_file(confpath, conf_contents)
+        self.default_shell_conf_path = confpath
+
+    def run_cephfs_shell_cmd(self, cmd, mount_x=None, shell_conf_path=None,
+                             opts=None, stdout=None, stderr=None, stdin=None,
+                             check_status=True):
+        stdout = stdout or StringIO()
+        stderr = stderr or StringIO()
+        if mount_x is None:
+            mount_x = self.mount_a
+        if isinstance(cmd, list):
+            cmd = " ".join(cmd)
+        if not shell_conf_path:
+            shell_conf_path = self.default_shell_conf_path
+
+        args = ["cephfs-shell", "-c", shell_conf_path]
+        if opts:
+            args += opts
         args.extend(("--", cmd))
+
         log.info("Running command: {}".format(" ".join(args)))
-        status = self.mount_a.client_remote.run(args=args,
-                                                stdout=StringIO(),
-                                                stdin=stdin)
-        return status.stdout.getvalue().strip()
+        return mount_x.client_remote.run(args=args, stdout=stdout,
+                                         stderr=stderr, stdin=stdin,
+                                         check_status=check_status)
 
-    def test_help(self):
+    def negtest_cephfs_shell_cmd(self, **kwargs):
         """
-        Test that help outputs commands.
+        This method verifies that cephfs shell command fails with expected
+        return value and/or error message.
+
+        kwargs is expected to hold the arguments same as
+        run_cephfs_shell_cmd() with the following exceptions -
+            * It should not contain check_status (since commands are expected
+              to fail, check_status is hardcoded to False).
+            * It is optional to set expected error message and return value to
+              dict members 'errmsg' and 'retval' respectively.
+
+        This method servers as shorthand for codeblocks like -
+
+        try:
+            proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+                                             check_status=False,
+                                             stdout=stdout)
+        except CommandFailedError as e:
+            self.assertNotIn('some error message',
+                              proc.stderr.getvalue.lower())
+
+
+        try:
+            proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+                                             check_status=False,
+                                             stdout=stdout)
+        except CommandFailedError as e:
+            self.assertNotEqual(1, proc.returncode)
         """
+        retval = kwargs.pop('retval', None)
+        errmsg = kwargs.pop('errmsg', None)
+        kwargs['check_status'] = False
 
-        o = self._cephfs_shell("help")
+        proc = self.run_cephfs_shell_cmd(**kwargs)
+        if retval:
+            self.assertEqual(proc.returncode, retval)
+        else:
+            self.assertNotEqual(proc.returncode, 0)
+        if errmsg:
+            self.assertIn(errmsg, proc.stderr.getvalue().lower())
+
+        return proc
+
+    def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
+                                    shell_conf_path=None, opts=None,
+                                    stdout=None, stdin=None,
+                                    check_status=True):
+        return ensure_str(self.run_cephfs_shell_cmd(
+            cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
+            opts=opts, stdout=stdout, stdin=stdin,
+            check_status=check_status).stdout.getvalue().strip())
+
+    def get_cephfs_shell_cmd_error(self, cmd, mount_x=None,
+                                   shell_conf_path=None, opts=None,
+                                   stderr=None, stdin=None, check_status=True):
+        return ensure_str(self.run_cephfs_shell_cmd(
+            cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
+            opts=opts, stderr=stderr, stdin=stdin,
+            check_status=check_status).stderr.getvalue().strip())
+
+    def run_cephfs_shell_script(self, script, mount_x=None,
+                                shell_conf_path=None, opts=None, stdout=None,
+                                stderr=None, stdin=None, check_status=True):
+        stdout = stdout or StringIO()
+        stderr = stderr or StringIO()
+        if mount_x is None:
+            mount_x = self.mount_a
+
+        scriptpath = tempfile_mkstemp(prefix='test-cephfs', text=True)[1]
+        with open(scriptpath, 'w') as scriptfile:
+            scriptfile.write(script)
+        # copy script to the machine running cephfs-shell.
+        mount_x.client_remote.put_file(scriptpath, scriptpath)
+        mount_x.run_shell_payload(f"chmod 755 {scriptpath}")
+
+        args = ["cephfs-shell", '-b', scriptpath]
+        if shell_conf_path:
+            args[1:1] = ["-c", shell_conf_path]
+        log.info('Running script \"' + scriptpath + '\"')
+        return mount_x.client_remote.run(args=args, stdout=stdout,
+                                         stderr=stderr, stdin=stdin,
+                                         check_status=True)
+
+    def get_cephfs_shell_script_output(self, script, mount_x=None,
+                                       shell_conf_path=None, opts=None,
+                                       stdout=None, stdin=None,
+                                       check_status=True):
+        return ensure_str(self.run_cephfs_shell_script(
+            script=script, mount_x=mount_x, shell_conf_path=shell_conf_path,
+            opts=opts, stdout=stdout, stdin=stdin,
+            check_status=check_status).stdout.getvalue().strip())
 
-        log.info("output:\n{}".format(o))
 
+class TestGeneric(TestCephFSShell):
+
+    def test_mistyped_cmd(self):
+        with self.assertRaises(CommandFailedError) as cm:
+            self.run_cephfs_shell_cmd('lsx')
+        self.assertEqual(cm.exception.exitstatus, 127)
+
+
+class TestMkdir(TestCephFSShell):
     def test_mkdir(self):
         """
         Test that mkdir creates directory
         """
-        o = self._cephfs_shell("mkdir d1")
+        o = self.get_cephfs_shell_cmd_output("mkdir d1")
         log.info("cephfs-shell output:\n{}".format(o))
 
         o = self.mount_a.stat('d1')
         log.info("mount_a output:\n{}".format(o))
 
-    def test_mkdir_with_07000_octal_mode(self):
+    def test_mkdir_with_070000_octal_mode(self):
         """
-        Test that mkdir fails with octal mode greater than 0777
+        Test that mkdir fails with octal mode greater than 07777
         """
-        o = self._cephfs_shell("mkdir -m 07000 d2")
-        log.info("cephfs-shell output:\n{}".format(o))
-
-        # mkdir d2 should fail
+        self.negtest_cephfs_shell_cmd(cmd="mkdir -m 070000 d2")
         try:
-            o = self.mount_a.stat('d2')
-            log.info("mount_a output:\n{}".format(o))
-        except:
+            self.mount_a.stat('d2')
+        except CommandFailedError:
             pass
 
     def test_mkdir_with_negative_octal_mode(self):
         """
         Test that mkdir fails with negative octal mode
         """
-        o = self._cephfs_shell("mkdir -m -0755 d3")
-        log.info("cephfs-shell output:\n{}".format(o))
-
-        # mkdir d3 should fail
+        self.negtest_cephfs_shell_cmd(cmd="mkdir -m -0755 d3")
         try:
-            o = self.mount_a.stat('d3')
-            log.info("mount_a output:\n{}".format(o))
-        except:
+            self.mount_a.stat('d3')
+        except CommandFailedError:
             pass
 
     def test_mkdir_with_non_octal_mode(self):
         """
         Test that mkdir passes with non-octal mode
         """
-        o = self._cephfs_shell("mkdir -m u=rwx d4")
+        o = self.get_cephfs_shell_cmd_output("mkdir -m u=rwx d4")
         log.info("cephfs-shell output:\n{}".format(o))
 
         # mkdir d4 should pass
         o = self.mount_a.stat('d4')
-        assert((o['st_mode'] & 0o700) == 0o700)
+        assert ((o['st_mode'] & 0o700) == 0o700)
 
     def test_mkdir_with_bad_non_octal_mode(self):
         """
         Test that mkdir failes with bad non-octal mode
         """
-        o = self._cephfs_shell("mkdir -m ugx=0755 d5")
-        log.info("cephfs-shell output:\n{}".format(o))
-
-        # mkdir d5 should fail
+        self.negtest_cephfs_shell_cmd(cmd="mkdir -m ugx=0755 d5")
         try:
-            o = self.mount_a.stat('d5')
-            log.info("mount_a output:\n{}".format(o))
-        except:
+            self.mount_a.stat('d5')
+        except CommandFailedError:
             pass
 
     def test_mkdir_path_without_path_option(self):
         """
         Test that mkdir fails without path option for creating path
         """
-        o = self._cephfs_shell("mkdir d5/d6/d7")
-        log.info("cephfs-shell output:\n{}".format(o))
-
-        # mkdir d5/d6/d7 should fail
+        self.negtest_cephfs_shell_cmd(cmd="mkdir d5/d6/d7")
         try:
-            o = self.mount_a.stat('d5/d6/d7')
-            log.info("mount_a output:\n{}".format(o))
-        except:
+            self.mount_a.stat('d5/d6/d7')
+        except CommandFailedError:
             pass
 
     def test_mkdir_path_with_path_option(self):
         """
         Test that mkdir passes with path option for creating path
         """
-        o = self._cephfs_shell("mkdir -p d5/d6/d7")
+        o = self.get_cephfs_shell_cmd_output("mkdir -p d5/d6/d7")
         log.info("cephfs-shell output:\n{}".format(o))
 
         # mkdir d5/d6/d7 should pass
         o = self.mount_a.stat('d5/d6/d7')
         log.info("mount_a output:\n{}".format(o))
 
-    def validate_stat_output(self, s):
-        l = s.split('\n')
-        log.info("lines:\n{}".format(l))
-        rv = l[-1] # get last line; a failed stat will have "1" as the line
-        log.info("rv:{}".format(rv))
-        r = 0
+
+class TestRmdir(TestCephFSShell):
+    dir_name = "test_dir"
+
+    def dir_does_not_exists(self):
+        """
+        Tests that directory does not exists
+        """
         try:
-            r = int(rv) # a non-numeric line will cause an exception
-        except:
-            pass
-        assert(r == 0)
+            self.mount_a.stat(self.dir_name)
+        except CommandFailedError as e:
+            if e.exitstatus == 2:
+                return 0
+            raise
 
-    def test_put_and_get_without_target_directory(self):
+    def test_rmdir(self):
         """
-        Test that put fails without target path
+        Test that rmdir deletes directory
         """
-        # generate test data in a directory
-        self._cephfs_shell("!mkdir p1")
-        self._cephfs_shell('!dd if=/dev/urandom of=p1/dump1 bs=1M count=1')
-        self._cephfs_shell('!dd if=/dev/urandom of=p1/dump2 bs=2M count=1')
-        self._cephfs_shell('!dd if=/dev/urandom of=p1/dump3 bs=3M count=1')
+        self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
+        self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
+        self.dir_does_not_exists()
 
-        # copy the whole directory over to the cephfs
-        o = self._cephfs_shell("put p1")
-        log.info("cephfs-shell output:\n{}".format(o))
+    def test_rmdir_non_existing_dir(self):
+        """
+        Test that rmdir does not delete a non existing directory
+        """
+        self.negtest_cephfs_shell_cmd(cmd="rmdir test_dir")
+        self.dir_does_not_exists()
 
-        # put p1 should pass
-        o = self.mount_a.stat('p1')
-        log.info("mount_a output:\n{}".format(o))
-        o = self.mount_a.stat('p1/dump1')
-        log.info("mount_a output:\n{}".format(o))
-        o = self.mount_a.stat('p1/dump2')
-        log.info("mount_a output:\n{}".format(o))
-        o = self.mount_a.stat('p1/dump3')
-        log.info("mount_a output:\n{}".format(o))
+    def test_rmdir_dir_with_file(self):
+        """
+        Test that rmdir does not delete directory containing file
+        """
+        self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
 
-        self._cephfs_shell('!rm -rf p1')
-        o = self._cephfs_shell("get p1")
-        o = self._cephfs_shell('!stat p1 || echo $?')
-        log.info("cephfs-shell output:\n{}".format(o))
-        self.validate_stat_output(o)
+        self.run_cephfs_shell_cmd("put - test_dir/dumpfile", stdin="Valid File")
+        # see comment below
+        # with self.assertRaises(CommandFailedError) as cm:
+        with self.assertRaises(CommandFailedError):
+            self.run_cephfs_shell_cmd("rmdir " + self.dir_name)
+        # TODO: we need to check for exit code and error message as well.
+        # skipping it for not since error codes used by cephfs-shell are not
+        # standard and they may change soon.
+        # self.assertEqual(cm.exception.exitcode, 39)
+        self.mount_a.stat(self.dir_name)
 
-        o = self._cephfs_shell('!stat p1/dump1 || echo $?')
-        log.info("cephfs-shell output:\n{}".format(o))
-        self.validate_stat_output(o)
+    def test_rmdir_existing_file(self):
+        """
+        Test that rmdir does not delete a file
+        """
+        self.run_cephfs_shell_cmd("put - dumpfile", stdin="Valid File")
+        self.negtest_cephfs_shell_cmd(cmd="rmdir dumpfile")
+        self.mount_a.stat("dumpfile")
 
-        o = self._cephfs_shell('!stat p1/dump2 || echo $?')
-        log.info("cephfs-shell output:\n{}".format(o))
-        self.validate_stat_output(o)
+    def test_rmdir_p(self):
+        """
+        Test that rmdir -p deletes all empty directories in the root
+        directory passed
+        """
+        self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
+        self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
+        self.dir_does_not_exists()
 
-        o = self._cephfs_shell('!stat p1/dump3 || echo $?')
-        log.info("cephfs-shell output:\n{}".format(o))
-        self.validate_stat_output(o)
+    def test_rmdir_p_valid_path(self):
+        """
+        Test that rmdir -p deletes all empty directories in the path passed
+        """
+        self.run_cephfs_shell_cmd("mkdir -p test_dir/t1/t2/t3")
+        self.run_cephfs_shell_cmd("rmdir -p test_dir/t1/t2/t3")
+        self.dir_does_not_exists()
+
+    def test_rmdir_p_non_existing_dir(self):
+        """
+        Test that rmdir -p does not delete an invalid directory
+        """
+        self.negtest_cephfs_shell_cmd(cmd="rmdir -p test_dir")
+        self.dir_does_not_exists()
+
+    def test_rmdir_p_dir_with_file(self):
+        """
+        Test that rmdir -p does not delete the directory containing a file
+        """
+        self.run_cephfs_shell_cmd("mkdir " + self.dir_name)
+        self.run_cephfs_shell_cmd("put - test_dir/dumpfile",
+                                  stdin="Valid File")
+        self.run_cephfs_shell_cmd("rmdir -p " + self.dir_name)
+        self.mount_a.stat(self.dir_name)
 
-    # the 'put' command gets tested as well with the 'get' comamnd
+
+class TestLn(TestCephFSShell):
+    dir1 = 'test_dir1'
+    dir2 = 'test_dir2'
+    dump_id = 11
+    s = 'somedata'
+    dump_file = 'dump11'
+
+    def test_soft_link_without_link_name(self):
+        self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+        self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+                                data=self.s)
+        self.run_cephfs_shell_script(script=dedent(f'''
+                cd /{self.dir1}/{self.dir2}
+                ln -s ../{self.dump_file}'''))
+        o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+                                             f'/{self.dump_file}')
+        self.assertEqual(self.s, o)
+
+    def test_soft_link_with_link_name(self):
+        self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+        self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+                                data=self.s)
+        self.run_cephfs_shell_cmd(f'ln -s /{self.dir1}/{self.dump_file} '
+                                  f'/{self.dir1}/{self.dir2}/')
+        o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+                                             f'/{self.dump_file}')
+        self.assertEqual(self.s, o)
+
+    def test_hard_link_without_link_name(self):
+        self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+        self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+                                data=self.s)
+        self.run_cephfs_shell_script(script=dedent(f'''
+                cd /{self.dir1}/{self.dir2}
+                ln ../{self.dump_file}'''))
+        o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+                                             f'/{self.dump_file}')
+        self.assertEqual(self.s, o)
+
+    def test_hard_link_with_link_name(self):
+        self.run_cephfs_shell_cmd(f'mkdir -p {self.dir1}/{self.dir2}')
+        self.mount_a.write_file(path=f'{self.dir1}/{self.dump_file}',
+                                data=self.s)
+        self.run_cephfs_shell_cmd(f'ln /{self.dir1}/{self.dump_file} '
+                                  f'/{self.dir1}/{self.dir2}/')
+        o = self.get_cephfs_shell_cmd_output(f'cat /{self.dir1}/{self.dir2}'
+                                             f'/{self.dump_file}')
+        self.assertEqual(self.s, o)
+
+    def test_hard_link_to_dir_not_allowed(self):
+        self.run_cephfs_shell_cmd(f'mkdir {self.dir1}')
+        self.run_cephfs_shell_cmd(f'mkdir {self.dir2}')
+        r = self.run_cephfs_shell_cmd(f'ln /{self.dir1} /{self.dir2}/',
+                                      check_status=False)
+        self.assertEqual(r.returncode, 3)
+
+    def test_target_exists_in_dir(self):
+        self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+        r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} {self.dump_file}',
+                                      check_status=False)
+        self.assertEqual(r.returncode, 1)
+
+    def test_incorrect_dir(self):
+        self.mount_a.write_file(path=f'{self.dump_file}', data=self.s)
+        r = self.run_cephfs_shell_cmd(f'ln {self.dump_file} /dir1/',
+                                      check_status=False)
+        self.assertEqual(r.returncode, 5)
+
+
+class TestGetAndPut(TestCephFSShell):
     def test_get_with_target_name(self):
         """
         Test that get passes with target name
         """
         s = 'C' * 1024
         s_hash = crypt.crypt(s, '.A')
-        o = self._cephfs_shell("put - dump4", stdin=s)
+        o = self.get_cephfs_shell_cmd_output("put - dump4", stdin=s)
         log.info("cephfs-shell output:\n{}".format(o))
 
         # put - dump4 should pass
         o = self.mount_a.stat('dump4')
         log.info("mount_a output:\n{}".format(o))
 
-        o = self._cephfs_shell("get dump4 .")
+        o = self.get_cephfs_shell_cmd_output("get dump4 ./dump4")
         log.info("cephfs-shell output:\n{}".format(o))
 
-        o = self._cephfs_shell("!cat dump4")
+        # NOTE: cwd=None because we want to run it at CWD, not at cephfs mntpt.
+        o = self.mount_a.run_shell('cat dump4', cwd=None).stdout.getvalue(). \
+            strip()
         o_hash = crypt.crypt(o, '.A')
 
         # s_hash must be equal to o_hash
         log.info("s_hash:{}".format(s_hash))
         log.info("o_hash:{}".format(o_hash))
-        assert(s_hash == o_hash)
+        assert (s_hash == o_hash)
+
+        # cleanup
+        self.mount_a.run_shell("rm dump4", cwd=None, check_status=False)
 
     def test_get_without_target_name(self):
         """
-        Test that get passes with target name
+        Test that get should fail when there is no target name
         """
-        s = 'D' * 1024
-        o = self._cephfs_shell("put - dump5", stdin=s)
-        log.info("cephfs-shell output:\n{}".format(o))
-
+        s = 'Somedata'
         # put - dump5 should pass
-        o = self.mount_a.stat('dump5')
-        log.info("mount_a output:\n{}".format(o))
+        self.get_cephfs_shell_cmd_output("put - dump5", stdin=s)
 
-        # get dump5 should fail
-        o = self._cephfs_shell("get dump5")
-        o = self._cephfs_shell("!stat dump5 || echo $?")
-        log.info("cephfs-shell output:\n{}".format(o))
-        l = o.split('\n')
-        try:
-            ret = int(l[1])
-            # verify that stat dump5 passes
-            # if ret == 1, then that implies the stat failed
-            # which implies that there was a problem with "get dump5"
-            assert(ret != 1)
-        except ValueError:
-            # we have a valid stat output; so this is good
-            # if the int() fails then that means there's a valid stat output
-            pass
+        self.mount_a.stat('dump5')
+
+        # get dump5 should fail as there is no local_path mentioned
+        with self.assertRaises(CommandFailedError):
+            self.get_cephfs_shell_cmd_output("get dump5")
+
+        # stat dump would return non-zero exit code as get dump failed
+        # cwd=None because we want to run it at CWD, not at cephfs mntpt.
+        r = self.mount_a.run_shell('stat dump5', cwd=None,
+                                   check_status=False).returncode
+        self.assertEqual(r, 1)
+
+    def test_get_doesnt_create_dir(self):
+        # if get cmd is creating subdirs on its own then dump7 will be
+        # stored as ./dump7/tmp/dump7 and not ./dump7, therefore
+        # if doing `cat ./dump7` returns non-zero exit code(i.e. 1) then
+        # it implies that no such file exists at that location
+        dir_abspath = path.join(self.mount_a.mountpoint, 'tmp')
+        self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+        self.mount_a.client_remote.write_file(path.join(dir_abspath, 'dump7'),
+                                              'somedata')
+        self.get_cephfs_shell_cmd_output("get /tmp/dump7 ./dump7")
+        # test that dump7 exists
+        self.mount_a.run_shell("cat ./dump7", cwd=None)
+
+        # cleanup
+        self.mount_a.run_shell(args='rm dump7', cwd=None, check_status=False)
 
     def test_get_to_console(self):
         """
@@ -233,7 +476,7 @@ class TestCephFSShell(CephFSTestCase):
         """
         s = 'E' * 1024
         s_hash = crypt.crypt(s, '.A')
-        o = self._cephfs_shell("put - dump6", stdin=s)
+        o = self.get_cephfs_shell_cmd_output("put - dump6", stdin=s)
         log.info("cephfs-shell output:\n{}".format(o))
 
         # put - dump6 should pass
@@ -241,42 +484,684 @@ class TestCephFSShell(CephFSTestCase):
         log.info("mount_a output:\n{}".format(o))
 
         # get dump6 - should pass
-        o = self._cephfs_shell("get dump6 -")
+        o = self.get_cephfs_shell_cmd_output("get dump6 -")
         o_hash = crypt.crypt(o, '.A')
         log.info("cephfs-shell output:\n{}".format(o))
 
         # s_hash must be equal to o_hash
         log.info("s_hash:{}".format(s_hash))
         log.info("o_hash:{}".format(o_hash))
-        assert(s_hash == o_hash)
-
-#    def test_ls(self):
-#        """
-#        Test that ls passes
-#        """
-#        o = self._cephfs_shell("ls")
-#        log.info("cephfs-shell output:\n{}".format(o))
-#
-#        o = self.mount_a.run_shell(['ls']).stdout.getvalue().strip().replace("\n", " ").split()
-#        log.info("mount_a output:\n{}".format(o))
-#
-#        # ls should not list hidden files without the -a switch
-#        if '.' in o or '..' in o:
-#            log.info('ls failed')
-#        else:
-#            log.info('ls succeeded')
-#
-#    def test_ls_a(self):
-#        """
-#        Test that ls -a passes
-#        """
-#        o = self._cephfs_shell("ls -a")
-#        log.info("cephfs-shell output:\n{}".format(o))
-#
-#        o = self.mount_a.run_shell(['ls', '-a']).stdout.getvalue().strip().replace("\n", " ").split()
-#        log.info("mount_a output:\n{}".format(o))
-#
-#        if '.' in o and '..' in o:
-#            log.info('ls -a succeeded')
-#        else:
-#            log.info('ls -a failed')
+        assert (s_hash == o_hash)
+
+
+    def test_put_without_target_name(self):
+        """
+        put - should fail as the cmd expects both arguments are mandatory.
+        """
+        with self.assertRaises(CommandFailedError):
+            self.get_cephfs_shell_cmd_output("put -")
+
+    def test_put_validate_local_path(self):
+        """
+        This test is intended to make sure local_path is validated before
+        trying to put the file from local fs to cephfs and the command
+        put ./dumpXYZ dump8 would fail as dumpXYX doesn't exist.
+        """
+        with self.assertRaises(CommandFailedError):
+            o = self.get_cephfs_shell_cmd_output("put ./dumpXYZ dump8")
+            log.info("cephfs-shell output:\n{}".format(o))
+
+class TestSnapshots(TestCephFSShell):
+    def test_snap(self):
+        """
+        Test that snapshot creation and deletion work
+        """
+        sd = self.fs.get_config('client_snapdir')
+        sdn = "data_dir/{}/snap1".format(sd)
+
+        # create a data dir and dump some files into it
+        self.get_cephfs_shell_cmd_output("mkdir data_dir")
+        s = 'A' * 10240
+        o = self.get_cephfs_shell_cmd_output("put - data_dir/data_a", stdin=s)
+        s = 'B' * 10240
+        o = self.get_cephfs_shell_cmd_output("put - data_dir/data_b", stdin=s)
+        s = 'C' * 10240
+        o = self.get_cephfs_shell_cmd_output("put - data_dir/data_c", stdin=s)
+        s = 'D' * 10240
+        o = self.get_cephfs_shell_cmd_output("put - data_dir/data_d", stdin=s)
+        s = 'E' * 10240
+        o = self.get_cephfs_shell_cmd_output("put - data_dir/data_e", stdin=s)
+
+        o = self.get_cephfs_shell_cmd_output("ls -l /data_dir")
+        log.info("cephfs-shell output:\n{}".format(o))
+
+        # create the snapshot - must pass
+        o = self.get_cephfs_shell_cmd_output("snap create snap1 /data_dir")
+        log.info("cephfs-shell output:\n{}".format(o))
+        self.assertEqual("", o)
+        o = self.mount_a.stat(sdn)
+        log.info("mount_a output:\n{}".format(o))
+        self.assertIn('st_mode', o)
+
+        # create the same snapshot again - must fail with an error message
+        self.negtest_cephfs_shell_cmd(cmd="snap create snap1 /data_dir",
+                                      errmsg="snapshot 'snap1' already exists")
+        o = self.mount_a.stat(sdn)
+        log.info("mount_a output:\n{}".format(o))
+        self.assertIn('st_mode', o)
+
+        # delete the snapshot - must pass
+        o = self.get_cephfs_shell_cmd_output("snap delete snap1 /data_dir")
+        log.info("cephfs-shell output:\n{}".format(o))
+        self.assertEqual("", o)
+        try:
+            o = self.mount_a.stat(sdn)
+        except CommandFailedError:
+            # snap dir should not exist anymore
+            pass
+        log.info("mount_a output:\n{}".format(o))
+        self.assertNotIn('st_mode', o)
+
+        # delete the same snapshot again - must fail with an error message
+        self.negtest_cephfs_shell_cmd(cmd="snap delete snap1 /data_dir",
+                                      errmsg="'snap1': no such snapshot")
+        try:
+            o = self.mount_a.stat(sdn)
+        except CommandFailedError:
+            pass
+        log.info("mount_a output:\n{}".format(o))
+        self.assertNotIn('st_mode', o)
+
+
+class TestCD(TestCephFSShell):
+    CLIENTS_REQUIRED = 1
+
+    def test_cd_with_no_args(self):
+        """
+        Test that when cd is issued without any arguments, CWD is changed
+        to root directory.
+        """
+        path = 'dir1/dir2/dir3'
+        self.mount_a.run_shell_payload(f"mkdir -p {path}")
+        expected_cwd = '/'
+
+        script = 'cd {}\ncd\ncwd\n'.format(path)
+        output = self.get_cephfs_shell_script_output(script)
+        self.assertEqual(output, expected_cwd)
+
+    def test_cd_with_args(self):
+        """
+        Test that when cd is issued with an argument, CWD is changed
+        to the path passed in the argument.
+        """
+        path = 'dir1/dir2/dir3'
+        self.mount_a.run_shell_payload(f"mkdir -p {path}")
+        expected_cwd = '/dir1/dir2/dir3'
+
+        script = 'cd {}\ncwd\n'.format(path)
+        output = self.get_cephfs_shell_script_output(script)
+        self.assertEqual(output, expected_cwd)
+
+
+class TestDU(TestCephFSShell):
+    CLIENTS_REQUIRED = 1
+
+    def test_du_works_for_regfiles(self):
+        regfilename = 'some_regfile'
+        regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
+        self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+
+        size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", regfilename)
+
+        du_output = self.get_cephfs_shell_cmd_output('du ' + regfilename)
+        self.assertRegex(du_output, expected_output)
+
+    def test_du_works_for_non_empty_dirs(self):
+        dirname = 'some_directory'
+        dir_abspath = path.join(self.mount_a.mountpoint, dirname)
+        regfilename = 'some_regfile'
+        regfile_abspath = path.join(dir_abspath, regfilename)
+        self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+        self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+
+        # XXX: we stat `regfile_abspath` here because ceph du reports
+        # a non-empty
+        # directory's size as sum of sizes of all files under it.
+        size = humansize(self.mount_a.stat(regfile_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", dirname)
+
+        sleep(10)
+        du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
+        self.assertRegex(du_output, expected_output)
+
+    def test_du_works_for_empty_dirs(self):
+        dirname = 'some_directory'
+        dir_abspath = path.join(self.mount_a.mountpoint, dirname)
+        self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+
+        size = humansize(self.mount_a.stat(dir_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", dirname)
+
+        du_output = self.get_cephfs_shell_cmd_output('du ' + dirname)
+        self.assertRegex(du_output, expected_output)
+
+    def test_du_works_for_hardlinks(self):
+        regfilename = 'some_regfile'
+        regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
+        self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+        hlinkname = 'some_hardlink'
+        hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
+        self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
+
+        size = humansize(self.mount_a.stat(hlink_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", hlinkname)
+
+        du_output = self.get_cephfs_shell_cmd_output('du ' + hlinkname)
+        self.assertRegex(du_output, expected_output)
+
+    def test_du_works_for_softlinks_to_files(self):
+        regfilename = 'some_regfile'
+        regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
+        self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+        slinkname = 'some_softlink'
+        slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
+        self.mount_a.run_shell_payload(
+            f"ln -s {regfile_abspath} {slink_abspath}")
+
+        size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", slinkname)
+
+        du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
+        self.assertRegex(du_output, expected_output)
+
+    def test_du_works_for_softlinks_to_dirs(self):
+        dirname = 'some_directory'
+        dir_abspath = path.join(self.mount_a.mountpoint, dirname)
+        self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+        slinkname = 'some_softlink'
+        slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
+        self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink_abspath}")
+
+        size = humansize(self.mount_a.lstat(slink_abspath)['st_size'])
+        expected_output = r'{}{}{}'.format(size, " +", slinkname)
+
+        du_output = self.get_cephfs_shell_cmd_output('du ' + slinkname)
+        self.assertRegex(du_output, expected_output)
+
+    # NOTE: tests using these are pretty slow since to this methods sleeps for
+    # 15 seconds
+    def _setup_files(self, return_path_to_files=False, path_prefix='./'):
+        dirname = 'dir1'
+        regfilename = 'regfile'
+        hlinkname = 'hlink'
+        slinkname = 'slink1'
+        slink2name = 'slink2'
+
+        dir_abspath = path.join(self.mount_a.mountpoint, dirname)
+        regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
+        hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
+        slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
+        slink2_abspath = path.join(self.mount_a.mountpoint, slink2name)
+
+        self.mount_a.run_shell_payload(f"mkdir {dir_abspath}")
+        self.mount_a.run_shell_payload(f"touch {regfile_abspath}")
+        self.mount_a.run_shell_payload(f"ln {regfile_abspath} {hlink_abspath}")
+        self.mount_a.run_shell_payload(
+            f"ln -s {regfile_abspath} {slink_abspath}")
+        self.mount_a.run_shell_payload(f"ln -s {dir_abspath} {slink2_abspath}")
+
+        dir2_name = 'dir2'
+        dir21_name = 'dir21'
+        regfile121_name = 'regfile121'
+        dir2_abspath = path.join(self.mount_a.mountpoint, dir2_name)
+        dir21_abspath = path.join(dir2_abspath, dir21_name)
+        regfile121_abspath = path.join(dir21_abspath, regfile121_name)
+        self.mount_a.run_shell_payload(f"mkdir -p {dir21_abspath}")
+        self.mount_a.run_shell_payload(f"touch {regfile121_abspath}")
+
+        self.mount_a.client_remote.write_file(regfile_abspath, 'somedata')
+        self.mount_a.client_remote.write_file(regfile121_abspath,
+                                              'somemoredata')
+
+        # TODO: is there a way to trigger/force update ceph.dir.rbytes?
+        # wait so that attr ceph.dir.rbytes gets a chance to be updated.
+        sleep(20)
+
+        expected_patterns = []
+        path_to_files = []
+
+        def append_expected_output_pattern(f):
+            if f == '/':
+                expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
+            else:
+                expected_patterns.append(r'{}{}{}'.format(
+                    size, " +",
+                    path_prefix + path.relpath(f, self.mount_a.mountpoint)))
+
+        for f in [dir_abspath, regfile_abspath, regfile121_abspath,
+                  hlink_abspath, slink_abspath, slink2_abspath]:
+            size = humansize(self.mount_a.stat(
+                f, follow_symlinks=False)['st_size'])
+            append_expected_output_pattern(f)
+
+        # get size for directories containig regfiles within
+        for f in [dir2_abspath, dir21_abspath]:
+            size = humansize(self.mount_a.stat(regfile121_abspath,
+                                               follow_symlinks=False)[
+                                 'st_size'])
+            append_expected_output_pattern(f)
+
+        # get size for CephFS root
+        size = 0
+        for f in [regfile_abspath, regfile121_abspath, slink_abspath,
+                  slink2_abspath]:
+            size += self.mount_a.stat(f, follow_symlinks=False)['st_size']
+        size = humansize(size)
+        append_expected_output_pattern('/')
+
+        if return_path_to_files:
+            for p in [dir_abspath, regfile_abspath, dir2_abspath,
+                      dir21_abspath, regfile121_abspath, hlink_abspath,
+                      slink_abspath, slink2_abspath]:
+                path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
+
+            return expected_patterns, path_to_files
+        else:
+            return expected_patterns
+
+    def test_du_works_recursively_with_no_path_in_args(self):
+        expected_patterns_in_output = self._setup_files()
+        du_output = self.get_cephfs_shell_cmd_output('du -r')
+
+        for expected_output in expected_patterns_in_output:
+            self.assertRegex(du_output, expected_output)
+
+    def test_du_with_path_in_args(self):
+        expected_patterns_in_output, path_to_files = self._setup_files(
+            True, path_prefix='')
+
+        args = ['du', '/']
+        for p in path_to_files:
+            args.append(p)
+        du_output = self.get_cephfs_shell_cmd_output(args)
+
+        for expected_output in expected_patterns_in_output:
+            self.assertRegex(du_output, expected_output)
+
+    def test_du_with_no_args(self):
+        expected_patterns_in_output = self._setup_files()
+
+        du_output = self.get_cephfs_shell_cmd_output('du')
+
+        for expected_output in expected_patterns_in_output:
+            # Since CWD is CephFS root and being non-recursive expect only
+            # CWD in DU report.
+            if expected_output.find('/') == len(expected_output) - 1:
+                self.assertRegex(du_output, expected_output)
+
+
+class TestDF(TestCephFSShell):
+    def validate_df(self, filename):
+        df_output = self.get_cephfs_shell_cmd_output('df ' + filename)
+        log.info("cephfs-shell df output:\n{}".format(df_output))
+
+        shell_df = df_output.splitlines()[1].split()
+
+        block_size = int(self.mount_a.df()["total"]) // 1024
+        log.info("cephfs df block size output:{}\n".format(block_size))
+
+        st_size = int(self.mount_a.stat(filename)["st_size"])
+        log.info("cephfs stat used output:{}".format(st_size))
+        log.info("cephfs available:{}\n".format(block_size - st_size))
+
+        self.assertTupleEqual((block_size, st_size, block_size - st_size),
+                              (int(shell_df[0]), int(shell_df[1]),
+                               int(shell_df[2])))
+
+    def test_df_with_no_args(self):
+        expected_output = ''
+        df_output = self.get_cephfs_shell_cmd_output('df')
+        assert df_output == expected_output
+
+    def test_df_for_valid_directory(self):
+        dir_name = 'dir1'
+        mount_output = self.mount_a.run_shell_payload(f"mkdir {dir_name}")
+        log.info("cephfs-shell mount output:\n{}".format(mount_output))
+        self.validate_df(dir_name)
+
+    def test_df_for_invalid_directory(self):
+        dir_abspath = path.join(self.mount_a.mountpoint, 'non-existent-dir')
+        self.negtest_cephfs_shell_cmd(cmd='df ' + dir_abspath,
+                                      errmsg='error in stat')
+
+    def test_df_for_valid_file(self):
+        s = 'df test' * 14145016
+        o = self.get_cephfs_shell_cmd_output("put - dumpfile", stdin=s)
+        log.info("cephfs-shell output:\n{}".format(o))
+        self.validate_df("dumpfile")
+
+
+class TestQuota(TestCephFSShell):
+    dir_name = 'testdir'
+
+    def create_dir(self):
+        mount_output = self.get_cephfs_shell_cmd_output(
+            'mkdir ' + self.dir_name)
+        log.info("cephfs-shell mount output:\n{}".format(mount_output))
+
+    def set_and_get_quota_vals(self, input_val, check_status=True):
+        self.run_cephfs_shell_cmd(['quota', 'set', '--max_bytes',
+                                   input_val[0], '--max_files', input_val[1],
+                                   self.dir_name], check_status=check_status)
+
+        quota_output = self.get_cephfs_shell_cmd_output(
+            ['quota', 'get', self.dir_name],
+            check_status=check_status)
+
+        quota_output = quota_output.split()
+        return quota_output[1], quota_output[3]
+
+    def test_set(self):
+        self.create_dir()
+        set_values = ('6', '2')
+        self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+                              set_values)
+
+    def test_replace_values(self):
+        self.test_set()
+        set_values = ('20', '4')
+        self.assertTupleEqual(self.set_and_get_quota_vals(set_values),
+                              set_values)
+
+    def test_set_invalid_dir(self):
+        set_values = ('5', '5')
+        try:
+            self.assertTupleEqual(self.set_and_get_quota_vals(
+                set_values, False), set_values)
+            raise Exception(
+                "Something went wrong!! Values set for non existing directory")
+        except IndexError:
+            # Test should pass as values cannot be set for non
+            # existing directory
+            pass
+
+    def test_set_invalid_values(self):
+        self.create_dir()
+        set_values = ('-6', '-5')
+        try:
+            self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
+                                                              False),
+                                  set_values)
+            raise Exception("Something went wrong!! Invalid values set")
+        except IndexError:
+            # Test should pass as invalid values cannot be set
+            pass
+
+    def test_exceed_file_limit(self):
+        self.test_set()
+        dir_abspath = path.join(self.mount_a.mountpoint, self.dir_name)
+        self.mount_a.run_shell_payload(f"touch {dir_abspath}/file1")
+        file2 = path.join(dir_abspath, "file2")
+        try:
+            self.mount_a.run_shell_payload(f"touch {file2}")
+            raise Exception(
+                "Something went wrong!! File creation should have failed")
+        except CommandFailedError:
+            # Test should pass as file quota set to 2
+            # Additional condition to confirm file creation failure
+            if not path.exists(file2):
+                return 0
+            raise
+
+    def test_exceed_write_limit(self):
+        self.test_set()
+        dir_abspath = path.join(self.mount_a.mountpoint, self.dir_name)
+        filename = 'test_file'
+        file_abspath = path.join(dir_abspath, filename)
+        try:
+            # Write should fail as bytes quota is set to 6
+            self.mount_a.client_remote.write_file(file_abspath,
+                                                  'Disk raise Exception')
+            raise Exception("Write should have failed")
+        except CommandFailedError:
+            # Test should pass only when write command fails
+            path_exists = path.exists(file_abspath)
+            if not path_exists:
+                # Testing with teuthology: No file is created.
+                return 0
+            elif path_exists and not path.getsize(file_abspath):
+                # Testing on Fedora 30: When write fails, empty
+                # file gets created.
+                return 0
+            else:
+                raise
+
+
+class TestXattr(TestCephFSShell):
+    dir_name = 'testdir'
+
+    def create_dir(self):
+        self.run_cephfs_shell_cmd('mkdir ' + self.dir_name)
+
+    def set_get_list_xattr_vals(self, input_val, negtest=False):
+        setxattr_output = self.get_cephfs_shell_cmd_output(
+            ['setxattr', self.dir_name, input_val[0], input_val[1]])
+        log.info("cephfs-shell setxattr output:\n{}".format(setxattr_output))
+
+        getxattr_output = self.get_cephfs_shell_cmd_output(
+            ['getxattr', self.dir_name, input_val[0]])
+        log.info("cephfs-shell getxattr output:\n{}".format(getxattr_output))
+
+        listxattr_output = self.get_cephfs_shell_cmd_output(
+            ['listxattr', self.dir_name])
+        log.info("cephfs-shell listxattr output:\n{}".format(listxattr_output))
+
+        return listxattr_output, getxattr_output
+
+    def test_set(self):
+        self.create_dir()
+        set_values = ('user.key', '2')
+        self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+                              set_values)
+
+    def test_reset(self):
+        self.test_set()
+        set_values = ('user.key', '4')
+        self.assertTupleEqual(self.set_get_list_xattr_vals(set_values),
+                              set_values)
+
+    def test_non_existing_dir(self):
+        input_val = ('user.key', '9')
+        self.negtest_cephfs_shell_cmd(
+            cmd=['setxattr', self.dir_name, input_val[0],
+                 input_val[1]])
+        self.negtest_cephfs_shell_cmd(
+            cmd=['getxattr', self.dir_name, input_val[0]])
+        self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
+
+
+class TestLS(TestCephFSShell):
+    dir_name = 'test_dir'
+    hidden_dir_name = '.test_hidden_dir'
+
+    def test_ls(self):
+        """ Test that ls prints files in CWD. """
+        self.run_cephfs_shell_cmd(f'mkdir {self.dir_name}')
+
+        ls_output = self.get_cephfs_shell_cmd_output("ls")
+        log.info(f"output of ls command:\n{ls_output}")
+
+        self.assertIn(self.dir_name, ls_output)
+
+    def test_ls_a(self):
+        """ Test ls -a prints hidden files in CWD."""
+
+        self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
+
+        ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
+        log.info(f"output of ls -a command:\n{ls_a_output}")
+
+        self.assertIn(self.hidden_dir_name, ls_a_output)
+
+    def test_ls_does_not_print_hidden_dir(self):
+        """ Test ls command does not print hidden directory """
+
+        self.run_cephfs_shell_cmd(f'mkdir {self.hidden_dir_name}')
+
+        ls_output = self.get_cephfs_shell_cmd_output("ls")
+        log.info(f"output of ls command:\n{ls_output}")
+
+        self.assertNotIn(self.hidden_dir_name, ls_output)
+
+    def test_ls_a_prints_non_hidden_dir(self):
+        """ Test ls -a command prints non hidden directory """
+
+        self.run_cephfs_shell_cmd(
+            f'mkdir {self.hidden_dir_name} {self.dir_name}')
+
+        ls_a_output = self.get_cephfs_shell_cmd_output(['ls', '-a'])
+        log.info(f"output of ls -a command:\n{ls_a_output}")
+
+        self.assertIn(self.dir_name, ls_a_output)
+
+    def test_ls_H_prints_human_readable_file_size(self):
+        """ Test "ls -lH" prints human readable file size."""
+
+        file_sizes = ['1', '1K', '1M', '1G']
+        file_names = ['dump1', 'dump2', 'dump3', 'dump4']
+
+        for (file_size, file_name) in zip(file_sizes, file_names):
+            temp_file = self.mount_a.client_remote.mktemp(file_name)
+            self.mount_a.run_shell_payload(
+                f"fallocate -l {file_size} {temp_file}")
+            self.mount_a.run_shell_payload(f'mv {temp_file} ./')
+
+        ls_H_output = self.get_cephfs_shell_cmd_output(['ls', '-lH'])
+
+        ls_H_file_size = set()
+        for line in ls_H_output.split('\n'):
+            ls_H_file_size.add(line.split()[1])
+
+        # test that file sizes are in human readable format
+        self.assertEqual({'1B', '1K', '1M', '1G'}, ls_H_file_size)
+
+    def test_ls_s_sort_by_size(self):
+        """ Test "ls -S" sorts file listing by file_size """
+        test_file1 = "test_file1.txt"
+        test_file2 = "test_file2.txt"
+        file1_content = 'A' * 102
+        file2_content = 'B' * 10
+
+        self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
+        self.run_cephfs_shell_cmd(f"write {test_file2}", stdin=file2_content)
+
+        ls_s_output = self.get_cephfs_shell_cmd_output(['ls', '-lS'])
+
+        file_sizes = []
+        for line in ls_s_output.split('\n'):
+            file_sizes.append(line.split()[1])
+
+        # test that file size are in ascending order
+        self.assertEqual(file_sizes, sorted(file_sizes))
+
+
+class TestMisc(TestCephFSShell):
+    def test_issue_cephfs_shell_cmd_at_invocation(self):
+        """
+        Test that `cephfs-shell -c conf cmd` works.
+        """
+        # choosing a long name since short ones have a higher probability
+        # of getting matched by coincidence.
+        dirname = 'somedirectory'
+        self.run_cephfs_shell_cmd(['mkdir', dirname])
+
+        output = self.mount_a.client_remote.sh(['cephfs-shell', 'ls']). \
+            strip()
+
+        self.assertRegex(output, dirname)
+
+    def test_help(self):
+        """
+        Test that help outputs commands.
+        """
+        o = self.get_cephfs_shell_cmd_output("help all")
+        log.info("output:\n{}".format(o))
+
+
+    def test_chmod(self):
+        """Test chmod is allowed above o0777 """
+        
+        test_file1 = "test_file2.txt"
+        file1_content = 'A' * 102
+        self.run_cephfs_shell_cmd(f"write {test_file1}", stdin=file1_content)
+        self.run_cephfs_shell_cmd(f"chmod 01777 {test_file1}")
+        
+class TestShellOpts(TestCephFSShell):
+    """
+    Contains tests for shell options from conf file and shell prompt.
+    """
+
+    def setUp(self):
+        super(type(self), self).setUp()
+
+        # output of following command -
+        # editor - was: 'vim'
+        # now: '?'
+        # editor: '?'
+        self.editor_val = self.get_cephfs_shell_cmd_output(
+            'set editor ?, set editor').split('\n')[2]
+        self.editor_val = self.editor_val.split(':')[1]. \
+            replace("'", "", 2).strip()
+
+    def write_tempconf(self, confcontents):
+        self.tempconfpath = self.mount_a.client_remote.mktemp(
+            suffix='cephfs-shell.conf')
+        self.mount_a.client_remote.write_file(self.tempconfpath,
+                                              confcontents)
+
+    def test_reading_conf(self):
+        self.write_tempconf("[cephfs-shell]\neditor =  ???")
+
+        # output of following command -
+        # CephFS:~/>>> set editor
+        # editor: 'vim'
+        final_editor_val = self.get_cephfs_shell_cmd_output(
+            cmd='set editor', shell_conf_path=self.tempconfpath)
+        final_editor_val = final_editor_val.split(': ')[1]
+        final_editor_val = final_editor_val.replace("'", "", 2)
+
+        self.assertNotEqual(self.editor_val, final_editor_val)
+
+    def test_reading_conf_with_dup_opt(self):
+        """
+        Read conf without duplicate sections/options.
+        """
+        self.write_tempconf("[cephfs-shell]\neditor = ???\neditor = " +
+                            self.editor_val)
+
+        # output of following command -
+        # CephFS:~/>>> set editor
+        # editor: 'vim'
+        final_editor_val = self.get_cephfs_shell_cmd_output(
+            cmd='set editor', shell_conf_path=self.tempconfpath)
+        final_editor_val = final_editor_val.split(': ')[1]
+        final_editor_val = final_editor_val.replace("'", "", 2)
+
+        self.assertEqual(self.editor_val, final_editor_val)
+
+    def test_setting_opt_after_reading_conf(self):
+        self.write_tempconf("[cephfs-shell]\neditor = ???")
+
+        # output of following command -
+        # editor - was: vim
+        # now: vim
+        # editor: vim
+        final_editor_val = self.get_cephfs_shell_cmd_output(
+            cmd='set editor %s, set editor' % self.editor_val,
+            shell_conf_path=self.tempconfpath)
+        final_editor_val = final_editor_val.split('\n')[2]
+        final_editor_val = final_editor_val.split(': ')[1]
+        final_editor_val = final_editor_val.replace("'", "", 2)
+
+        self.assertEqual(self.editor_val, final_editor_val)