]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/libcephfs/recordlock.cc
ad108b69cf60fcb9c98452e117e18878c3461b02
[ceph.git] / ceph / src / test / libcephfs / recordlock.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 * 2016 Red Hat
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <pthread.h>
17 #include "gtest/gtest.h"
18 #ifndef GTEST_IS_THREADSAFE
19 #error "!GTEST_IS_THREADSAFE"
20 #endif
21
22 #include "include/compat.h"
23 #include "include/cephfs/libcephfs.h"
24 #include "include/fs_types.h"
25 #include <errno.h>
26 #include <sys/fcntl.h>
27 #include <unistd.h>
28 #include <sys/file.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <dirent.h>
32
33 #include <stdlib.h>
34 #include <semaphore.h>
35 #include <time.h>
36
37 #ifndef _WIN32
38 #include <sys/mman.h>
39 #endif
40
41 #ifdef __linux__
42 #include <limits.h>
43 #include <sys/xattr.h>
44 #elif __FreeBSD__
45 #include <sys/types.h>
46 #include <sys/wait.h>
47 #endif
48
49 #include "include/ceph_assert.h"
50 #include "ceph_pthread_self.h"
51
// Common test prologue: create a mount handle, read its configuration
// (ceph_conf_read_file + ceph_conf_parse_env, both with NULL arguments) and
// mount the file system.
//
// The macro writes to a variable named 'cmount' that must exist in the
// caller's scope. Each step is a fatal gtest assertion, so the enclosing
// function has to return void.
#define STARTUP_CEPH()                                   \
  do {                                                   \
    ASSERT_EQ(0, ceph_create(&cmount, NULL));            \
    ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));     \
    ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));     \
    ASSERT_EQ(0, ceph_mount(cmount, NULL));              \
  } while (0)
59
// Common test epilogue: unmount and then release the handle held in the
// caller's 'cmount' variable. Both steps are fatal gtest assertions, so the
// enclosing function has to return void.
#define CLEANUP_CEPH()                       \
  do {                                       \
    ASSERT_EQ(0, ceph_unmount(cmount));      \
    ASSERT_EQ(0, ceph_release(cmount));      \
  } while (0)
65
// Mode bits handed to ceph_ll_create(): read/write/execute for user, group
// and others.
static const mode_t fileMode = S_IRWXU | S_IRWXG | S_IRWXO;

