]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/pybind/test_cephfs.py
update sources to v12.1.1
[ceph.git] / ceph / src / test / pybind / test_cephfs.py
1 # vim: expandtab smarttab shiftwidth=4 softtabstop=4
2 from nose.tools import assert_raises, assert_equal, with_setup
3 import cephfs as libcephfs
4 import fcntl
5 import os
6
7 cephfs = None
8
9 def setup_module():
10 global cephfs
11 cephfs = libcephfs.LibCephFS(conffile='')
12 cephfs.mount()
13
14 def teardown_module():
15 global cephfs
16 cephfs.shutdown()
17
18 def setup_test():
19 d = cephfs.opendir(b"/")
20 dent = cephfs.readdir(d)
21 while dent:
22 if (dent.d_name not in [b".", b".."]):
23 if dent.is_dir():
24 cephfs.rmdir(b"/" + dent.d_name)
25 else:
26 cephfs.unlink(b"/" + dent.d_name)
27
28 dent = cephfs.readdir(d)
29
30 cephfs.closedir(d)
31
32 cephfs.chdir(b"/")
33
34 @with_setup(setup_test)
35 def test_conf_get():
36 fsid = cephfs.conf_get("fsid")
37 assert(len(fsid) > 0)
38
39 @with_setup(setup_test)
40 def test_version():
41 cephfs.version()
42
43 @with_setup(setup_test)
44 def test_fstat():
45 fd = cephfs.open(b'file-1', 'w', 0o755)
46 stat = cephfs.fstat(fd)
47 assert(len(stat) == 13)
48 cephfs.close(fd)
49
50 @with_setup(setup_test)
51 def test_statfs():
52 stat = cephfs.statfs(b'/')
53 assert(len(stat) == 11)
54
55 @with_setup(setup_test)
56 def test_syncfs():
57 stat = cephfs.sync_fs()
58
59 @with_setup(setup_test)
60 def test_fsync():
61 fd = cephfs.open(b'file-1', 'w', 0o755)
62 cephfs.write(fd, b"asdf", 0)
63 stat = cephfs.fsync(fd, 0)
64 cephfs.write(fd, b"qwer", 0)
65 stat = cephfs.fsync(fd, 1)
66 cephfs.close(fd)
67 #sync on non-existing fd (assume fd 12345 is not exists)
68 assert_raises(libcephfs.Error, cephfs.fsync, 12345, 0)
69
70 @with_setup(setup_test)
71 def test_directory():
72 cephfs.mkdir(b"/temp-directory", 0o755)
73 cephfs.mkdirs(b"/temp-directory/foo/bar", 0o755)
74 cephfs.chdir(b"/temp-directory")
75 assert_equal(cephfs.getcwd(), b"/temp-directory")
76 cephfs.rmdir(b"/temp-directory/foo/bar")
77 cephfs.rmdir(b"/temp-directory/foo")
78 cephfs.rmdir(b"/temp-directory")
79 assert_raises(libcephfs.ObjectNotFound, cephfs.chdir, b"/temp-directory")
80
81 @with_setup(setup_test)
82 def test_walk_dir():
83 cephfs.chdir(b"/")
84 dirs = [b"dir-1", b"dir-2", b"dir-3"]
85 for i in dirs:
86 cephfs.mkdir(i, 0o755)
87 handler = cephfs.opendir(b"/")
88 d = cephfs.readdir(handler)
89 dirs += [b".", b".."]
90 while d:
91 assert(d.d_name in dirs)
92 dirs.remove(d.d_name)
93 d = cephfs.readdir(handler)
94 assert(len(dirs) == 0)
95 dirs = [b"/dir-1", b"/dir-2", b"/dir-3"]
96 for i in dirs:
97 cephfs.rmdir(i)
98 cephfs.closedir(handler)
99
100 @with_setup(setup_test)
101 def test_xattr():
102 assert_raises(libcephfs.OperationNotSupported, cephfs.setxattr, "/", "key", b"value", 0)
103 cephfs.setxattr("/", "user.key", b"value", 0)
104 assert_equal(b"value", cephfs.getxattr("/", "user.key"))
105
106 cephfs.setxattr("/", "user.big", b"x" * 300, 0)
107
108 # Default size is 255, get ERANGE
109 assert_raises(libcephfs.OutOfRange, cephfs.getxattr, "/", "user.big")
110
111 # Pass explicit size, and we'll get the value
112 assert_equal(300, len(cephfs.getxattr("/", "user.big", 300)))
113
114
115 @with_setup(setup_test)
116 def test_rename():
117 cephfs.mkdir(b"/a", 0o755)
118 cephfs.mkdir(b"/a/b", 0o755)
119 cephfs.rename(b"/a", b"/b")
120 cephfs.stat(b"/b/b")
121 cephfs.rmdir(b"/b/b")
122 cephfs.rmdir(b"/b")
123
124 @with_setup(setup_test)
125 def test_open():
126 assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r')
127 assert_raises(libcephfs.ObjectNotFound, cephfs.open, b'file-1', 'r+')
128 fd = cephfs.open(b'file-1', 'w', 0o755)
129 cephfs.write(fd, b"asdf", 0)
130 cephfs.close(fd)
131 fd = cephfs.open(b'file-1', 'r', 0o755)
132 assert_equal(cephfs.read(fd, 0, 4), b"asdf")
133 cephfs.close(fd)
134 fd = cephfs.open(b'file-1', 'r+', 0o755)
135 cephfs.write(fd, b"zxcv", 4)
136 assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
137 cephfs.close(fd)
138 fd = cephfs.open(b'file-1', 'w+', 0o755)
139 assert_equal(cephfs.read(fd, 0, 4), b"")
140 cephfs.write(fd, b"zxcv", 4)
141 assert_equal(cephfs.read(fd, 4, 8), b"zxcv")
142 cephfs.close(fd)
143 fd = cephfs.open(b'file-1', os.O_RDWR, 0o755)
144 cephfs.write(fd, b"asdf", 0)
145 assert_equal(cephfs.read(fd, 0, 4), b"asdf")
146 cephfs.close(fd)
147 assert_raises(libcephfs.OperationNotSupported, cephfs.open, b'file-1', 'a')
148 cephfs.unlink(b'file-1')
149
150 @with_setup(setup_test)
151 def test_link():
152 fd = cephfs.open(b'file-1', 'w', 0o755)
153 cephfs.write(fd, b"1111", 0)
154 cephfs.close(fd)
155 cephfs.link(b'file-1', b'file-2')
156 fd = cephfs.open(b'file-2', 'r', 0o755)
157 assert_equal(cephfs.read(fd, 0, 4), b"1111")
158 cephfs.close(fd)
159 fd = cephfs.open(b'file-2', 'r+', 0o755)
160 cephfs.write(fd, b"2222", 4)
161 cephfs.close(fd)
162 fd = cephfs.open(b'file-1', 'r', 0o755)
163 assert_equal(cephfs.read(fd, 0, 8), b"11112222")
164 cephfs.close(fd)
165 cephfs.unlink(b'file-2')
166
167 @with_setup(setup_test)
168 def test_symlink():
169 fd = cephfs.open(b'file-1', 'w', 0o755)
170 cephfs.write(fd, b"1111", 0)
171 cephfs.close(fd)
172 cephfs.symlink(b'file-1', b'file-2')
173 fd = cephfs.open(b'file-2', 'r', 0o755)
174 assert_equal(cephfs.read(fd, 0, 4), b"1111")
175 cephfs.close(fd)
176 fd = cephfs.open(b'file-2', 'r+', 0o755)
177 cephfs.write(fd, b"2222", 4)
178 cephfs.close(fd)
179 fd = cephfs.open(b'file-1', 'r', 0o755)
180 assert_equal(cephfs.read(fd, 0, 8), b"11112222")
181 cephfs.close(fd)
182 cephfs.unlink(b'file-2')
183
184 @with_setup(setup_test)
185 def test_readlink():
186 fd = cephfs.open(b'/file-1', 'w', 0o755)
187 cephfs.write(fd, b"1111", 0)
188 cephfs.close(fd)
189 cephfs.symlink(b'/file-1', b'/file-2')
190 d = cephfs.readlink(b"/file-2",100)
191 assert_equal(d, b"/file-1")
192 cephfs.unlink(b'/file-2')
193 cephfs.unlink(b'/file-1')
194
195 @with_setup(setup_test)
196 def test_delete_cwd():
197 assert_equal(b"/", cephfs.getcwd())
198
199 cephfs.mkdir(b"/temp-directory", 0o755)
200 cephfs.chdir(b"/temp-directory")
201 cephfs.rmdir(b"/temp-directory")
202
203 # getcwd gives you something stale here: it remembers the path string
204 # even when things are unlinked. It's up to the caller to find out
205 # whether it really still exists
206 assert_equal(b"/temp-directory", cephfs.getcwd())
207
208 @with_setup(setup_test)
209 def test_flock():
210 fd = cephfs.open(b'file-1', 'w', 0o755)
211
212 cephfs.flock(fd, fcntl.LOCK_EX, 123);
213 fd2 = cephfs.open(b'file-1', 'w', 0o755)
214
215 assert_raises(libcephfs.WouldBlock, cephfs.flock, fd2,
216 fcntl.LOCK_EX | fcntl.LOCK_NB, 456);
217 cephfs.close(fd2)
218
219 cephfs.close(fd)
220
221 @with_setup(setup_test)
222 def test_mount_unmount():
223 test_directory()
224 cephfs.unmount()
225 cephfs.mount()
226 test_open()