# vim: expandtab smarttab shiftwidth=4 softtabstop=4
-from nose.tools import assert_raises, assert_equal, assert_not_equal, assert_greater, with_setup
+from assertions import assert_raises, assert_equal, assert_not_equal, assert_greater
import cephfs as libcephfs
import fcntl
import os
+import pytest
import random
import time
import stat
global cephfs
cephfs.shutdown()
-def setup_test():
+@pytest.fixture
+def testdir():
d = cephfs.opendir(b"/")
dent = cephfs.readdir(d)
while dent:
for xattr in xattrs[:-1]:
cephfs.removexattr("/", xattr)
-@with_setup(setup_test)
-def test_conf_get():
+def test_conf_get(testdir):
fsid = cephfs.conf_get("fsid")
assert(len(fsid) > 0)
-@with_setup(setup_test)
def test_version():
cephfs.version()
-@with_setup(setup_test)
-def test_fstat():
+def test_fstat(testdir):
fd = cephfs.open(b'file-1', 'w', 0o755)
stat = cephfs.fstat(fd)
assert(len(stat) == 13)
cephfs.close(fd)
-@with_setup(setup_test)
-def test_statfs():
+def test_statfs(testdir):
stat = cephfs.statfs(b'/')
assert(len(stat) == 11)
-@with_setup(setup_test)
-def test_statx():
+def test_statx(testdir):
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_MODE, 0)
assert('mode' in stat.keys())
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_BTIME, 0)
cephfs.unlink(b'file-2')
cephfs.unlink(b'file-1')
-@with_setup(setup_test)
-def test_syncfs():
+def test_syncfs(testdir):
stat = cephfs.sync_fs()
-@with_setup(setup_test)
-def test_fsync():
+def test_fsync(testdir):
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"asdf", 0)
stat = cephfs.fsync(fd, 0)
#sync on non-existing fd (assume fd 12345 is not exists)
assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
-@with_setup(setup_test)
-def test_directory():
+def test_directory(testdir):
cephfs.mkdir(b"/temp-directory", 0o755)
cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
cephfs.chdir(b"/temp-directory")
cephfs.rmdir(b"/temp-directory")
assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
-@with_setup(setup_test)
-def test_walk_dir():
+def test_walk_dir(testdir):
cephfs.chdir(b"/")
dirs = [b"dir-1", b"dir-2", b"dir-3"]
for i in dirs:
cephfs.rmdir(i)
cephfs.closedir(handler)
-@with_setup(setup_test)
-def test_xattr():
+def test_xattr(testdir):
assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
cephfs.setxattr("/", "user.key", b"value", 0)
assert_equal(b"value", cephfs.getxattr("/", "user.key"))
assert_equal(9, ret_val)
assert_equal("user.big\x00", ret_buff.decode('utf-8'))
-@with_setup(setup_test)
-def test_ceph_mirror_xattr():
+def test_ceph_mirror_xattr(testdir):
def gen_mirror_xattr():
cluster_id = str(uuid.uuid4())
fs_id = random.randint(1, 10)
# check mirror info xattr format
assert_raises(libcephfs.InvalidValue, cephfs.setxattr, '/', 'ceph.mirror.info', b"unknown", 0)
-@with_setup(setup_test)
-def test_fxattr():
+def test_fxattr(testdir):
fd = cephfs.open(b'/file-fxattr', 'w', 0o755)
assert_raises(libcephfs.OperationNotSupported, cephfs.fsetxattr, fd, "key", b"value", 0)
assert_raises(TypeError, cephfs.fsetxattr, "fd", "user.key", b"value", 0)
cephfs.close(fd)
cephfs.unlink(b'/file-fxattr')
-@with_setup(setup_test)
-def test_rename():
+def test_rename(testdir):
cephfs.mkdir(b"/a", 0o755)
cephfs.mkdir(b"/a/b", 0o755)
cephfs.rename(b"/a", b"/b")
cephfs.rmdir(b"/b/b")
cephfs.rmdir(b"/b")
-@with_setup(setup_test)
-def test_open():
+def test_open(testdir):
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
fd = cephfs.open(b'file-1', 'w', 0o755)
assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
cephfs.unlink(b'file-1')
-@with_setup(setup_test)
-def test_link():
+def test_link(testdir):
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
cephfs.close(fd)
cephfs.unlink(b'file-2')
-@with_setup(setup_test)
-def test_symlink():
+def test_symlink(testdir):
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
cephfs.close(fd)
cephfs.unlink(b'file-2')
-@with_setup(setup_test)
-def test_readlink():
+def test_readlink(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
cephfs.unlink(b'/file-2')
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_delete_cwd():
+def test_delete_cwd(testdir):
assert_equal(b"/", cephfs.getcwd())
cephfs.mkdir(b"/temp-directory", 0o755)
# whether it really still exists
assert_equal(b"/temp-directory", cephfs.getcwd())
-@with_setup(setup_test)
-def test_flock():
+def test_flock(testdir):
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.flock(fd, fcntl.LOCK_EX, 123);
cephfs.close(fd)
-@with_setup(setup_test)
-def test_mount_unmount():
- test_directory()
+def test_mount_unmount(testdir):
+ test_directory(testdir)
cephfs.unmount()
cephfs.mount()
- test_open()
+ test_open(testdir)
-@with_setup(setup_test)
-def test_lxattr():
+def test_lxattr(testdir):
fd = cephfs.open(b'/file-lxattr', 'w', 0o755)
cephfs.close(fd)
cephfs.setxattr(b"/file-lxattr", "user.key", b"value", 0)
cephfs.unlink(b'/file-lxattr')
cephfs.unlink(b'/file-sym-lxattr')
-@with_setup(setup_test)
-def test_mount_root():
+def test_mount_root(testdir):
cephfs.mkdir(b"/mount-directory", 0o755)
cephfs.unmount()
cephfs.mount(mount_root = b"/mount-directory")
cephfs.unmount()
cephfs.mount()
-@with_setup(setup_test)
-def test_utime():
+def test_utime(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_futime():
+def test_futime(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_utimes():
+def test_utimes(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_lutimes():
+def test_lutimes(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-2')
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_futimes():
+def test_futimes(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_futimens():
+def test_futimens(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_lchmod():
+def test_lchmod(testdir):
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-2')
cephfs.unlink(b'/file-1')
-@with_setup(setup_test)
-def test_fchmod():
+def test_fchmod(testdir):
fd = cephfs.open(b'/file-fchmod', 'w', 0o655)
st = cephfs.statx(b'/file-fchmod', libcephfs.CEPH_STATX_MODE, 0)
mode = st["mode"] | stat.S_IXUSR
cephfs.close(fd)
cephfs.unlink(b'/file-fchmod')
-@with_setup(setup_test)
-def test_fchown():
+def test_fchown(testdir):
fd = cephfs.open(b'/file-fchown', 'w', 0o655)
uid = os.getuid()
gid = os.getgid()
cephfs.close(fd)
cephfs.unlink(b'/file-fchown')
-@with_setup(setup_test)
-def test_truncate():
+def test_truncate(testdir):
fd = cephfs.open(b'/file-truncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.truncate(b'/file-truncate', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-truncate')
-@with_setup(setup_test)
-def test_ftruncate():
+def test_ftruncate(testdir):
fd = cephfs.open(b'/file-ftruncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.ftruncate, b'/file-ftruncate', 0)
cephfs.close(fd)
cephfs.unlink(b'/file-ftruncate')
-@with_setup(setup_test)
-def test_fallocate():
+def test_fallocate(testdir):
fd = cephfs.open(b'/file-fallocate', 'w', 0o755)
assert_raises(TypeError, cephfs.fallocate, b'/file-fallocate', 0, 10)
cephfs.fallocate(fd, 0, 10)
cephfs.close(fd)
cephfs.unlink(b'/file-fallocate')
-@with_setup(setup_test)
-def test_mknod():
+def test_mknod(testdir):
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
cephfs.mknod(b'/file-fifo', mode)
st = cephfs.statx(b'/file-fifo', libcephfs.CEPH_STATX_MODE, 0)
assert_equal(st["mode"] & mode, mode)
cephfs.unlink(b'/file-fifo')
-@with_setup(setup_test)
-def test_lazyio():
+def test_lazyio(testdir):
fd = cephfs.open(b'/file-lazyio', 'w', 0o755)
assert_raises(TypeError, cephfs.lazyio, "fd", 1)
assert_raises(TypeError, cephfs.lazyio, fd, "1")
cephfs.close(fd)
cephfs.unlink(b'/file-lazyio')
-@with_setup(setup_test)
-def test_replication():
+def test_replication(testdir):
fd = cephfs.open(b'/file-rep', 'w', 0o755)
assert_raises(TypeError, cephfs.get_file_replication, "fd")
l_dict = cephfs.get_layout(fd)
cephfs.close(fd)
cephfs.unlink(b'/file-rep')
-@with_setup(setup_test)
-def test_caps():
+def test_caps(testdir):
fd = cephfs.open(b'/file-caps', 'w', 0o755)
timeout = cephfs.get_cap_return_timeout()
assert_equal(timeout, 300)
cephfs.close(fd)
cephfs.unlink(b'/file-caps')
-@with_setup(setup_test)
-def test_setuuid():
+def test_setuuid(testdir):
ses_id_uid = uuid.uuid1()
ses_id_str = str(ses_id_uid)
cephfs.set_uuid(ses_id_str)
-@with_setup(setup_test)
-def test_session_timeout():
+def test_session_timeout(testdir):
assert_raises(TypeError, cephfs.set_session_timeout, "300")
cephfs.set_session_timeout(300)
-@with_setup(setup_test)
-def test_readdirops():
+def test_readdirops(testdir):
cephfs.chdir(b"/")
dirs = [b"dir-1", b"dir-2", b"dir-3"]
for i in dirs:
cephfs.close(fd)
cephfs.unlink(b'file-1')
-@with_setup(setup_test)
-def test_setattrx():
+def test_setattrx(testdir):
fd = cephfs.open(b'file-setattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
assert_equal(10, st1["size"])
cephfs.unlink(b'file-setattrx')
-@with_setup(setup_test)
-def test_fsetattrx():
+def test_fsetattrx(testdir):
fd = cephfs.open(b'file-fsetattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
st = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE, 0)
cephfs.close(fd)
cephfs.unlink(b'file-fsetattrx')
-@with_setup(setup_test)
-def test_get_layout():
+def test_get_layout(testdir):
fd = cephfs.open(b'file-get-layout', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.get_layout, "fd")
cephfs.close(fd)
cephfs.unlink(b'file-get-layout')
-@with_setup(setup_test)
-def test_get_default_pool():
+def test_get_default_pool(testdir):
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
-@with_setup(setup_test)
-def test_get_pool():
+def test_get_pool(testdir):
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
size=int(s.split(" ")[-1])
assert_equal(cephfs.get_pool_replication(dp_dict["pool_id"]), size)
-@with_setup(setup_test)
-def test_disk_quota_exceeeded_error():
+def test_disk_quota_exceeeded_error(testdir):
cephfs.mkdir("/dir-1", 0o755)
cephfs.setxattr("/dir-1", "ceph.quota.max_bytes", b"5", 0)
fd = cephfs.open(b'/dir-1/file-1', 'w', 0o755)
cephfs.close(fd)
cephfs.unlink(b"/dir-1/file-1")
-@with_setup(setup_test)
-def test_empty_snapshot_info():
+def test_empty_snapshot_info(testdir):
cephfs.mkdir("/dir-1", 0o755)
# snap without metadata
# remove directory
cephfs.rmdir("/dir-1")
-@with_setup(setup_test)
-def test_snapshot_info():
+def test_snapshot_info(testdir):
cephfs.mkdir("/dir-1", 0o755)
# snap with custom metadata
# remove directory
cephfs.rmdir("/dir-1")
-@with_setup(setup_test)
-def test_set_mount_timeout_post_mount():
+def test_set_mount_timeout_post_mount(testdir):
assert_raises(libcephfs.LibCephFSStateError, cephfs.set_mount_timeout, 5)
-@with_setup(setup_test)
-def test_set_mount_timeout():
+def test_set_mount_timeout(testdir):
cephfs.unmount()
cephfs.set_mount_timeout(5)
cephfs.mount()
-@with_setup(setup_test)
-def test_set_mount_timeout_lt0():
+def test_set_mount_timeout_lt0(testdir):
cephfs.unmount()
assert_raises(libcephfs.InvalidValue, cephfs.set_mount_timeout, -5)
cephfs.mount()