]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/libcephfs/recordlock.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / libcephfs / recordlock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 * 2016 Red Hat
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <pthread.h>
17 #include "gtest/gtest.h"
18 #ifndef GTEST_IS_THREADSAFE
19 #error "!GTEST_IS_THREADSAFE"
20 #endif
21
22 #include "include/cephfs/libcephfs.h"
23 #include <errno.h>
24 #include <sys/fcntl.h>
25 #include <unistd.h>
26 #include <sys/file.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <dirent.h>
30 #include <sys/xattr.h>
31
32 #include <stdlib.h>
33 #include <semaphore.h>
34 #include <time.h>
35 #include <sys/mman.h>
36
37 #ifdef __linux__
38 #include <limits.h>
39 #endif
40
41 #include "include/ceph_assert.h"
42
// Common test prologue: create a mount handle, read the configuration file
// and the environment (NULL arguments are passed for both), then mount.
// Expands in place, so it needs a 'struct ceph_mount_info *cmount' variable
// in the calling scope; every step is checked with ASSERT_EQ.
#define STARTUP_CEPH() do {					\
    ASSERT_EQ(0, ceph_create(&cmount, NULL));			\
    ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));		\
    ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));		\
    ASSERT_EQ(0, ceph_mount(cmount, NULL));			\
  } while(0)
50
// Common test epilogue: unmount the file system, then release the mount
// handle. Uses the 'cmount' variable of the calling scope, like
// STARTUP_CEPH(); both steps are checked with ASSERT_EQ.
#define CLEANUP_CEPH() do {					\
    ASSERT_EQ(0, ceph_unmount(cmount));				\
    ASSERT_EQ(0, ceph_release(cmount));				\
  } while(0)
56
// Mode used when creating the test files: rwx for user, group and others
static const mode_t fileMode = S_IRWXU | S_IRWXG | S_IRWXO;