// Semaphore wait budgets, in milliseconds:
//  - waitMs     : short wait for normal operations
//  - waitSlowMs : wait for "slow" operations (5 seconds should be enough
//                 even in case of network congestion)
static const long waitMs = 10;
static const long waitSlowMs = 5000;
72
73 // Get the absolute struct timespec reference from now + 'ms' milliseconds
74 static const struct timespec* abstime(struct timespec &ts, long ms) {
75 if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
76 ceph_abort();
77 }
78 ts.tv_nsec += ms * 1000000;
79 ts.tv_sec += ts.tv_nsec / 1000000000;
80 ts.tv_nsec %= 1000000000;
81 return &ts;
82 }
83
84 /* Basic locking */
85
86 TEST(LibCephFS, BasicRecordLocking) {
87 struct ceph_mount_info *cmount = NULL;
88 STARTUP_CEPH();
89
90 char c_file[1024];
91 sprintf(c_file, "recordlock_test_%d", getpid());
92 Fh *fh = NULL;
93 Inode *root = NULL, *inode = NULL;
94 struct ceph_statx stx;
95 int rc;
96 struct flock lock1, lock2;
97 UserPerm *perms = ceph_mount_perms(cmount);
98
99 // Get the root inode
100 rc = ceph_ll_lookup_root(cmount, &root);
101 ASSERT_EQ(rc, 0);
102
103 // Get the inode and Fh corresponding to c_file
104 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
105 &inode, &fh, &stx, 0, 0, perms);
106 ASSERT_EQ(rc, 0);
107
108 // write lock twice
109 lock1.l_type = F_WRLCK;
110 lock1.l_whence = SEEK_SET;
111 lock1.l_start = 0;
112 lock1.l_len = 1024;
113 lock1.l_pid = getpid();
114 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
115
116 lock2.l_type = F_WRLCK;
117 lock2.l_whence = SEEK_SET;
118 lock2.l_start = 0;
119 lock2.l_len = 1024;
120 lock2.l_pid = getpid();
121 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
122
123 // Now try a conflicting read lock
124 lock2.l_type = F_RDLCK;
125 lock2.l_whence = SEEK_SET;
126 lock2.l_start = 100;
127 lock2.l_len = 100;
128 lock2.l_pid = getpid();
129 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));
130
131 // Now do a getlk
132 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
133 ASSERT_EQ(lock2.l_type, F_WRLCK);
134 ASSERT_EQ(lock2.l_start, 0);
135 ASSERT_EQ(lock2.l_len, 1024);
136 ASSERT_EQ(lock2.l_pid, getpid());
137
138 // Extend the range of the write lock
139 lock1.l_type = F_WRLCK;
140 lock1.l_whence = SEEK_SET;
141 lock1.l_start = 1024;
142 lock1.l_len = 1024;
143 lock1.l_pid = getpid();
144 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
145
146 // Now do a getlk
147 lock2.l_type = F_RDLCK;
148 lock2.l_whence = SEEK_SET;
149 lock2.l_start = 100;
150 lock2.l_len = 100;
151 lock2.l_pid = getpid();
152 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
153 ASSERT_EQ(lock2.l_type, F_WRLCK);
154 ASSERT_EQ(lock2.l_start, 0);
155 ASSERT_EQ(lock2.l_len, 2048);
156 ASSERT_EQ(lock2.l_pid, getpid());
157
158 // Now release part of the range
159 lock1.l_type = F_UNLCK;
160 lock1.l_whence = SEEK_SET;
161 lock1.l_start = 512;
162 lock1.l_len = 1024;
163 lock1.l_pid = getpid();
164 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
165
166 // Now do a getlk to check 1st part
167 lock2.l_type = F_RDLCK;
168 lock2.l_whence = SEEK_SET;
169 lock2.l_start = 100;
170 lock2.l_len = 100;
171 lock2.l_pid = getpid();
172 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
173 ASSERT_EQ(lock2.l_type, F_WRLCK);
174 ASSERT_EQ(lock2.l_start, 0);
175 ASSERT_EQ(lock2.l_len, 512);
176 ASSERT_EQ(lock2.l_pid, getpid());
177
178 // Now do a getlk to check 2nd part
179 lock2.l_type = F_RDLCK;
180 lock2.l_whence = SEEK_SET;
181 lock2.l_start = 2000;
182 lock2.l_len = 100;
183 lock2.l_pid = getpid();
184 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
185 ASSERT_EQ(lock2.l_type, F_WRLCK);
186 ASSERT_EQ(lock2.l_start, 1536);
187 ASSERT_EQ(lock2.l_len, 512);
188 ASSERT_EQ(lock2.l_pid, getpid());
189
190 // Now do a getlk to check released part
191 lock2.l_type = F_RDLCK;
192 lock2.l_whence = SEEK_SET;
193 lock2.l_start = 512;
194 lock2.l_len = 1024;
195 lock2.l_pid = getpid();
196 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
197 ASSERT_EQ(lock2.l_type, F_UNLCK);
198 ASSERT_EQ(lock2.l_start, 512);
199 ASSERT_EQ(lock2.l_len, 1024);
200 ASSERT_EQ(lock2.l_pid, getpid());
201
202 // Now downgrade the 1st part of the lock
203 lock1.l_type = F_RDLCK;
204 lock1.l_whence = SEEK_SET;
205 lock1.l_start = 0;
206 lock1.l_len = 512;
207 lock1.l_pid = getpid();
208 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
209
210 // Now do a getlk to check 1st part
211 lock2.l_type = F_WRLCK;
212 lock2.l_whence = SEEK_SET;
213 lock2.l_start = 100;
214 lock2.l_len = 100;
215 lock2.l_pid = getpid();
216 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
217 ASSERT_EQ(lock2.l_type, F_RDLCK);
218 ASSERT_EQ(lock2.l_start, 0);
219 ASSERT_EQ(lock2.l_len, 512);
220 ASSERT_EQ(lock2.l_pid, getpid());
221
222 // Now upgrade the 1st part of the lock
223 lock1.l_type = F_WRLCK;
224 lock1.l_whence = SEEK_SET;
225 lock1.l_start = 0;
226 lock1.l_len = 512;
227 lock1.l_pid = getpid();
228 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));
229
230 // Now do a getlk to check 1st part
231 lock2.l_type = F_WRLCK;
232 lock2.l_whence = SEEK_SET;
233 lock2.l_start = 100;
234 lock2.l_len = 100;
235 lock2.l_pid = getpid();
236 ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
237 ASSERT_EQ(lock2.l_type, F_WRLCK);
238 ASSERT_EQ(lock2.l_start, 0);
239 ASSERT_EQ(lock2.l_len, 512);
240 ASSERT_EQ(lock2.l_pid, getpid());
241
242 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
243 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
244 CLEANUP_CEPH();
245 }
246
247 /* Locking in different threads */
248
249 // Used by ConcurrentLocking test
250 struct str_ConcurrentRecordLocking {
251 const char *file;
252 struct ceph_mount_info *cmount; // !NULL if shared
253 sem_t sem[2];
254 sem_t semReply[2];
255 void sem_init(int pshared) {
256 ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
257 ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
258 ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
259 ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
260 }
261 void sem_destroy() {
262 ASSERT_EQ(0, ::sem_destroy(&sem[0]));
263 ASSERT_EQ(0, ::sem_destroy(&sem[1]));
264 ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
265 ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
266 }
267 };
268
// Semaphore handshake helpers for the concurrent tests.
//
// All of them expect a str_ConcurrentRecordLocking named 's' in the caller's
// scope; the waiting variants also need a 'struct timespec ts' used as
// scratch storage for the deadline. Step n maps to semaphore slot (n) % 2.
// The argument is parenthesized so that expressions such as 'i + 1' select
// the intended slot.

