]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/libcephfs/lazyio.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / libcephfs / lazyio.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2019 Red Hat Ltd
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "gtest/gtest.h"
16 #include "include/cephfs/libcephfs.h"
17 #include "include/rados/librados.h"
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <dirent.h>
24 #if defined(__linux__)
25 #include <sys/xattr.h>
26 #endif
27
/*
 * librados cluster handle. main() creates, configures and connects it before
 * running the tests and shuts it down afterwards.
 */
rados_t cluster;
29
30 TEST(LibCephFS, LazyIOOneWriterMulipleReaders) {
31 struct ceph_mount_info *ca, *cb;
32 ASSERT_EQ(ceph_create(&ca, NULL), 0);
33 ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
34 ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
35 ASSERT_EQ(ceph_mount(ca, NULL), 0);
36
37 ASSERT_EQ(ceph_create(&cb, NULL), 0);
38 ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
39 ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
40 ASSERT_EQ(ceph_mount(cb, NULL), 0);
41
42 char name[20];
43 snprintf(name, sizeof(name), "foo.%d", getpid());
44
45 int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
46 ASSERT_LE(0, fda);
47
48 int fdb = ceph_open(cb, name, O_RDONLY, 0644);
49 ASSERT_LE(0, fdb);
50
51 ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
52 ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
53
54 char out_buf[] = "fooooooooo";
55
56 /* Client a issues a write and propagates/flushes the buffer */
57 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
58 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
59
60 /* Client a issues a write and propagates/flushes the buffer */
61 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 10));
62 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
63
64 char in_buf[40];
65 /* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client a to fetch the propagated write of client a in the subsequent read */
66 ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
67 ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
68 ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
69
70 /* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and fda holds the updated inode*/
71 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
72 ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
73
74 ceph_close(ca, fda);
75 ceph_close(cb, fdb);
76
77 ceph_shutdown(ca);
78 ceph_shutdown(cb);
79 }
80
81 TEST(LibCephFS, LazyIOMultipleWritersMulipleReaders) {
82 struct ceph_mount_info *ca, *cb;
83 ASSERT_EQ(ceph_create(&ca, NULL), 0);
84 ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
85 ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
86 ASSERT_EQ(ceph_mount(ca, NULL), 0);
87
88 ASSERT_EQ(ceph_create(&cb, NULL), 0);
89 ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
90 ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
91 ASSERT_EQ(ceph_mount(cb, NULL), 0);
92
93 char name[20];
94 snprintf(name, sizeof(name), "foo2.%d", getpid());
95
96 int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
97 ASSERT_LE(0, fda);
98
99 int fdb = ceph_open(cb, name, O_RDWR, 0644);
100 ASSERT_LE(0, fdb);
101
102 ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
103 ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
104
105 char out_buf[] = "fooooooooo";
106 /* Client a issues a write and propagates/flushes the buffer */
107 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
108 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
109
110 /* Client b issues a write and propagates/flushes the buffer*/
111 ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
112 ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
113
114 char in_buf[40];
115 /* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
116 ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
117 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
118 ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
119
120 /* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
121 ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
122 ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
123
124 /* Client a issues a write */
125 char wait_out_buf[] = "foobarbars";
126 ASSERT_EQ((int)sizeof(wait_out_buf), ceph_write(ca, fda, wait_out_buf, sizeof(wait_out_buf), 20));
127 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
128
129 /* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
130 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
131 ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
132
133 /* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client a to fetch the propagated write of client a in the subsequent read */
134 ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
135 ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
136 ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
137
138 ceph_close(ca, fda);
139 ceph_close(cb, fdb);
140
141 ceph_shutdown(ca);
142 ceph_shutdown(cb);
143 }
144
145 TEST(LibCephFS, LazyIOMultipleWritersOneReader) {
146 struct ceph_mount_info *ca, *cb;
147 ASSERT_EQ(ceph_create(&ca, NULL), 0);
148 ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
149 ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
150 ASSERT_EQ(ceph_mount(ca, NULL), 0);
151
152 ASSERT_EQ(ceph_create(&cb, NULL), 0);
153 ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
154 ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
155 ASSERT_EQ(ceph_mount(cb, NULL), 0);
156
157 char name[20];
158 snprintf(name, sizeof(name), "foo3.%d", getpid());
159
160 int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
161 ASSERT_LE(0, fda);
162
163 int fdb = ceph_open(cb, name, O_RDWR, 0644);
164 ASSERT_LE(0, fdb);
165
166 ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
167 ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
168
169 char out_buf[] = "fooooooooo";
170 /* Client a issues a write and propagates/flushes the buffer */
171 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
172 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
173
174 /* Client b issues a write and propagates/flushes the buffer*/
175 ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
176 ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
177
178 char in_buf[40];
179 /* Client a reads the file and verifies that it only reads it's propagated writes and not Client b's*/
180 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), strlen(out_buf)+1);
181 ASSERT_STREQ(in_buf, "fooooooooo");
182
183 /* Client a reads the file again, this time with a lazyio_synchronize to check if the cache gets invalidated and data is refetched i.e all the propagated writes are being read*/
184 ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
185 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
186 ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
187
188 ceph_close(ca, fda);
189 ceph_close(cb, fdb);
190
191 ceph_shutdown(ca);
192 ceph_shutdown(cb);
193 }
194
195 TEST(LibCephFS, LazyIOSynchronizeFlush) {
196 /* Test to make sure lazyio_synchronize flushes dirty buffers */
197 struct ceph_mount_info *ca, *cb;
198 ASSERT_EQ(ceph_create(&ca, NULL), 0);
199 ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
200 ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
201 ASSERT_EQ(ceph_mount(ca, NULL), 0);
202
203 ASSERT_EQ(ceph_create(&cb, NULL), 0);
204 ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
205 ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
206 ASSERT_EQ(ceph_mount(cb, NULL), 0);
207
208 char name[20];
209 snprintf(name, sizeof(name), "foo4.%d", getpid());
210
211 int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
212 ASSERT_LE(0, fda);
213
214 int fdb = ceph_open(cb, name, O_RDWR, 0644);
215 ASSERT_LE(0, fdb);
216
217 ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
218 ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
219
220 char out_buf[] = "fooooooooo";
221
222 /* Client a issues a write and propagates it*/
223 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
224 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
225
226 /* Client b issues writes and without lazyio_propagate*/
227 ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
228 ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 20));
229
230 char in_buf[40];
231 /* Calling ceph_lazyio_synchronize here will first flush the possibly pending buffered write of client b and invalidate client b's cache and hence enable client b to fetch all the propagated writes */
232 ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
233 ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
234 ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
235
236 /* Required to call ceph_lazyio_synchronize here since client b is the latest writer and client a is out of sync with updated file*/
237 ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
238 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
239 ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
240
241 ceph_close(ca, fda);
242 ceph_close(cb, fdb);
243
244 ceph_shutdown(ca);
245 ceph_shutdown(cb);
246 }
247
248 TEST(LibCephFS, WithoutandWithLazyIO) {
249 struct ceph_mount_info *ca, *cb;
250 ASSERT_EQ(ceph_create(&ca, NULL), 0);
251 ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
252 ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
253 ASSERT_EQ(ceph_mount(ca, NULL), 0);
254
255 ASSERT_EQ(ceph_create(&cb, NULL), 0);
256 ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
257 ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
258 ASSERT_EQ(ceph_mount(cb, NULL), 0);
259
260 char name[20];
261 snprintf(name, sizeof(name), "foo5.%d", getpid());
262
263 int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
264 ASSERT_LE(0, fda);
265
266 int fdb = ceph_open(cb, name, O_RDWR, 0644);
267 ASSERT_LE(0, fdb);
268
269 char out_buf_w[] = "1234567890";
270 /* Doing some non lazyio writes and read*/
271 ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(ca, fda, out_buf_w, sizeof(out_buf_w), 0));
272
273 ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(cb, fdb, out_buf_w, sizeof(out_buf_w), 10));
274
275 char in_buf_w[30];
276 ASSERT_EQ(ceph_read(ca, fda, in_buf_w, sizeof(in_buf_w), 0), 2*strlen(out_buf_w)+1);
277
278 /* Enable lazyio*/
279 ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
280 ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
281
282 char out_buf[] = "fooooooooo";
283
284 /* Client a issues a write and propagates/flushes the buffer*/
285 ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 20));
286 ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
287
288 /* Client b issues a write and propagates/flushes the buffer*/
289 ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 30));
290 ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
291
292 char in_buf[50];
293 /* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
294 ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
295 ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
296 ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
297
298 /* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
299 ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
300 ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
301
302 ceph_close(ca, fda);
303 ceph_close(cb, fdb);
304
305 ceph_shutdown(ca);
306 ceph_shutdown(cb);
307 }
308
309 static int update_root_mode()
310 {
311 struct ceph_mount_info *admin;
312 int r = ceph_create(&admin, NULL);
313 if (r < 0)
314 return r;
315 ceph_conf_read_file(admin, NULL);
316 ceph_conf_parse_env(admin, NULL);
317 ceph_conf_set(admin, "client_permissions", "false");
318 r = ceph_mount(admin, "/");
319 if (r < 0)
320 goto out;
321 r = ceph_chmod(admin, "/", 0777);
322 out:
323 ceph_shutdown(admin);
324 return r;
325 }
326
327 int main(int argc, char **argv)
328 {
329 int r = update_root_mode();
330 if (r < 0)
331 exit(1);
332
333 ::testing::InitGoogleTest(&argc, argv);
334
335 srand(getpid());
336
337 r = rados_create(&cluster, NULL);
338 if (r < 0)
339 exit(1);
340
341 r = rados_conf_read_file(cluster, NULL);
342 if (r < 0)
343 exit(1);
344
345 rados_conf_parse_env(cluster, NULL);
346 r = rados_connect(cluster);
347 if (r < 0)
348 exit(1);
349
350 r = RUN_ALL_TESTS();
351
352 rados_shutdown(cluster);
353
354 return r;
355 }