// Timeouts, in milliseconds, handed to abstime():
//  - waitMs:     short wait used when a semaphore is expected NOT to be
//                posted (see NOT_WAIT_WORKER)
//  - waitSlowMs: long wait for "slow" operations; 5 seconds should be
//                enough even in case of network congestion
static const long waitMs = 10;
static const long waitSlowMs = 5000;
63
64 // Get the absolute struct timespec reference from now + 'ms' milliseconds
65 static const struct timespec* abstime(struct timespec &ts, long ms) {
66 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
67 ceph_abort();
68 }
69 ts.tv_nsec += ms * 1000000;
70 ts.tv_sec += ts.tv_nsec / 1000000000;
71 ts.tv_nsec %= 1000000000;
72 return &ts;
73 }
74
75 /* Basic locking */
76
77 TEST(LibCephFS, BasicRecordLocking) {
78 struct ceph_mount_info *cmount = NULL;
79 STARTUP_CEPH();
80
81 char c_file[1024];
82 sprintf(c_file, "recordlock_test_%d", getpid());
83 Fh *fh = NULL;
84 Inode *root = NULL, *inode = NULL;
85 struct ceph_statx stx;
86 int rc;
87 struct flock lock1, lock2;
88 UserPerm *perms = ceph_mount_perms(cmount);
89
90 // Get the root inode
91 rc = ceph_ll_lookup_root(cmount, &root);
92 ASSERT_EQ(rc, 0);
93
94 // Get the inode and Fh corresponding to c_file
95 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
96 &inode, &fh, &stx, 0, 0, perms);
97 ASSERT_EQ(rc, 0);
98
99 // write lock twice
100 lock1.l_type = F_WRLCK;
101 lock1.l_whence = SEEK_SET;
102 lock1.l_start = 0;
103 lock1.l_len = 1024;
104 lock1.l_pid = getpid();
105 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
106
107 lock2.l_type = F_WRLCK;
108 lock2.l_whence = SEEK_SET;
109 lock2.l_start = 0;
110 lock2.l_len = 1024;
111 lock2.l_pid = getpid();
112 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
113
114 // Now try a conflicting read lock
115 lock2.l_type = F_RDLCK;
116 lock2.l_whence = SEEK_SET;
117 lock2.l_start = 100;
118 lock2.l_len = 100;
119 lock2.l_pid = getpid();
120 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
121
122 // Now do a getlk
123 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
124 ASSERT_EQ(lock2.l_type, F_WRLCK);
125 ASSERT_EQ(lock2.l_start, 0);
126 ASSERT_EQ(lock2.l_len, 1024);
127 ASSERT_EQ(lock2.l_pid, getpid());
128
129 // Extend the range of the write lock
130 lock1.l_type = F_WRLCK;
131 lock1.l_whence = SEEK_SET;
132 lock1.l_start = 1024;
133 lock1.l_len = 1024;
134 lock1.l_pid = getpid();
135 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
136
137 // Now do a getlk
138 lock2.l_type = F_RDLCK;
139 lock2.l_whence = SEEK_SET;
140 lock2.l_start = 100;
141 lock2.l_len = 100;
142 lock2.l_pid = getpid();
143 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
144 ASSERT_EQ(lock2.l_type, F_WRLCK);
145 ASSERT_EQ(lock2.l_start, 0);
146 ASSERT_EQ(lock2.l_len, 2048);
147 ASSERT_EQ(lock2.l_pid, getpid());
148
149 // Now release part of the range
150 lock1.l_type = F_UNLCK;
151 lock1.l_whence = SEEK_SET;
152 lock1.l_start = 512;
153 lock1.l_len = 1024;
154 lock1.l_pid = getpid();
155 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
156
157 // Now do a getlk to check 1st part
158 lock2.l_type = F_RDLCK;
159 lock2.l_whence = SEEK_SET;
160 lock2.l_start = 100;
161 lock2.l_len = 100;
162 lock2.l_pid = getpid();
163 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
164 ASSERT_EQ(lock2.l_type, F_WRLCK);
165 ASSERT_EQ(lock2.l_start, 0);
166 ASSERT_EQ(lock2.l_len, 512);
167 ASSERT_EQ(lock2.l_pid, getpid());
168
169 // Now do a getlk to check 2nd part
170 lock2.l_type = F_RDLCK;
171 lock2.l_whence = SEEK_SET;
172 lock2.l_start = 2000;
173 lock2.l_len = 100;
174 lock2.l_pid = getpid();
175 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
176 ASSERT_EQ(lock2.l_type, F_WRLCK);
177 ASSERT_EQ(lock2.l_start, 1536);
178 ASSERT_EQ(lock2.l_len, 512);
179 ASSERT_EQ(lock2.l_pid, getpid());
180
181 // Now do a getlk to check released part
182 lock2.l_type = F_RDLCK;
183 lock2.l_whence = SEEK_SET;
184 lock2.l_start = 512;
185 lock2.l_len = 1024;
186 lock2.l_pid = getpid();
187 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
188 ASSERT_EQ(lock2.l_type, F_UNLCK);
189 ASSERT_EQ(lock2.l_start, 512);
190 ASSERT_EQ(lock2.l_len, 1024);
191 ASSERT_EQ(lock2.l_pid, getpid());
192
193 // Now downgrade the 1st part of the lock
194 lock1.l_type = F_RDLCK;
195 lock1.l_whence = SEEK_SET;
196 lock1.l_start = 0;
197 lock1.l_len = 512;
198 lock1.l_pid = getpid();
199 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
200
201 // Now do a getlk to check 1st part
202 lock2.l_type = F_WRLCK;
203 lock2.l_whence = SEEK_SET;
204 lock2.l_start = 100;
205 lock2.l_len = 100;
206 lock2.l_pid = getpid();
207 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
208 ASSERT_EQ(lock2.l_type, F_RDLCK);
209 ASSERT_EQ(lock2.l_start, 0);
210 ASSERT_EQ(lock2.l_len, 512);
211 ASSERT_EQ(lock2.l_pid, getpid());
212
213 // Now upgrade the 1st part of the lock
214 lock1.l_type = F_WRLCK;
215 lock1.l_whence = SEEK_SET;
216 lock1.l_start = 0;
217 lock1.l_len = 512;
218 lock1.l_pid = getpid();
219 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
220
221 // Now do a getlk to check 1st part
222 lock2.l_type = F_WRLCK;
223 lock2.l_whence = SEEK_SET;
224 lock2.l_start = 100;
225 lock2.l_len = 100;
226 lock2.l_pid = getpid();
227 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
228 ASSERT_EQ(lock2.l_type, F_WRLCK);
229 ASSERT_EQ(lock2.l_start, 0);
230 ASSERT_EQ(lock2.l_len, 512);
231 ASSERT_EQ(lock2.l_pid, getpid());
232
233 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
234 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
235 CLEANUP_CEPH();
236 }
237
238 /* Locking in different threads */
239
240 // Used by ConcurrentLocking test
241 struct str_ConcurrentRecordLocking {
242 const char *file;
243 struct ceph_mount_info *cmount; // !NULL if shared
244 sem_t sem[2];
245 sem_t semReply[2];
246 void sem_init(int pshared) {
247 ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
248 ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
249 ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
250 ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
251 }
252 void sem_destroy() {
253 ASSERT_EQ(0, ::sem_destroy(&sem[0]));
254 ASSERT_EQ(0, ::sem_destroy(&sem[1]));
255 ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
256 ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
257 }
258 };
259
// Semaphore handshake between the main test and its worker(s).
//
// All of these expand in place and expect a 'str_ConcurrentRecordLocking s'
// in the calling scope; the waiting variants also need a 'struct timespec ts'
// scratch variable. The step number selects one of the two semaphores of the
// pair ('% 2'). The argument is parenthesized so that an expression such as
// 'i + 1' picks the intended slot instead of 'i + (1 % 2)'.
// 'waitMs' / 'waitSlowMs' are looked up where the macro is expanded, which
// lets PROCESS_SLOW_MS() override waitMs locally.

