# vim: expandtab smarttab shiftwidth=4 softtabstop=4
-from nose.tools import assert_raises, assert_equal, with_setup
+from nose.tools import assert_raises, assert_equal, assert_greater, with_setup
import cephfs as libcephfs
import fcntl
import os
+import time
+from datetime import datetime
cephfs = None
stat = cephfs.statfs(b'/')
assert(len(stat) == 11)
+@with_setup(setup_test)
+def test_statx():
+ stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_MODE, 0)
+ assert('mode' in stat.keys())
+ stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_BTIME, 0)
+ assert('btime' in stat.keys())
+
+ fd = cephfs.open(b'file-1', 'w', 0o755)
+ cephfs.write(fd, b"1111", 0)
+ cephfs.close(fd)
+ cephfs.symlink(b'file-1', b'file-2')
+ stat = cephfs.statx(b'file-2', libcephfs.CEPH_STATX_MODE | libcephfs.CEPH_STATX_BTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+ assert('mode' in stat.keys())
+ assert('btime' in stat.keys())
+ cephfs.unlink(b'file-2')
+ cephfs.unlink(b'file-1')
+
@with_setup(setup_test)
def test_syncfs():
stat = cephfs.sync_fs()
# Pass explicit size, and we'll get the value
assert_equal(300, len(cephfs.getxattr("/", "user.big", 300)))
+ cephfs.removexattr("/", "user.key")
+ # user.key is already removed
+ assert_raises(libcephfs.NoData, cephfs.getxattr, "/", "user.key")
+
+ # user.big is only listed
+ ret_val, ret_buff = cephfs.listxattr("/")
+ assert_equal(9, ret_val)
+ assert_equal("user.big\x00", ret_buff.decode('utf-8'))
@with_setup(setup_test)
def test_rename():
cephfs.mkdir(b"/mount-directory", 0o755)
cephfs.unmount()
cephfs.mount(mount_root = b"/mount-directory")
- cephfs.unmount()
assert_raises(libcephfs.Error, cephfs.mount, mount_root = b"/nowhere")
+ cephfs.unmount()
+ cephfs.mount()
+
+@with_setup(setup_test)
+def test_utime():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+ cephfs.close(fd)
+
+ stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ time.sleep(1)
+ cephfs.utime(b'/file-1')
+
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_greater(stx_post['atime'], stx_pre['atime'])
+ assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+ atime_pre = int(time.mktime(stx_pre['atime'].timetuple()))
+ mtime_pre = int(time.mktime(stx_pre['mtime'].timetuple()))
+
+ cephfs.utime(b'/file-1', (atime_pre, mtime_pre))
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_equal(stx_post['atime'], stx_pre['atime'])
+ assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+ cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futime():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+
+ stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ time.sleep(1)
+ cephfs.futime(fd)
+
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_greater(stx_post['atime'], stx_pre['atime'])
+ assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+ atime_pre = int(time.mktime(stx_pre['atime'].timetuple()))
+ mtime_pre = int(time.mktime(stx_pre['mtime'].timetuple()))
+
+ cephfs.futime(fd, (atime_pre, mtime_pre))
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_equal(stx_post['atime'], stx_pre['atime'])
+ assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+ cephfs.close(fd)
+ cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_utimes():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+ cephfs.close(fd)
+
+ stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ time.sleep(1)
+ cephfs.utimes(b'/file-1')
+
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_greater(stx_post['atime'], stx_pre['atime'])
+ assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+ atime_pre = time.mktime(stx_pre['atime'].timetuple())
+ mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
+ cephfs.utimes(b'/file-1', (atime_pre, mtime_pre))
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_equal(stx_post['atime'], stx_pre['atime'])
+ assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+ cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_lutimes():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+ cephfs.close(fd)
+
+ cephfs.symlink(b'/file-1', b'/file-2')
+
+ stx_pre_t = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+ stx_pre_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+ time.sleep(1)
+ cephfs.lutimes(b'/file-2')
+
+ stx_post_t = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+ stx_post_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+ assert_equal(stx_post_t['atime'], stx_pre_t['atime'])
+ assert_equal(stx_post_t['mtime'], stx_pre_t['mtime'])
+
+ assert_greater(stx_post_s['atime'], stx_pre_s['atime'])
+ assert_greater(stx_post_s['mtime'], stx_pre_s['mtime'])
+
+ atime_pre = time.mktime(stx_pre_s['atime'].timetuple())
+ mtime_pre = time.mktime(stx_pre_s['mtime'].timetuple())
+
+ cephfs.lutimes(b'/file-2', (atime_pre, mtime_pre))
+ stx_post_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+ assert_equal(stx_post_s['atime'], stx_pre_s['atime'])
+ assert_equal(stx_post_s['mtime'], stx_pre_s['mtime'])
+
+ cephfs.unlink(b'/file-2')
+ cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futimes():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+
+ stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ time.sleep(1)
+ cephfs.futimes(fd)
+
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_greater(stx_post['atime'], stx_pre['atime'])
+ assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+ atime_pre = time.mktime(stx_pre['atime'].timetuple())
+ mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
+
+ cephfs.futimes(fd, (atime_pre, mtime_pre))
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_equal(stx_post['atime'], stx_pre['atime'])
+ assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+ cephfs.close(fd)
+ cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futimens():
+ fd = cephfs.open(b'/file-1', 'w', 0o755)
+ cephfs.write(fd, b'0000', 0)
+
+ stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ time.sleep(1)
+ cephfs.futimens(fd)
+
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_greater(stx_post['atime'], stx_pre['atime'])
+ assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+ atime_pre = time.mktime(stx_pre['atime'].timetuple())
+ mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
+
+ cephfs.futimens(fd, (atime_pre, mtime_pre))
+ stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+ assert_equal(stx_post['atime'], stx_pre['atime'])
+ assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+ cephfs.close(fd)
+ cephfs.unlink(b'/file-1')