// Worker side: wake up main (for (N) steps)
#define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[(n) % 2]))
// Worker side: wait up to waitSlowMs for main to wake us up (for (RN) steps)
#define WAIT_MAIN(n) \
  ASSERT_EQ(0, sem_timedwait(&s.semReply[(n) % 2], abstime(ts, waitSlowMs)))

// Main side: wake up a worker (for (RN) steps)
#define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[(n) % 2]))
// Main side: wait up to waitSlowMs for a worker to wake us up (for (N) steps)
#define WAIT_WORKER(n) \
  ASSERT_EQ(0, sem_timedwait(&s.sem[(n) % 2], abstime(ts, waitSlowMs)))
// Main side: the worker shall NOT wake us up within waitMs (for (N) steps).
// 'waitMs' resolves to whichever definition is visible at the point of use.
#define NOT_WAIT_WORKER(n) \
  ASSERT_EQ(-1, sem_timedwait(&s.sem[(n) % 2], abstime(ts, waitMs)))

// Perform an operation twice (once per worker in the three-party tests)
#define TWICE(EXPR) do { \
    EXPR;                \
    EXPR;                \
  } while(0)
289
290 /* Locking in different threads */
291
292 // Used by ConcurrentLocking test
293 static void thread_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
294 struct ceph_mount_info *const cmount = s.cmount;
295 Fh *fh = NULL;
296 Inode *root = NULL, *inode = NULL;
297 struct ceph_statx stx;
298 struct flock lock1;
299 int rc;
300 struct timespec ts;
301
302 // Get the root inode
303 rc = ceph_ll_lookup_root(cmount, &root);
304 ASSERT_EQ(rc, 0);
305
306 // Get the inode and Fh corresponding to c_file
307 rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
308 &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
309 ASSERT_EQ(rc, 0);
310
311 lock1.l_type = F_WRLCK;
312 lock1.l_whence = SEEK_SET;
313 lock1.l_start = 0;
314 lock1.l_len = 1024;
315 lock1.l_pid = getpid();
316 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
317
318 PING_MAIN(1); // (1)
319 lock1.l_type = F_WRLCK;
320 lock1.l_whence = SEEK_SET;
321 lock1.l_start = 0;
322 lock1.l_len = 1024;
323 lock1.l_pid = getpid();
324 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
325 PING_MAIN(2); // (2)
326
327 lock1.l_type = F_UNLCK;
328 lock1.l_whence = SEEK_SET;
329 lock1.l_start = 0;
330 lock1.l_len = 1024;
331 lock1.l_pid = getpid();
332 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
333 PING_MAIN(3); // (3)
334
335 lock1.l_type = F_RDLCK;
336 lock1.l_whence = SEEK_SET;
337 lock1.l_start = 0;
338 lock1.l_len = 1024;
339 lock1.l_pid = getpid();
340 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
341 PING_MAIN(4); // (4)
342
343 WAIT_MAIN(1); // (R1)
344 lock1.l_type = F_UNLCK;
345 lock1.l_whence = SEEK_SET;
346 lock1.l_start = 0;
347 lock1.l_len = 1024;
348 lock1.l_pid = getpid();
349 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
350 PING_MAIN(5); // (5)
351
352 WAIT_MAIN(2); // (R2)
353 lock1.l_type = F_WRLCK;
354 lock1.l_whence = SEEK_SET;
355 lock1.l_start = 0;
356 lock1.l_len = 1024;
357 lock1.l_pid = getpid();
358 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
359 PING_MAIN(6); // (6)
360
361 WAIT_MAIN(3); // (R3)
362 lock1.l_type = F_UNLCK;
363 lock1.l_whence = SEEK_SET;
364 lock1.l_start = 0;
365 lock1.l_len = 1024;
366 lock1.l_pid = getpid();
367 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
368 PING_MAIN(7); // (7)
369
370 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
371 }
372
373 // Used by ConcurrentRecordLocking test
374 static void* thread_ConcurrentRecordLocking_(void *arg) {
375 str_ConcurrentRecordLocking *const s =
376 reinterpret_cast<str_ConcurrentRecordLocking*>(arg);
377 thread_ConcurrentRecordLocking(*s);
378 return NULL;
379 }
380
381 TEST(LibCephFS, ConcurrentRecordLocking) {
382 const pid_t mypid = getpid();
383 struct ceph_mount_info *cmount;
384 STARTUP_CEPH();
385
386 char c_file[1024];
387 sprintf(c_file, "recordlock_test_%d", mypid);
388 Fh *fh = NULL;
389 Inode *root = NULL, *inode = NULL;
390 struct ceph_statx stx;
391 struct flock lock1;
392 int rc;
393 UserPerm *perms = ceph_mount_perms(cmount);
394
395 // Get the root inode
396 rc = ceph_ll_lookup_root(cmount, &root);
397 ASSERT_EQ(rc, 0);
398
399 // Get the inode and Fh corresponding to c_file
400 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
401 &inode, &fh, &stx, 0, 0, perms);
402 ASSERT_EQ(rc, 0);
403
404 // Lock
405 lock1.l_type = F_WRLCK;
406 lock1.l_whence = SEEK_SET;
407 lock1.l_start = 0;
408 lock1.l_len = 1024;
409 lock1.l_pid = getpid();
410 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
411
412 // Start locker thread
413 pthread_t thread;
414 struct timespec ts;
415 str_ConcurrentRecordLocking s = { c_file, cmount };
416 s.sem_init(0);
417 ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentRecordLocking_, &s));
418 // Synchronization point with thread (failure: thread is dead)
419 WAIT_WORKER(1); // (1)
420
421 // Shall not have lock immediately
422 NOT_WAIT_WORKER(2); // (2)
423
424 // Unlock
425 lock1.l_type = F_UNLCK;
426 lock1.l_whence = SEEK_SET;
427 lock1.l_start = 0;
428 lock1.l_len = 1024;
429 lock1.l_pid = getpid();
430 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
431
432 // Shall have lock
433 // Synchronization point with thread (failure: thread is dead)
434 WAIT_WORKER(2); // (2)
435
436 // Synchronization point with thread (failure: thread is dead)
437 WAIT_WORKER(3); // (3)
438
439 // Wait for thread to share lock
440 WAIT_WORKER(4); // (4)
441 lock1.l_type = F_WRLCK;
442 lock1.l_whence = SEEK_SET;
443 lock1.l_start = 0;
444 lock1.l_len = 1024;
445 lock1.l_pid = getpid();
446 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
447 lock1.l_type = F_RDLCK;
448 lock1.l_whence = SEEK_SET;
449 lock1.l_start = 0;
450 lock1.l_len = 1024;
451 lock1.l_pid = getpid();
452 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
453
454 // Wake up thread to unlock shared lock
455 PING_WORKER(1); // (R1)
456 WAIT_WORKER(5); // (5)
457
458 // Now we can lock exclusively
459 // Upgrade to exclusive lock (as per POSIX)
460 lock1.l_type = F_WRLCK;
461 lock1.l_whence = SEEK_SET;
462 lock1.l_start = 0;
463 lock1.l_len = 1024;
464 lock1.l_pid = getpid();
465 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
466
467 // Wake up thread to lock shared lock
468 PING_WORKER(2); // (R2)
469
470 // Shall not have lock immediately
471 NOT_WAIT_WORKER(6); // (6)
472
473 // Release lock ; thread will get it
474 lock1.l_type = F_UNLCK;
475 lock1.l_whence = SEEK_SET;
476 lock1.l_start = 0;
477 lock1.l_len = 1024;
478 lock1.l_pid = getpid();
479 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
480 WAIT_WORKER(6); // (6)
481
482 // We no longer have the lock
483 lock1.l_type = F_WRLCK;
484 lock1.l_whence = SEEK_SET;
485 lock1.l_start = 0;
486 lock1.l_len = 1024;
487 lock1.l_pid = getpid();
488 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
489 lock1.l_type = F_RDLCK;
490 lock1.l_whence = SEEK_SET;
491 lock1.l_start = 0;
492 lock1.l_len = 1024;
493 lock1.l_pid = getpid();
494 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
495
496 // Wake up thread to unlock exclusive lock
497 PING_WORKER(3); // (R3)
498 WAIT_WORKER(7); // (7)
499
500 // We can lock it again
501 lock1.l_type = F_WRLCK;
502 lock1.l_whence = SEEK_SET;
503 lock1.l_start = 0;
504 lock1.l_len = 1024;
505 lock1.l_pid = getpid();
506 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
507 lock1.l_type = F_UNLCK;
508 lock1.l_whence = SEEK_SET;
509 lock1.l_start = 0;
510 lock1.l_len = 1024;
511 lock1.l_pid = getpid();
512 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
513
514 // Cleanup
515 void *retval = (void*) (uintptr_t) -1;
516 ASSERT_EQ(0, pthread_join(thread, &retval));
517 ASSERT_EQ(NULL, retval);
518 s.sem_destroy();
519 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
520 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
521 CLEANUP_CEPH();
522 }
523
524 TEST(LibCephFS, ThreesomeRecordLocking) {
525 const pid_t mypid = getpid();
526 struct ceph_mount_info *cmount;
527 STARTUP_CEPH();
528
529 char c_file[1024];
530 sprintf(c_file, "recordlock_test_%d", mypid);
531 Fh *fh = NULL;
532 Inode *root = NULL, *inode = NULL;
533 struct ceph_statx stx;
534 struct flock lock1;
535 int rc;
536 UserPerm *perms = ceph_mount_perms(cmount);
537
538 // Get the root inode
539 rc = ceph_ll_lookup_root(cmount, &root);
540 ASSERT_EQ(rc, 0);
541
542 // Get the inode and Fh corresponding to c_file
543 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
544 &inode, &fh, &stx, 0, 0, perms);
545 ASSERT_EQ(rc, 0);
546
547 // Lock
548 lock1.l_type = F_WRLCK;
549 lock1.l_whence = SEEK_SET;
550 lock1.l_start = 0;
551 lock1.l_len = 1024;
552 lock1.l_pid = getpid();
553 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
554
555 // Start locker thread
556 pthread_t thread[2];
557 struct timespec ts;
558 str_ConcurrentRecordLocking s = { c_file, cmount };
559 s.sem_init(0);
560 ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentRecordLocking_, &s));
561 ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentRecordLocking_, &s));
562 // Synchronization point with thread (failure: thread is dead)
563 TWICE(WAIT_WORKER(1)); // (1)
564
565 // Shall not have lock immediately
566 NOT_WAIT_WORKER(2); // (2)
567
568 // Unlock
569 lock1.l_type = F_UNLCK;
570 lock1.l_whence = SEEK_SET;
571 lock1.l_start = 0;
572 lock1.l_len = 1024;
573 lock1.l_pid = getpid();
574 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
575
576 // Shall have lock
577 TWICE(// Synchronization point with thread (failure: thread is dead)
578 WAIT_WORKER(2); // (2)
579
580 // Synchronization point with thread (failure: thread is dead)
581 WAIT_WORKER(3)); // (3)
582
583 // Wait for thread to share lock
584 TWICE(WAIT_WORKER(4)); // (4)
585 lock1.l_type = F_WRLCK;
586 lock1.l_whence = SEEK_SET;
587 lock1.l_start = 0;
588 lock1.l_len = 1024;
589 lock1.l_pid = getpid();
590 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
591 lock1.l_type = F_RDLCK;
592 lock1.l_whence = SEEK_SET;
593 lock1.l_start = 0;
594 lock1.l_len = 1024;
595 lock1.l_pid = getpid();
596 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
597
598 // Wake up thread to unlock shared lock
599 TWICE(PING_WORKER(1); // (R1)
600 WAIT_WORKER(5)); // (5)
601
602 // Now we can lock exclusively
603 // Upgrade to exclusive lock (as per POSIX)
604 lock1.l_type = F_WRLCK;
605 lock1.l_whence = SEEK_SET;
606 lock1.l_start = 0;
607 lock1.l_len = 1024;
608 lock1.l_pid = getpid();
609 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
610
611 TWICE( // Wake up thread to lock shared lock
612 PING_WORKER(2); // (R2)
613
614 // Shall not have lock immediately
615 NOT_WAIT_WORKER(6)); // (6)
616
617 // Release lock ; thread will get it
618 lock1.l_type = F_UNLCK;
619 lock1.l_whence = SEEK_SET;
620 lock1.l_start = 0;
621 lock1.l_len = 1024;
622 lock1.l_pid = getpid();
623 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
624 TWICE(WAIT_WORKER(6); // (6)
625
626 // We no longer have the lock
627 lock1.l_type = F_WRLCK;
628 lock1.l_whence = SEEK_SET;
629 lock1.l_start = 0;
630 lock1.l_len = 1024;
631 lock1.l_pid = getpid();
632 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
633 lock1.l_type = F_RDLCK;
634 lock1.l_whence = SEEK_SET;
635 lock1.l_start = 0;
636 lock1.l_len = 1024;
637 lock1.l_pid = getpid();
638 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
639
640 // Wake up thread to unlock exclusive lock
641 PING_WORKER(3); // (R3)
642 WAIT_WORKER(7); // (7)
643 );
644
645 // We can lock it again
646 lock1.l_type = F_WRLCK;
647 lock1.l_whence = SEEK_SET;
648 lock1.l_start = 0;
649 lock1.l_len = 1024;
650 lock1.l_pid = getpid();
651 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
652 lock1.l_type = F_UNLCK;
653 lock1.l_whence = SEEK_SET;
654 lock1.l_start = 0;
655 lock1.l_len = 1024;
656 lock1.l_pid = getpid();
657 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
658
659 // Cleanup
660 void *retval = (void*) (uintptr_t) -1;
661 ASSERT_EQ(0, pthread_join(thread[0], &retval));
662 ASSERT_EQ(NULL, retval);
663 ASSERT_EQ(0, pthread_join(thread[1], &retval));
664 ASSERT_EQ(NULL, retval);
665 s.sem_destroy();
666 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
667 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
668 CLEANUP_CEPH();
669 }
670
671 /* Locking in different processes */
672
// Declares a function-local 'waitMs' of 100 ms that shadows the file-level
// constant, so NOT_WAIT_WORKER waits longer in the multi-process tests.
// The void cast keeps the variable from being reported as unused.
#define PROCESS_SLOW_MS()          \
  static const long waitMs = 100;  \
  (void) waitMs