// Wakeup main (for (N) steps)
#define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[(n) % 2]))
// Wait for main to wake us up (for (RN) steps)
#define WAIT_MAIN(n) \
  ASSERT_EQ(0, sem_timedwait(&s.semReply[(n) % 2], abstime(ts, waitSlowMs)))

// Wakeup worker (for (RN) steps)
#define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[(n) % 2]))
// Wait for worker to wake us up (for (N) steps)
#define WAIT_WORKER(n) \
  ASSERT_EQ(0, sem_timedwait(&s.sem[(n) % 2], abstime(ts, waitSlowMs)))
// Worker shall not wake us up (for (N) steps): the short wait must fail
#define NOT_WAIT_WORKER(n) \
  ASSERT_EQ(-1, sem_timedwait(&s.sem[(n) % 2], abstime(ts, waitMs)))
274
// Run EXPR two times in a row. EXPR may be a sequence of statements; the
// three-party tests use this to go through a step once per worker.
#define TWICE(EXPR) do {			\
    EXPR;					\
    EXPR;					\
  } while(0)
280
281 /* Locking in different threads */
282
283 // Used by ConcurrentLocking test
284 static void thread_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
285 struct ceph_mount_info *const cmount = s.cmount;
286 Fh *fh = NULL;
287 Inode *root = NULL, *inode = NULL;
288 struct ceph_statx stx;
289 struct flock lock1;
290 int rc;
291 struct timespec ts;
292
293 // Get the root inode
294 rc = ceph_ll_lookup_root(cmount, &root);
295 ASSERT_EQ(rc, 0);
296
297 // Get the inode and Fh corresponding to c_file
298 rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
299 &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
300 ASSERT_EQ(rc, 0);
301
302 lock1.l_type = F_WRLCK;
303 lock1.l_whence = SEEK_SET;
304 lock1.l_start = 0;
305 lock1.l_len = 1024;
306 lock1.l_pid = getpid();
307 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
308
309 PING_MAIN(1); // (1)
310 lock1.l_type = F_WRLCK;
311 lock1.l_whence = SEEK_SET;
312 lock1.l_start = 0;
313 lock1.l_len = 1024;
314 lock1.l_pid = getpid();
315 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
316 PING_MAIN(2); // (2)
317
318 lock1.l_type = F_UNLCK;
319 lock1.l_whence = SEEK_SET;
320 lock1.l_start = 0;
321 lock1.l_len = 1024;
322 lock1.l_pid = getpid();
323 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
324 PING_MAIN(3); // (3)
325
326 lock1.l_type = F_RDLCK;
327 lock1.l_whence = SEEK_SET;
328 lock1.l_start = 0;
329 lock1.l_len = 1024;
330 lock1.l_pid = getpid();
331 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
332 PING_MAIN(4); // (4)
333
334 WAIT_MAIN(1); // (R1)
335 lock1.l_type = F_UNLCK;
336 lock1.l_whence = SEEK_SET;
337 lock1.l_start = 0;
338 lock1.l_len = 1024;
339 lock1.l_pid = getpid();
340 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
341 PING_MAIN(5); // (5)
342
343 WAIT_MAIN(2); // (R2)
344 lock1.l_type = F_WRLCK;
345 lock1.l_whence = SEEK_SET;
346 lock1.l_start = 0;
347 lock1.l_len = 1024;
348 lock1.l_pid = getpid();
349 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
350 PING_MAIN(6); // (6)
351
352 WAIT_MAIN(3); // (R3)
353 lock1.l_type = F_UNLCK;
354 lock1.l_whence = SEEK_SET;
355 lock1.l_start = 0;
356 lock1.l_len = 1024;
357 lock1.l_pid = getpid();
358 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
359 PING_MAIN(7); // (7)
360
361 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
362 }
363
364 // Used by ConcurrentRecordLocking test
365 static void* thread_ConcurrentRecordLocking_(void *arg) {
366 str_ConcurrentRecordLocking *const s =
367 reinterpret_cast<str_ConcurrentRecordLocking*>(arg);
368 thread_ConcurrentRecordLocking(*s);
369 return NULL;
370 }
371
372 TEST(LibCephFS, ConcurrentRecordLocking) {
373 const pid_t mypid = getpid();
374 struct ceph_mount_info *cmount;
375 STARTUP_CEPH();
376
377 char c_file[1024];
378 sprintf(c_file, "recordlock_test_%d", mypid);
379 Fh *fh = NULL;
380 Inode *root = NULL, *inode = NULL;
381 struct ceph_statx stx;
382 struct flock lock1;
383 int rc;
384 UserPerm *perms = ceph_mount_perms(cmount);
385
386 // Get the root inode
387 rc = ceph_ll_lookup_root(cmount, &root);
388 ASSERT_EQ(rc, 0);
389
390 // Get the inode and Fh corresponding to c_file
391 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
392 &inode, &fh, &stx, 0, 0, perms);
393 ASSERT_EQ(rc, 0);
394
395 // Lock
396 lock1.l_type = F_WRLCK;
397 lock1.l_whence = SEEK_SET;
398 lock1.l_start = 0;
399 lock1.l_len = 1024;
400 lock1.l_pid = getpid();
401 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
402
403 // Start locker thread
404 pthread_t thread;
405 struct timespec ts;
406 str_ConcurrentRecordLocking s = { c_file, cmount };
407 s.sem_init(0);
408 ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentRecordLocking_, &s));
409 // Synchronization point with thread (failure: thread is dead)
410 WAIT_WORKER(1); // (1)
411
412 // Shall not have lock immediately
413 NOT_WAIT_WORKER(2); // (2)
414
415 // Unlock
416 lock1.l_type = F_UNLCK;
417 lock1.l_whence = SEEK_SET;
418 lock1.l_start = 0;
419 lock1.l_len = 1024;
420 lock1.l_pid = getpid();
421 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
422
423 // Shall have lock
424 // Synchronization point with thread (failure: thread is dead)
425 WAIT_WORKER(2); // (2)
426
427 // Synchronization point with thread (failure: thread is dead)
428 WAIT_WORKER(3); // (3)
429
430 // Wait for thread to share lock
431 WAIT_WORKER(4); // (4)
432 lock1.l_type = F_WRLCK;
433 lock1.l_whence = SEEK_SET;
434 lock1.l_start = 0;
435 lock1.l_len = 1024;
436 lock1.l_pid = getpid();
437 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
438 lock1.l_type = F_RDLCK;
439 lock1.l_whence = SEEK_SET;
440 lock1.l_start = 0;
441 lock1.l_len = 1024;
442 lock1.l_pid = getpid();
443 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
444
445 // Wake up thread to unlock shared lock
446 PING_WORKER(1); // (R1)
447 WAIT_WORKER(5); // (5)
448
449 // Now we can lock exclusively
450 // Upgrade to exclusive lock (as per POSIX)
451 lock1.l_type = F_WRLCK;
452 lock1.l_whence = SEEK_SET;
453 lock1.l_start = 0;
454 lock1.l_len = 1024;
455 lock1.l_pid = getpid();
456 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
457
458 // Wake up thread to lock shared lock
459 PING_WORKER(2); // (R2)
460
461 // Shall not have lock immediately
462 NOT_WAIT_WORKER(6); // (6)
463
464 // Release lock ; thread will get it
465 lock1.l_type = F_UNLCK;
466 lock1.l_whence = SEEK_SET;
467 lock1.l_start = 0;
468 lock1.l_len = 1024;
469 lock1.l_pid = getpid();
470 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
471 WAIT_WORKER(6); // (6)
472
473 // We no longer have the lock
474 lock1.l_type = F_WRLCK;
475 lock1.l_whence = SEEK_SET;
476 lock1.l_start = 0;
477 lock1.l_len = 1024;
478 lock1.l_pid = getpid();
479 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
480 lock1.l_type = F_RDLCK;
481 lock1.l_whence = SEEK_SET;
482 lock1.l_start = 0;
483 lock1.l_len = 1024;
484 lock1.l_pid = getpid();
485 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
486
487 // Wake up thread to unlock exclusive lock
488 PING_WORKER(3); // (R3)
489 WAIT_WORKER(7); // (7)
490
491 // We can lock it again
492 lock1.l_type = F_WRLCK;
493 lock1.l_whence = SEEK_SET;
494 lock1.l_start = 0;
495 lock1.l_len = 1024;
496 lock1.l_pid = getpid();
497 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
498 lock1.l_type = F_UNLCK;
499 lock1.l_whence = SEEK_SET;
500 lock1.l_start = 0;
501 lock1.l_len = 1024;
502 lock1.l_pid = getpid();
503 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
504
505 // Cleanup
506 void *retval = (void*) (uintptr_t) -1;
507 ASSERT_EQ(0, pthread_join(thread, &retval));
508 ASSERT_EQ(NULL, retval);
509 s.sem_destroy();
510 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
511 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
512 CLEANUP_CEPH();
513 }
514
515 TEST(LibCephFS, ThreesomeRecordLocking) {
516 const pid_t mypid = getpid();
517 struct ceph_mount_info *cmount;
518 STARTUP_CEPH();
519
520 char c_file[1024];
521 sprintf(c_file, "recordlock_test_%d", mypid);
522 Fh *fh = NULL;
523 Inode *root = NULL, *inode = NULL;
524 struct ceph_statx stx;
525 struct flock lock1;
526 int rc;
527 UserPerm *perms = ceph_mount_perms(cmount);
528
529 // Get the root inode
530 rc = ceph_ll_lookup_root(cmount, &root);
531 ASSERT_EQ(rc, 0);
532
533 // Get the inode and Fh corresponding to c_file
534 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
535 &inode, &fh, &stx, 0, 0, perms);
536 ASSERT_EQ(rc, 0);
537
538 // Lock
539 lock1.l_type = F_WRLCK;
540 lock1.l_whence = SEEK_SET;
541 lock1.l_start = 0;
542 lock1.l_len = 1024;
543 lock1.l_pid = getpid();
544 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
545
546 // Start locker thread
547 pthread_t thread[2];
548 struct timespec ts;
549 str_ConcurrentRecordLocking s = { c_file, cmount };
550 s.sem_init(0);
551 ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentRecordLocking_, &s));
552 ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentRecordLocking_, &s));
553 // Synchronization point with thread (failure: thread is dead)
554 TWICE(WAIT_WORKER(1)); // (1)
555
556 // Shall not have lock immediately
557 NOT_WAIT_WORKER(2); // (2)
558
559 // Unlock
560 lock1.l_type = F_UNLCK;
561 lock1.l_whence = SEEK_SET;
562 lock1.l_start = 0;
563 lock1.l_len = 1024;
564 lock1.l_pid = getpid();
565 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
566
567 // Shall have lock
568 TWICE(// Synchronization point with thread (failure: thread is dead)
569 WAIT_WORKER(2); // (2)
570
571 // Synchronization point with thread (failure: thread is dead)
572 WAIT_WORKER(3)); // (3)
573
574 // Wait for thread to share lock
575 TWICE(WAIT_WORKER(4)); // (4)
576 lock1.l_type = F_WRLCK;
577 lock1.l_whence = SEEK_SET;
578 lock1.l_start = 0;
579 lock1.l_len = 1024;
580 lock1.l_pid = getpid();
581 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
582 lock1.l_type = F_RDLCK;
583 lock1.l_whence = SEEK_SET;
584 lock1.l_start = 0;
585 lock1.l_len = 1024;
586 lock1.l_pid = getpid();
587 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
588
589 // Wake up thread to unlock shared lock
590 TWICE(PING_WORKER(1); // (R1)
591 WAIT_WORKER(5)); // (5)
592
593 // Now we can lock exclusively
594 // Upgrade to exclusive lock (as per POSIX)
595 lock1.l_type = F_WRLCK;
596 lock1.l_whence = SEEK_SET;
597 lock1.l_start = 0;
598 lock1.l_len = 1024;
599 lock1.l_pid = getpid();
600 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), true));
601
602 TWICE( // Wake up thread to lock shared lock
603 PING_WORKER(2); // (R2)
604
605 // Shall not have lock immediately
606 NOT_WAIT_WORKER(6)); // (6)
607
608 // Release lock ; thread will get it
609 lock1.l_type = F_UNLCK;
610 lock1.l_whence = SEEK_SET;
611 lock1.l_start = 0;
612 lock1.l_len = 1024;
613 lock1.l_pid = getpid();
614 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
615 TWICE(WAIT_WORKER(6); // (6)
616
617 // We no longer have the lock
618 lock1.l_type = F_WRLCK;
619 lock1.l_whence = SEEK_SET;
620 lock1.l_start = 0;
621 lock1.l_len = 1024;
622 lock1.l_pid = getpid();
623 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
624 lock1.l_type = F_RDLCK;
625 lock1.l_whence = SEEK_SET;
626 lock1.l_start = 0;
627 lock1.l_len = 1024;
628 lock1.l_pid = getpid();
629 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
630
631 // Wake up thread to unlock exclusive lock
632 PING_WORKER(3); // (R3)
633 WAIT_WORKER(7); // (7)
634 );
635
636 // We can lock it again
637 lock1.l_type = F_WRLCK;
638 lock1.l_whence = SEEK_SET;
639 lock1.l_start = 0;
640 lock1.l_len = 1024;
641 lock1.l_pid = getpid();
642 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
643 lock1.l_type = F_UNLCK;
644 lock1.l_whence = SEEK_SET;
645 lock1.l_start = 0;
646 lock1.l_len = 1024;
647 lock1.l_pid = getpid();
648 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
649
650 // Cleanup
651 void *retval = (void*) (uintptr_t) -1;
652 ASSERT_EQ(0, pthread_join(thread[0], &retval));
653 ASSERT_EQ(NULL, retval);
654 ASSERT_EQ(0, pthread_join(thread[1], &retval));
655 ASSERT_EQ(NULL, retval);
656 s.sem_destroy();
657 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
658 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
659 CLEANUP_CEPH();
660 }
661
/* Locking in different processes */

