-from nose.tools import assert_raises, assert_equal, assert_not_equal, assert_greater, with_setup
+from assertions import assert_raises, assert_equal, assert_not_equal, assert_greater
for xattr in xattrs[:-1]:
cephfs.removexattr("/", xattr)
for xattr in xattrs[:-1]:
cephfs.removexattr("/", xattr)
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_MODE, 0)
assert('mode' in stat.keys())
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_BTIME, 0)
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_MODE, 0)
assert('mode' in stat.keys())
stat = cephfs.statx(b'/', libcephfs.CEPH_STATX_BTIME, 0)
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"asdf", 0)
stat = cephfs.fsync(fd, 0)
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"asdf", 0)
stat = cephfs.fsync(fd, 0)
#sync on non-existing fd (assume fd 12345 is not exists)
assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
#sync on non-existing fd (assume fd 12345 is not exists)
assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
cephfs.mkdir(b"/temp-directory", 0o755)
cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
cephfs.chdir(b"/temp-directory")
cephfs.mkdir(b"/temp-directory", 0o755)
cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
cephfs.chdir(b"/temp-directory")
cephfs.rmdir(b"/temp-directory")
assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
cephfs.rmdir(b"/temp-directory")
assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
cephfs.setxattr("/", "user.key", b"value", 0)
assert_equal(b"value", cephfs.getxattr("/", "user.key"))
assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
cephfs.setxattr("/", "user.key", b"value", 0)
assert_equal(b"value", cephfs.getxattr("/", "user.key"))
def gen_mirror_xattr():
cluster_id = str(uuid.uuid4())
fs_id = random.randint(1, 10)
def gen_mirror_xattr():
cluster_id = str(uuid.uuid4())
fs_id = random.randint(1, 10)
# check mirror info xattr format
assert_raises(libcephfs.InvalidValue, cephfs.setxattr, '/', 'ceph.mirror.info', b"unknown", 0)
# check mirror info xattr format
assert_raises(libcephfs.InvalidValue, cephfs.setxattr, '/', 'ceph.mirror.info', b"unknown", 0)
fd = cephfs.open(b'/file-fxattr', 'w', 0o755)
assert_raises(libcephfs.OperationNotSupported, cephfs.fsetxattr, fd, "key", b"value", 0)
assert_raises(TypeError, cephfs.fsetxattr, "fd", "user.key", b"value", 0)
fd = cephfs.open(b'/file-fxattr', 'w', 0o755)
assert_raises(libcephfs.OperationNotSupported, cephfs.fsetxattr, fd, "key", b"value", 0)
assert_raises(TypeError, cephfs.fsetxattr, "fd", "user.key", b"value", 0)
cephfs.mkdir(b"/a", 0o755)
cephfs.mkdir(b"/a/b", 0o755)
cephfs.rename(b"/a", b"/b")
cephfs.mkdir(b"/a", 0o755)
cephfs.mkdir(b"/a/b", 0o755)
cephfs.rename(b"/a", b"/b")
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
fd = cephfs.open(b'file-1', 'w', 0o755)
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
fd = cephfs.open(b'file-1', 'w', 0o755)
assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
cephfs.unlink(b'file-1')
assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
cephfs.unlink(b'file-1')
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
assert_equal(b"/", cephfs.getcwd())
cephfs.mkdir(b"/temp-directory", 0o755)
assert_equal(b"/", cephfs.getcwd())
cephfs.mkdir(b"/temp-directory", 0o755)
# whether it really still exists
assert_equal(b"/temp-directory", cephfs.getcwd())
# whether it really still exists
assert_equal(b"/temp-directory", cephfs.getcwd())
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.flock(fd, fcntl.LOCK_EX, 123);
fd = cephfs.open(b'file-1', 'w', 0o755)
cephfs.flock(fd, fcntl.LOCK_EX, 123);
fd = cephfs.open(b'/file-lxattr', 'w', 0o755)
cephfs.close(fd)
cephfs.setxattr(b"/file-lxattr", "user.key", b"value", 0)
fd = cephfs.open(b'/file-lxattr', 'w', 0o755)
cephfs.close(fd)
cephfs.setxattr(b"/file-lxattr", "user.key", b"value", 0)
cephfs.mkdir(b"/mount-directory", 0o755)
cephfs.unmount()
cephfs.mount(mount_root = b"/mount-directory")
cephfs.mkdir(b"/mount-directory", 0o755)
cephfs.unmount()
cephfs.mount(mount_root = b"/mount-directory")
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-1', 'w', 0o755)
cephfs.write(fd, b'0000', 0)
cephfs.close(fd)
fd = cephfs.open(b'/file-fchmod', 'w', 0o655)
st = cephfs.statx(b'/file-fchmod', libcephfs.CEPH_STATX_MODE, 0)
mode = st["mode"] | stat.S_IXUSR
fd = cephfs.open(b'/file-fchmod', 'w', 0o655)
st = cephfs.statx(b'/file-fchmod', libcephfs.CEPH_STATX_MODE, 0)
mode = st["mode"] | stat.S_IXUSR
fd = cephfs.open(b'/file-truncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.truncate(b'/file-truncate', 0)
fd = cephfs.open(b'/file-truncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
cephfs.truncate(b'/file-truncate', 0)
fd = cephfs.open(b'/file-ftruncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.ftruncate, b'/file-ftruncate', 0)
fd = cephfs.open(b'/file-ftruncate', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.ftruncate, b'/file-ftruncate', 0)
fd = cephfs.open(b'/file-fallocate', 'w', 0o755)
assert_raises(TypeError, cephfs.fallocate, b'/file-fallocate', 0, 10)
cephfs.fallocate(fd, 0, 10)
fd = cephfs.open(b'/file-fallocate', 'w', 0o755)
assert_raises(TypeError, cephfs.fallocate, b'/file-fallocate', 0, 10)
cephfs.fallocate(fd, 0, 10)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
cephfs.mknod(b'/file-fifo', mode)
st = cephfs.statx(b'/file-fifo', libcephfs.CEPH_STATX_MODE, 0)
assert_equal(st["mode"] & mode, mode)
cephfs.unlink(b'/file-fifo')
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
cephfs.mknod(b'/file-fifo', mode)
st = cephfs.statx(b'/file-fifo', libcephfs.CEPH_STATX_MODE, 0)
assert_equal(st["mode"] & mode, mode)
cephfs.unlink(b'/file-fifo')
fd = cephfs.open(b'/file-lazyio', 'w', 0o755)
assert_raises(TypeError, cephfs.lazyio, "fd", 1)
assert_raises(TypeError, cephfs.lazyio, fd, "1")
fd = cephfs.open(b'/file-lazyio', 'w', 0o755)
assert_raises(TypeError, cephfs.lazyio, "fd", 1)
assert_raises(TypeError, cephfs.lazyio, fd, "1")
fd = cephfs.open(b'/file-rep', 'w', 0o755)
assert_raises(TypeError, cephfs.get_file_replication, "fd")
l_dict = cephfs.get_layout(fd)
fd = cephfs.open(b'/file-rep', 'w', 0o755)
assert_raises(TypeError, cephfs.get_file_replication, "fd")
l_dict = cephfs.get_layout(fd)
fd = cephfs.open(b'/file-caps', 'w', 0o755)
timeout = cephfs.get_cap_return_timeout()
assert_equal(timeout, 300)
fd = cephfs.open(b'/file-caps', 'w', 0o755)
timeout = cephfs.get_cap_return_timeout()
assert_equal(timeout, 300)
ses_id_uid = uuid.uuid1()
ses_id_str = str(ses_id_uid)
cephfs.set_uuid(ses_id_str)
ses_id_uid = uuid.uuid1()
ses_id_str = str(ses_id_uid)
cephfs.set_uuid(ses_id_str)
assert_raises(TypeError, cephfs.set_session_timeout, "300")
cephfs.set_session_timeout(300)
assert_raises(TypeError, cephfs.set_session_timeout, "300")
cephfs.set_session_timeout(300)
fd = cephfs.open(b'file-setattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
fd = cephfs.open(b'file-setattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
cephfs.close(fd)
assert_equal(10, st1["size"])
cephfs.unlink(b'file-setattrx')
assert_equal(10, st1["size"])
cephfs.unlink(b'file-setattrx')
fd = cephfs.open(b'file-fsetattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
st = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE, 0)
fd = cephfs.open(b'file-fsetattrx', 'w', 0o655)
cephfs.write(fd, b"1111", 0)
st = cephfs.statx(b'file-fsetattrx', libcephfs.CEPH_STATX_MODE, 0)
fd = cephfs.open(b'file-get-layout', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.get_layout, "fd")
fd = cephfs.open(b'file-get-layout', 'w', 0o755)
cephfs.write(fd, b"1111", 0)
assert_raises(TypeError, cephfs.get_layout, "fd")
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
dp_dict = cephfs.get_default_pool()
assert('pool_id' in dp_dict.keys())
assert('pool_name' in dp_dict.keys())
size=int(s.split(" ")[-1])
assert_equal(cephfs.get_pool_replication(dp_dict["pool_id"]), size)
size=int(s.split(" ")[-1])
assert_equal(cephfs.get_pool_replication(dp_dict["pool_id"]), size)
cephfs.mkdir("/dir-1", 0o755)
cephfs.setxattr("/dir-1", "ceph.quota.max_bytes", b"5", 0)
fd = cephfs.open(b'/dir-1/file-1', 'w', 0o755)
cephfs.mkdir("/dir-1", 0o755)
cephfs.setxattr("/dir-1", "ceph.quota.max_bytes", b"5", 0)
fd = cephfs.open(b'/dir-1/file-1', 'w', 0o755)
cephfs.mkdir("/dir-1", 0o755)
# snap with custom metadata
cephfs.mkdir("/dir-1", 0o755)
# snap with custom metadata