]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/pybind/test_cephfs.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / pybind / test_cephfs.py
index 584d150eaeddd2745bc038f0e8ae2f965fdb4e8e..e633ee0cd01767f566f0b7bdc02ae1a377ee5b08 100644 (file)
@@ -1,8 +1,10 @@
 # vim: expandtab smarttab shiftwidth=4 softtabstop=4
-from nose.tools import assert_raises, assert_equal, with_setup
+from nose.tools import assert_raises, assert_equal, assert_greater, with_setup
 import cephfs as libcephfs
 import fcntl
 import os
+import time
+from datetime import datetime
 
 cephfs = None
 
@@ -52,6 +54,23 @@ def test_statfs():
     stat = cephfs.statfs(b'/')
     assert(len(stat) == 11)
 
+@with_setup(setup_test)
+def test_statx():
+    stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_MODE, 0)
+    assert('mode' in stat.keys())
+    stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_BTIME, 0)
+    assert('btime' in stat.keys())
+    
+    fd = cephfs.open(b'file-1', 'w', 0o755)
+    cephfs.write(fd, b"1111", 0)
+    cephfs.close(fd)
+    cephfs.symlink(b'file-1', b'file-2')
+    stat = cephfs.statx(b'file-2', libcephfs.CEPH_STATX_MODE | libcephfs.CEPH_STATX_BTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+    assert('mode' in stat.keys())
+    assert('btime' in stat.keys())
+    cephfs.unlink(b'file-2')
+    cephfs.unlink(b'file-1')
+
 @with_setup(setup_test)
 def test_syncfs():
     stat = cephfs.sync_fs()
@@ -111,6 +130,14 @@ def test_xattr():
     # Pass explicit size, and we'll get the value
     assert_equal(300, len(cephfs.getxattr("/", "user.big", 300)))
 
+    cephfs.removexattr("/", "user.key")
+    # user.key is already removed
+    assert_raises(libcephfs.NoData, cephfs.getxattr, "/", "user.key")
+
+    # user.big is only listed
+    ret_val, ret_buff = cephfs.listxattr("/")
+    assert_equal(9, ret_val)
+    assert_equal("user.big\x00", ret_buff.decode('utf-8'))
 
 @with_setup(setup_test)
 def test_rename():
@@ -230,7 +257,177 @@ def test_mount_root():
     cephfs.mkdir(b"/mount-directory", 0o755)
     cephfs.unmount()
     cephfs.mount(mount_root = b"/mount-directory")
-    cephfs.unmount()
 
     assert_raises(libcephfs.Error, cephfs.mount, mount_root = b"/nowhere")
+    cephfs.unmount()
+    cephfs.mount()
+
+@with_setup(setup_test)
+def test_utime():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+    cephfs.close(fd)
+
+    stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    time.sleep(1)
+    cephfs.utime(b'/file-1')
+
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_greater(stx_post['atime'], stx_pre['atime'])
+    assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+    atime_pre = int(time.mktime(stx_pre['atime'].timetuple()))
+    mtime_pre = int(time.mktime(stx_pre['mtime'].timetuple()))
+
+    cephfs.utime(b'/file-1', (atime_pre, mtime_pre))
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_equal(stx_post['atime'], stx_pre['atime'])
+    assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+    cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futime():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+
+    stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    time.sleep(1)
+    cephfs.futime(fd)
+
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_greater(stx_post['atime'], stx_pre['atime'])
+    assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+    atime_pre = int(time.mktime(stx_pre['atime'].timetuple()))
+    mtime_pre = int(time.mktime(stx_pre['mtime'].timetuple()))
+
+    cephfs.futime(fd, (atime_pre, mtime_pre))
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_equal(stx_post['atime'], stx_pre['atime'])
+    assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+    cephfs.close(fd)
+    cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_utimes():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+    cephfs.close(fd)
+
+    stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    time.sleep(1)
+    cephfs.utimes(b'/file-1')
+
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_greater(stx_post['atime'], stx_pre['atime'])
+    assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+    atime_pre = time.mktime(stx_pre['atime'].timetuple())
+    mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
 
+    cephfs.utimes(b'/file-1', (atime_pre, mtime_pre))
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_equal(stx_post['atime'], stx_pre['atime'])
+    assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+    cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_lutimes():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+    cephfs.close(fd)
+
+    cephfs.symlink(b'/file-1', b'/file-2')
+
+    stx_pre_t = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+    stx_pre_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+    time.sleep(1)
+    cephfs.lutimes(b'/file-2')
+
+    stx_post_t = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+    stx_post_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+    assert_equal(stx_post_t['atime'], stx_pre_t['atime'])
+    assert_equal(stx_post_t['mtime'], stx_pre_t['mtime'])
+
+    assert_greater(stx_post_s['atime'], stx_pre_s['atime'])
+    assert_greater(stx_post_s['mtime'], stx_pre_s['mtime'])
+
+    atime_pre = time.mktime(stx_pre_s['atime'].timetuple())
+    mtime_pre = time.mktime(stx_pre_s['mtime'].timetuple())
+
+    cephfs.lutimes(b'/file-2', (atime_pre, mtime_pre))
+    stx_post_s = cephfs.statx(b'/file-2', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, libcephfs.AT_SYMLINK_NOFOLLOW)
+
+    assert_equal(stx_post_s['atime'], stx_pre_s['atime'])
+    assert_equal(stx_post_s['mtime'], stx_pre_s['mtime'])
+
+    cephfs.unlink(b'/file-2')
+    cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futimes():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+
+    stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    time.sleep(1)
+    cephfs.futimes(fd)
+
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_greater(stx_post['atime'], stx_pre['atime'])
+    assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+    atime_pre = time.mktime(stx_pre['atime'].timetuple())
+    mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
+
+    cephfs.futimes(fd, (atime_pre, mtime_pre))
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_equal(stx_post['atime'], stx_pre['atime'])
+    assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+    cephfs.close(fd)
+    cephfs.unlink(b'/file-1')
+
+@with_setup(setup_test)
+def test_futimens():
+    fd = cephfs.open(b'/file-1', 'w', 0o755)
+    cephfs.write(fd, b'0000', 0)
+
+    stx_pre = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    time.sleep(1)
+    cephfs.futimens(fd)
+
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_greater(stx_post['atime'], stx_pre['atime'])
+    assert_greater(stx_post['mtime'], stx_pre['mtime'])
+
+    atime_pre = time.mktime(stx_pre['atime'].timetuple())
+    mtime_pre = time.mktime(stx_pre['mtime'].timetuple())
+
+    cephfs.futimens(fd, (atime_pre, mtime_pre))
+    stx_post = cephfs.statx(b'/file-1', libcephfs.CEPH_STATX_ATIME | libcephfs.CEPH_STATX_MTIME, 0)
+
+    assert_equal(stx_post['atime'], stx_pre['atime'])
+    assert_equal(stx_post['mtime'], stx_pre['mtime'])
+
+    cephfs.close(fd)
+    cephfs.unlink(b'/file-1')