// Declares a local 'waitMs' of 100 ms that hides the file-level one for the
// rest of the enclosing scope, so NOT_WAIT_WORKER() waits longer in the
// multi-process tests. The trailing '(void) waitMs' references the variable
// in scopes that never use it otherwise.
#define PROCESS_SLOW_MS()			\
  static const long waitMs = 100;		\
  (void) waitMs
667
668 // Used by ConcurrentLocking test
669 static void process_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
670 const pid_t mypid = getpid();
671 PROCESS_SLOW_MS();
672
673 struct ceph_mount_info *cmount = NULL;
674 struct timespec ts;
675 Fh *fh = NULL;
676 Inode *root = NULL, *inode = NULL;
677 struct ceph_statx stx;
678 int rc;
679 struct flock lock1;
680
681 STARTUP_CEPH();
682 s.cmount = cmount;
683
684 // Get the root inode
685 rc = ceph_ll_lookup_root(cmount, &root);
686 ASSERT_EQ(rc, 0);
687
688 // Get the inode and Fh corresponding to c_file
689 rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
690 &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
691 ASSERT_EQ(rc, 0);
692
693 WAIT_MAIN(1); // (R1)
694
695 lock1.l_type = F_WRLCK;
696 lock1.l_whence = SEEK_SET;
697 lock1.l_start = 0;
698 lock1.l_len = 1024;
699 lock1.l_pid = getpid();
700 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
701 PING_MAIN(1); // (1)
702 lock1.l_type = F_WRLCK;
703 lock1.l_whence = SEEK_SET;
704 lock1.l_start = 0;
705 lock1.l_len = 1024;
706 lock1.l_pid = getpid();
707 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
708 PING_MAIN(2); // (2)
709
710 lock1.l_type = F_UNLCK;
711 lock1.l_whence = SEEK_SET;
712 lock1.l_start = 0;
713 lock1.l_len = 1024;
714 lock1.l_pid = getpid();
715 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
716 PING_MAIN(3); // (3)
717
718 lock1.l_type = F_RDLCK;
719 lock1.l_whence = SEEK_SET;
720 lock1.l_start = 0;
721 lock1.l_len = 1024;
722 lock1.l_pid = getpid();
723 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
724 PING_MAIN(4); // (4)
725
726 WAIT_MAIN(2); // (R2)
727 lock1.l_type = F_UNLCK;
728 lock1.l_whence = SEEK_SET;
729 lock1.l_start = 0;
730 lock1.l_len = 1024;
731 lock1.l_pid = getpid();
732 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
733 PING_MAIN(5); // (5)
734
735 WAIT_MAIN(3); // (R3)
736 lock1.l_type = F_WRLCK;
737 lock1.l_whence = SEEK_SET;
738 lock1.l_start = 0;
739 lock1.l_len = 1024;
740 lock1.l_pid = getpid();
741 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
742 PING_MAIN(6); // (6)
743
744 WAIT_MAIN(4); // (R4)
745 lock1.l_type = F_UNLCK;
746 lock1.l_whence = SEEK_SET;
747 lock1.l_start = 0;
748 lock1.l_len = 1024;
749 lock1.l_pid = getpid();
750 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
751 PING_MAIN(7); // (7)
752
753 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
754 CLEANUP_CEPH();
755
756 s.sem_destroy();
757 exit(EXIT_SUCCESS);
758 }
759
760 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
761 TEST(LibCephFS, DISABLED_InterProcessRecordLocking) {
762 PROCESS_SLOW_MS();
763 // Process synchronization
764 char c_file[1024];
765 const pid_t mypid = getpid();
766 sprintf(c_file, "recordlock_test_%d", mypid);
767 Fh *fh = NULL;
768 Inode *root = NULL, *inode = NULL;
769 struct ceph_statx stx;
770 struct flock lock1;
771 int rc;
772
773 // Note: the semaphores MUST be on a shared memory segment
774 str_ConcurrentRecordLocking *const shs =
775 reinterpret_cast<str_ConcurrentRecordLocking*>
776 (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
777 -1, 0));
778 str_ConcurrentRecordLocking &s = *shs;
779 s.file = c_file;
780 s.sem_init(1);
781
782 // Start locker process
783 const pid_t pid = fork();
784 ASSERT_GE(pid, 0);
785 if (pid == 0) {
786 process_ConcurrentRecordLocking(s);
787 exit(EXIT_FAILURE);
788 }
789
790 struct timespec ts;
791 struct ceph_mount_info *cmount;
792 STARTUP_CEPH();
793 UserPerm *perms = ceph_mount_perms(cmount);
794
795 // Get the root inode
796 rc = ceph_ll_lookup_root(cmount, &root);
797 ASSERT_EQ(rc, 0);
798
799 // Get the inode and Fh corresponding to c_file
800 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
801 &inode, &fh, &stx, 0, 0, perms);
802 ASSERT_EQ(rc, 0);
803
804 // Lock
805 lock1.l_type = F_WRLCK;
806 lock1.l_whence = SEEK_SET;
807 lock1.l_start = 0;
808 lock1.l_len = 1024;
809 lock1.l_pid = getpid();
810 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
811
812 // Synchronization point with process (failure: process is dead)
813 PING_WORKER(1); // (R1)
814 WAIT_WORKER(1); // (1)
815
816 // Shall not have lock immediately
817 NOT_WAIT_WORKER(2); // (2)
818
819 // Unlock
820 lock1.l_type = F_UNLCK;
821 lock1.l_whence = SEEK_SET;
822 lock1.l_start = 0;
823 lock1.l_len = 1024;
824 lock1.l_pid = getpid();
825 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
826
827 // Shall have lock
828 // Synchronization point with process (failure: process is dead)
829 WAIT_WORKER(2); // (2)
830
831 // Synchronization point with process (failure: process is dead)
832 WAIT_WORKER(3); // (3)
833
834 // Wait for process to share lock
835 WAIT_WORKER(4); // (4)
836 lock1.l_type = F_WRLCK;
837 lock1.l_whence = SEEK_SET;
838 lock1.l_start = 0;
839 lock1.l_len = 1024;
840 lock1.l_pid = getpid();
841 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
842 lock1.l_type = F_RDLCK;
843 lock1.l_whence = SEEK_SET;
844 lock1.l_start = 0;
845 lock1.l_len = 1024;
846 lock1.l_pid = getpid();
847 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
848
849 // Wake up process to unlock shared lock
850 PING_WORKER(2); // (R2)
851 WAIT_WORKER(5); // (5)
852
853 // Now we can lock exclusively
854 // Upgrade to exclusive lock (as per POSIX)
855 lock1.l_type = F_WRLCK;
856 lock1.l_whence = SEEK_SET;
857 lock1.l_start = 0;
858 lock1.l_len = 1024;
859 lock1.l_pid = getpid();
860 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
861
862 // Wake up process to lock shared lock
863 PING_WORKER(3); // (R3)
864
865 // Shall not have lock immediately
866 NOT_WAIT_WORKER(6); // (6)
867
868 // Release lock ; process will get it
869 lock1.l_type = F_UNLCK;
870 lock1.l_whence = SEEK_SET;
871 lock1.l_start = 0;
872 lock1.l_len = 1024;
873 lock1.l_pid = getpid();
874 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
875 WAIT_WORKER(6); // (6)
876
877 // We no longer have the lock
878 lock1.l_type = F_WRLCK;
879 lock1.l_whence = SEEK_SET;
880 lock1.l_start = 0;
881 lock1.l_len = 1024;
882 lock1.l_pid = getpid();
883 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
884 lock1.l_type = F_RDLCK;
885 lock1.l_whence = SEEK_SET;
886 lock1.l_start = 0;
887 lock1.l_len = 1024;
888 lock1.l_pid = getpid();
889 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
890
891 // Wake up process to unlock exclusive lock
892 PING_WORKER(4); // (R4)
893 WAIT_WORKER(7); // (7)
894
895 // We can lock it again
896 lock1.l_type = F_WRLCK;
897 lock1.l_whence = SEEK_SET;
898 lock1.l_start = 0;
899 lock1.l_len = 1024;
900 lock1.l_pid = getpid();
901 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
902 lock1.l_type = F_UNLCK;
903 lock1.l_whence = SEEK_SET;
904 lock1.l_start = 0;
905 lock1.l_len = 1024;
906 lock1.l_pid = getpid();
907 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
908
909 // Wait pid
910 int status;
911 ASSERT_EQ(pid, waitpid(pid, &status, 0));
912 ASSERT_EQ(EXIT_SUCCESS, status);
913
914 // Cleanup
915 s.sem_destroy();
916 ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
917 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
918 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
919 CLEANUP_CEPH();
920 }
921
922 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
923 TEST(LibCephFS, DISABLED_ThreesomeInterProcessRecordLocking) {
924 PROCESS_SLOW_MS();
925 // Process synchronization
926 char c_file[1024];
927 const pid_t mypid = getpid();
928 sprintf(c_file, "recordlock_test_%d", mypid);
929 Fh *fh = NULL;
930 Inode *root = NULL, *inode = NULL;
931 struct ceph_statx stx;
932 struct flock lock1;
933 int rc;
934
935 // Note: the semaphores MUST be on a shared memory segment
936 str_ConcurrentRecordLocking *const shs =
937 reinterpret_cast<str_ConcurrentRecordLocking*>
938 (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
939 -1, 0));
940 str_ConcurrentRecordLocking &s = *shs;
941 s.file = c_file;
942 s.sem_init(1);
943
944 // Start locker processes
945 pid_t pid[2];
946 pid[0] = fork();
947 ASSERT_GE(pid[0], 0);
948 if (pid[0] == 0) {
949 process_ConcurrentRecordLocking(s);
950 exit(EXIT_FAILURE);
951 }
952 pid[1] = fork();
953 ASSERT_GE(pid[1], 0);
954 if (pid[1] == 0) {
955 process_ConcurrentRecordLocking(s);
956 exit(EXIT_FAILURE);
957 }
958
959 struct timespec ts;
960 struct ceph_mount_info *cmount;
961 STARTUP_CEPH();
962
963 // Get the root inode
964 rc = ceph_ll_lookup_root(cmount, &root);
965 ASSERT_EQ(rc, 0);
966
967 // Get the inode and Fh corresponding to c_file
968 UserPerm *perms = ceph_mount_perms(cmount);
969 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
970 &inode, &fh, &stx, 0, 0, perms);
971 ASSERT_EQ(rc, 0);
972
973 // Lock
974 lock1.l_type = F_WRLCK;
975 lock1.l_whence = SEEK_SET;
976 lock1.l_start = 0;
977 lock1.l_len = 1024;
978 lock1.l_pid = getpid();
979 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
980
981 // Synchronization point with process (failure: process is dead)
982 TWICE(PING_WORKER(1)); // (R1)
983 TWICE(WAIT_WORKER(1)); // (1)
984
985 // Shall not have lock immediately
986 NOT_WAIT_WORKER(2); // (2)
987
988 // Unlock
989 lock1.l_type = F_UNLCK;
990 lock1.l_whence = SEEK_SET;
991 lock1.l_start = 0;
992 lock1.l_len = 1024;
993 lock1.l_pid = getpid();
994 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
995
996 // Shall have lock
997 TWICE(// Synchronization point with process (failure: process is dead)
998 WAIT_WORKER(2); // (2)
999
1000 // Synchronization point with process (failure: process is dead)
1001 WAIT_WORKER(3)); // (3)
1002
1003 // Wait for process to share lock
1004 TWICE(WAIT_WORKER(4)); // (4)
1005 lock1.l_type = F_WRLCK;
1006 lock1.l_whence = SEEK_SET;
1007 lock1.l_start = 0;
1008 lock1.l_len = 1024;
1009 lock1.l_pid = getpid();
1010 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1011 lock1.l_type = F_RDLCK;
1012 lock1.l_whence = SEEK_SET;
1013 lock1.l_start = 0;
1014 lock1.l_len = 1024;
1015 lock1.l_pid = getpid();
1016 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1017
1018 // Wake up process to unlock shared lock
1019 TWICE(PING_WORKER(2); // (R2)
1020 WAIT_WORKER(5)); // (5)
1021
1022 // Now we can lock exclusively
1023 // Upgrade to exclusive lock (as per POSIX)
1024 lock1.l_type = F_WRLCK;
1025 lock1.l_whence = SEEK_SET;
1026 lock1.l_start = 0;
1027 lock1.l_len = 1024;
1028 lock1.l_pid = getpid();
1029 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
1030
1031 TWICE( // Wake up process to lock shared lock
1032 PING_WORKER(3); // (R3)
1033
1034 // Shall not have lock immediately
1035 NOT_WAIT_WORKER(6)); // (6)
1036
1037 // Release lock ; process will get it
1038 lock1.l_type = F_UNLCK;
1039 lock1.l_whence = SEEK_SET;
1040 lock1.l_start = 0;
1041 lock1.l_len = 1024;
1042 lock1.l_pid = getpid();
1043 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1044 TWICE(WAIT_WORKER(6); // (6)
1045
1046 // We no longer have the lock
1047 lock1.l_type = F_WRLCK;
1048 lock1.l_whence = SEEK_SET;
1049 lock1.l_start = 0;
1050 lock1.l_len = 1024;
1051 lock1.l_pid = getpid();
1052 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
1053 lock1.l_type = F_RDLCK;
1054 lock1.l_whence = SEEK_SET;
1055 lock1.l_start = 0;
1056 lock1.l_len = 1024;
1057 lock1.l_pid = getpid();
1058 ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, pthread_self(), false));
1059
1060 // Wake up process to unlock exclusive lock
1061 PING_WORKER(4); // (R4)
1062 WAIT_WORKER(7); // (7)
1063 );
1064
1065 // We can lock it again
1066 lock1.l_type = F_WRLCK;
1067 lock1.l_whence = SEEK_SET;
1068 lock1.l_start = 0;
1069 lock1.l_len = 1024;
1070 lock1.l_pid = getpid();
1071 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1072 lock1.l_type = F_UNLCK;
1073 lock1.l_whence = SEEK_SET;
1074 lock1.l_start = 0;
1075 lock1.l_len = 1024;
1076 lock1.l_pid = getpid();
1077 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1078
1079 // Wait pids
1080 int status;
1081 ASSERT_EQ(pid[0], waitpid(pid[0], &status, 0));
1082 ASSERT_EQ(EXIT_SUCCESS, status);
1083 ASSERT_EQ(pid[1], waitpid(pid[1], &status, 0));
1084 ASSERT_EQ(EXIT_SUCCESS, status);
1085
1086 // Cleanup
1087 s.sem_destroy();
1088 ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
1089 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
1090 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
1091 CLEANUP_CEPH();
1092 }