# vim: expandtab smarttab shiftwidth=4 softtabstop=4
from assertions import assert_raises, assert_equal, assert_not_equal, assert_greater
+import collections
+collections.Callable = collections.abc.Callable
import cephfs as libcephfs
import fcntl
import os
global cephfs
cephfs.shutdown()
-@pytest.fixture
-def testdir():
- d = cephfs.opendir(b"/")
+def purge_dir(path, is_snap = False):
+ print(b"Purge " + path)
+ d = cephfs.opendir(path)
+ if (not path.endswith(b"/")):
+ path = path + b"/"
dent = cephfs.readdir(d)
while dent:
if (dent.d_name not in [b".", b".."]):
+ print(path + dent.d_name)
if dent.is_dir():
- cephfs.rmdir(b"/" + dent.d_name)
+ if (not is_snap):
+ try:
+ snappath = path + dent.d_name + b"/.snap"
+ cephfs.stat(snappath)
+ purge_dir(snappath, True)
+ except:
+ pass
+ purge_dir(path + dent.d_name, False)
+ cephfs.rmdir(path + dent.d_name)
+ else:
+ print("rmsnap on {} snap {}".format(path, dent.d_name))
+ cephfs.rmsnap(path, dent.d_name);
else:
- cephfs.unlink(b"/" + dent.d_name)
-
+ cephfs.unlink(path + dent.d_name)
dent = cephfs.readdir(d)
-
cephfs.closedir(d)
+@pytest.fixture
+def testdir():
+ purge_dir(b"/")
+
cephfs.chdir(b"/")
_, ret_buf = cephfs.listxattr("/")
print(f'ret_buf={ret_buf}')
cephfs.unmount()
assert_raises(libcephfs.InvalidValue, cephfs.set_mount_timeout, -5)
cephfs.mount()
+
+def test_snapdiff(testdir):
+ cephfs.mkdir("/snapdiff_test", 0o755)
+ fd = cephfs.open('/snapdiff_test/file-1', 'w', 0o755)
+ cephfs.write(fd, b"1111", 0)
+ cephfs.close(fd)
+ fd = cephfs.open('/snapdiff_test/file-2', 'w', 0o755)
+ cephfs.write(fd, b"2222", 0)
+ cephfs.close(fd)
+ cephfs.mksnap("/snapdiff_test", "snap1", 0o755)
+ fd = cephfs.open('/snapdiff_test/file-1', 'w', 0o755)
+ cephfs.write(fd, b"1222", 0)
+ cephfs.close(fd)
+ cephfs.unlink('/snapdiff_test/file-2')
+ cephfs.mksnap("/snapdiff_test", "snap2", 0o755)
+ snap1id = cephfs.snap_info(b"/snapdiff_test/.snap/snap1")['id']
+ snap2id = cephfs.snap_info(b"/snapdiff_test/.snap/snap2")['id']
+ diff = cephfs.opensnapdiff(b"/snapdiff_test", b"/", b"snap2", b"snap1")
+ cnt = 0
+ e = diff.readdir()
+ while e is not None:
+ if (e.d_name == b"file-1"):
+ cnt = cnt + 1
+ assert_equal(snap2id, e.d_snapid)
+ elif (e.d_name == b"file-2"):
+ cnt = cnt + 1
+ assert_equal(snap1id, e.d_snapid)
+ elif (e.d_name != b"." and e.d_name != b".."):
+ cnt = cnt + 1
+ e = diff.readdir()
+ assert_equal(cnt, 2)
+ diff.close()
+
+ # remove directory
+ purge_dir(b"/snapdiff_test");