676
677 // Used by ConcurrentLocking test
678 static void process_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
679 const pid_t mypid = getpid();
680 PROCESS_SLOW_MS();
681
682 struct ceph_mount_info *cmount = NULL;
683 struct timespec ts;
684 Fh *fh = NULL;
685 Inode *root = NULL, *inode = NULL;
686 struct ceph_statx stx;
687 int rc;
688 struct flock lock1;
689
690 STARTUP_CEPH();
691 s.cmount = cmount;
692
693 // Get the root inode
694 rc = ceph_ll_lookup_root(cmount, &root);
695 ASSERT_EQ(rc, 0);
696
697 // Get the inode and Fh corresponding to c_file
698 rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
699 &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
700 ASSERT_EQ(rc, 0);
701
702 WAIT_MAIN(1); // (R1)
703
704 lock1.l_type = F_WRLCK;
705 lock1.l_whence = SEEK_SET;
706 lock1.l_start = 0;
707 lock1.l_len = 1024;
708 lock1.l_pid = getpid();
709 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
710 PING_MAIN(1); // (1)
711 lock1.l_type = F_WRLCK;
712 lock1.l_whence = SEEK_SET;
713 lock1.l_start = 0;
714 lock1.l_len = 1024;
715 lock1.l_pid = getpid();
716 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
717 PING_MAIN(2); // (2)
718
719 lock1.l_type = F_UNLCK;
720 lock1.l_whence = SEEK_SET;
721 lock1.l_start = 0;
722 lock1.l_len = 1024;
723 lock1.l_pid = getpid();
724 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
725 PING_MAIN(3); // (3)
726
727 lock1.l_type = F_RDLCK;
728 lock1.l_whence = SEEK_SET;
729 lock1.l_start = 0;
730 lock1.l_len = 1024;
731 lock1.l_pid = getpid();
732 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
733 PING_MAIN(4); // (4)
734
735 WAIT_MAIN(2); // (R2)
736 lock1.l_type = F_UNLCK;
737 lock1.l_whence = SEEK_SET;
738 lock1.l_start = 0;
739 lock1.l_len = 1024;
740 lock1.l_pid = getpid();
741 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
742 PING_MAIN(5); // (5)
743
744 WAIT_MAIN(3); // (R3)
745 lock1.l_type = F_WRLCK;
746 lock1.l_whence = SEEK_SET;
747 lock1.l_start = 0;
748 lock1.l_len = 1024;
749 lock1.l_pid = getpid();
750 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
751 PING_MAIN(6); // (6)
752
753 WAIT_MAIN(4); // (R4)
754 lock1.l_type = F_UNLCK;
755 lock1.l_whence = SEEK_SET;
756 lock1.l_start = 0;
757 lock1.l_len = 1024;
758 lock1.l_pid = getpid();
759 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
760 PING_MAIN(7); // (7)
761
762 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
763 CLEANUP_CEPH();
764
765 s.sem_destroy();
766 exit(EXIT_SUCCESS);
767 }
768
769 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
770 #ifndef _WIN32
771 TEST(LibCephFS, DISABLED_InterProcessRecordLocking) {
772 PROCESS_SLOW_MS();
773 // Process synchronization
774 char c_file[1024];
775 const pid_t mypid = getpid();
776 sprintf(c_file, "recordlock_test_%d", mypid);
777 Fh *fh = NULL;
778 Inode *root = NULL, *inode = NULL;
779 struct ceph_statx stx;
780 struct flock lock1;
781 int rc;
782
783 // Note: the semaphores MUST be on a shared memory segment
784 str_ConcurrentRecordLocking *const shs =
785 reinterpret_cast<str_ConcurrentRecordLocking*>
786 (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
787 -1, 0));
788 str_ConcurrentRecordLocking &s = *shs;
789 s.file = c_file;
790 s.sem_init(1);
791
792 // Start locker process
793 const pid_t pid = fork();
794 ASSERT_GE(pid, 0);
795 if (pid == 0) {
796 process_ConcurrentRecordLocking(s);
797 exit(EXIT_FAILURE);
798 }
799
800 struct timespec ts;
801 struct ceph_mount_info *cmount;
802 STARTUP_CEPH();
803 UserPerm *perms = ceph_mount_perms(cmount);
804
805 // Get the root inode
806 rc = ceph_ll_lookup_root(cmount, &root);
807 ASSERT_EQ(rc, 0);
808
809 // Get the inode and Fh corresponding to c_file
810 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
811 &inode, &fh, &stx, 0, 0, perms);
812 ASSERT_EQ(rc, 0);
813
814 // Lock
815 lock1.l_type = F_WRLCK;
816 lock1.l_whence = SEEK_SET;
817 lock1.l_start = 0;
818 lock1.l_len = 1024;
819 lock1.l_pid = getpid();
820 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
821
822 // Synchronization point with process (failure: process is dead)
823 PING_WORKER(1); // (R1)
824 WAIT_WORKER(1); // (1)
825
826 // Shall not have lock immediately
827 NOT_WAIT_WORKER(2); // (2)
828
829 // Unlock
830 lock1.l_type = F_UNLCK;
831 lock1.l_whence = SEEK_SET;
832 lock1.l_start = 0;
833 lock1.l_len = 1024;
834 lock1.l_pid = getpid();
835 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
836
837 // Shall have lock
838 // Synchronization point with process (failure: process is dead)
839 WAIT_WORKER(2); // (2)
840
841 // Synchronization point with process (failure: process is dead)
842 WAIT_WORKER(3); // (3)
843
844 // Wait for process to share lock
845 WAIT_WORKER(4); // (4)
846 lock1.l_type = F_WRLCK;
847 lock1.l_whence = SEEK_SET;
848 lock1.l_start = 0;
849 lock1.l_len = 1024;
850 lock1.l_pid = getpid();
851 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
852 lock1.l_type = F_RDLCK;
853 lock1.l_whence = SEEK_SET;
854 lock1.l_start = 0;
855 lock1.l_len = 1024;
856 lock1.l_pid = getpid();
857 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
858
859 // Wake up process to unlock shared lock
860 PING_WORKER(2); // (R2)
861 WAIT_WORKER(5); // (5)
862
863 // Now we can lock exclusively
864 // Upgrade to exclusive lock (as per POSIX)
865 lock1.l_type = F_WRLCK;
866 lock1.l_whence = SEEK_SET;
867 lock1.l_start = 0;
868 lock1.l_len = 1024;
869 lock1.l_pid = getpid();
870 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
871
872 // Wake up process to lock shared lock
873 PING_WORKER(3); // (R3)
874
875 // Shall not have lock immediately
876 NOT_WAIT_WORKER(6); // (6)
877
878 // Release lock ; process will get it
879 lock1.l_type = F_UNLCK;
880 lock1.l_whence = SEEK_SET;
881 lock1.l_start = 0;
882 lock1.l_len = 1024;
883 lock1.l_pid = getpid();
884 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
885 WAIT_WORKER(6); // (6)
886
887 // We no longer have the lock
888 lock1.l_type = F_WRLCK;
889 lock1.l_whence = SEEK_SET;
890 lock1.l_start = 0;
891 lock1.l_len = 1024;
892 lock1.l_pid = getpid();
893 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
894 lock1.l_type = F_RDLCK;
895 lock1.l_whence = SEEK_SET;
896 lock1.l_start = 0;
897 lock1.l_len = 1024;
898 lock1.l_pid = getpid();
899 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
900
901 // Wake up process to unlock exclusive lock
902 PING_WORKER(4); // (R4)
903 WAIT_WORKER(7); // (7)
904
905 // We can lock it again
906 lock1.l_type = F_WRLCK;
907 lock1.l_whence = SEEK_SET;
908 lock1.l_start = 0;
909 lock1.l_len = 1024;
910 lock1.l_pid = getpid();
911 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
912 lock1.l_type = F_UNLCK;
913 lock1.l_whence = SEEK_SET;
914 lock1.l_start = 0;
915 lock1.l_len = 1024;
916 lock1.l_pid = getpid();
917 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
918
919 // Wait pid
920 int status;
921 ASSERT_EQ(pid, waitpid(pid, &status, 0));
922 ASSERT_EQ(EXIT_SUCCESS, status);
923
924 // Cleanup
925 s.sem_destroy();
926 ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
927 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
928 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
929 CLEANUP_CEPH();
930 }
931 #endif
932
933 #ifndef _WIN32
934 // Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
935 TEST(LibCephFS, DISABLED_ThreesomeInterProcessRecordLocking) {
936 PROCESS_SLOW_MS();
937 // Process synchronization
938 char c_file[1024];
939 const pid_t mypid = getpid();
940 sprintf(c_file, "recordlock_test_%d", mypid);
941 Fh *fh = NULL;
942 Inode *root = NULL, *inode = NULL;
943 struct ceph_statx stx;
944 struct flock lock1;
945 int rc;
946
947 // Note: the semaphores MUST be on a shared memory segment
948 str_ConcurrentRecordLocking *const shs =
949 reinterpret_cast<str_ConcurrentRecordLocking*>
950 (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
951 -1, 0));
952 str_ConcurrentRecordLocking &s = *shs;
953 s.file = c_file;
954 s.sem_init(1);
955
956 // Start locker processes
957 pid_t pid[2];
958 pid[0] = fork();
959 ASSERT_GE(pid[0], 0);
960 if (pid[0] == 0) {
961 process_ConcurrentRecordLocking(s);
962 exit(EXIT_FAILURE);
963 }
964 pid[1] = fork();
965 ASSERT_GE(pid[1], 0);
966 if (pid[1] == 0) {
967 process_ConcurrentRecordLocking(s);
968 exit(EXIT_FAILURE);
969 }
970
971 struct timespec ts;
972 struct ceph_mount_info *cmount;
973 STARTUP_CEPH();
974
975 // Get the root inode
976 rc = ceph_ll_lookup_root(cmount, &root);
977 ASSERT_EQ(rc, 0);
978
979 // Get the inode and Fh corresponding to c_file
980 UserPerm *perms = ceph_mount_perms(cmount);
981 rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
982 &inode, &fh, &stx, 0, 0, perms);
983 ASSERT_EQ(rc, 0);
984
985 // Lock
986 lock1.l_type = F_WRLCK;
987 lock1.l_whence = SEEK_SET;
988 lock1.l_start = 0;
989 lock1.l_len = 1024;
990 lock1.l_pid = getpid();
991 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
992
993 // Synchronization point with process (failure: process is dead)
994 TWICE(PING_WORKER(1)); // (R1)
995 TWICE(WAIT_WORKER(1)); // (1)
996
997 // Shall not have lock immediately
998 NOT_WAIT_WORKER(2); // (2)
999
1000 // Unlock
1001 lock1.l_type = F_UNLCK;
1002 lock1.l_whence = SEEK_SET;
1003 lock1.l_start = 0;
1004 lock1.l_len = 1024;
1005 lock1.l_pid = getpid();
1006 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1007
1008 // Shall have lock
1009 TWICE(// Synchronization point with process (failure: process is dead)
1010 WAIT_WORKER(2); // (2)
1011
1012 // Synchronization point with process (failure: process is dead)
1013 WAIT_WORKER(3)); // (3)
1014
1015 // Wait for process to share lock
1016 TWICE(WAIT_WORKER(4)); // (4)
1017 lock1.l_type = F_WRLCK;
1018 lock1.l_whence = SEEK_SET;
1019 lock1.l_start = 0;
1020 lock1.l_len = 1024;
1021 lock1.l_pid = getpid();
1022 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1023 lock1.l_type = F_RDLCK;
1024 lock1.l_whence = SEEK_SET;
1025 lock1.l_start = 0;
1026 lock1.l_len = 1024;
1027 lock1.l_pid = getpid();
1028 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1029
1030 // Wake up process to unlock shared lock
1031 TWICE(PING_WORKER(2); // (R2)
1032 WAIT_WORKER(5)); // (5)
1033
1034 // Now we can lock exclusively
1035 // Upgrade to exclusive lock (as per POSIX)
1036 lock1.l_type = F_WRLCK;
1037 lock1.l_whence = SEEK_SET;
1038 lock1.l_start = 0;
1039 lock1.l_len = 1024;
1040 lock1.l_pid = getpid();
1041 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
1042
1043 TWICE( // Wake up process to lock shared lock
1044 PING_WORKER(3); // (R3)
1045
1046 // Shall not have lock immediately
1047 NOT_WAIT_WORKER(6)); // (6)
1048
1049 // Release lock ; process will get it
1050 lock1.l_type = F_UNLCK;
1051 lock1.l_whence = SEEK_SET;
1052 lock1.l_start = 0;
1053 lock1.l_len = 1024;
1054 lock1.l_pid = getpid();
1055 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1056 TWICE(WAIT_WORKER(6); // (6)
1057
1058 // We no longer have the lock
1059 lock1.l_type = F_WRLCK;
1060 lock1.l_whence = SEEK_SET;
1061 lock1.l_start = 0;
1062 lock1.l_len = 1024;
1063 lock1.l_pid = getpid();
1064 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
1065 lock1.l_type = F_RDLCK;
1066 lock1.l_whence = SEEK_SET;
1067 lock1.l_start = 0;
1068 lock1.l_len = 1024;
1069 lock1.l_pid = getpid();
1070 ASSERT_EQ(-CEPHFS_EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
1071
1072 // Wake up process to unlock exclusive lock
1073 PING_WORKER(4); // (R4)
1074 WAIT_WORKER(7); // (7)
1075 );
1076
1077 // We can lock it again
1078 lock1.l_type = F_WRLCK;
1079 lock1.l_whence = SEEK_SET;
1080 lock1.l_start = 0;
1081 lock1.l_len = 1024;
1082 lock1.l_pid = getpid();
1083 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1084 lock1.l_type = F_UNLCK;
1085 lock1.l_whence = SEEK_SET;
1086 lock1.l_start = 0;
1087 lock1.l_len = 1024;
1088 lock1.l_pid = getpid();
1089 ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
1090
1091 // Wait pids
1092 int status;
1093 ASSERT_EQ(pid[0], waitpid(pid[0], &status, 0));
1094 ASSERT_EQ(EXIT_SUCCESS, status);
1095 ASSERT_EQ(pid[1], waitpid(pid[1], &status, 0));
1096 ASSERT_EQ(EXIT_SUCCESS, status);
1097
1098 // Cleanup
1099 s.sem_destroy();
1100 ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
1101 ASSERT_EQ(0, ceph_ll_close(cmount, fh));
1102 ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
1103 CLEANUP_CEPH();
1104 }
1105 #endif