// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2011 New Dream Network
 *           2016 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <pthread.h>
#include "gtest/gtest.h"
#ifndef GTEST_IS_THREADSAFE
#error "!GTEST_IS_THREADSAFE"
#endif

#include "include/cephfs/libcephfs.h"
#include <errno.h>
#include <sys/fcntl.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>

#include <stdlib.h>
#include <semaphore.h>
#include <time.h>
#include <sys/mman.h>

#ifdef __linux__
#include <limits.h>
#include <sys/xattr.h>
#elif __FreeBSD__
#include <sys/types.h>
#include <sys/wait.h>
#endif

#include "include/ceph_assert.h"
#include "ceph_pthread_self.h"

// Startup common: create and mount ceph fs
#define STARTUP_CEPH() do { \
    ASSERT_EQ(0, ceph_create(&cmount, NULL)); \
    ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL)); \
    ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL)); \
    ASSERT_EQ(0, ceph_mount(cmount, NULL)); \
  } while(0)

// Cleanup common: unmount and release ceph fs
#define CLEANUP_CEPH() do { \
    ASSERT_EQ(0, ceph_unmount(cmount)); \
    ASSERT_EQ(0, ceph_release(cmount)); \
  } while(0)

static const mode_t fileMode = S_IRWXU | S_IRWXG | S_IRWXO;

// Default wait times for normal and "slow" operations
// (5 s should be enough even in case of network congestion)
static const long waitMs = 10;
static const long waitSlowMs = 5000;

// Get the absolute struct timespec reference from now + 'ms' milliseconds
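// (used as the timeout argument of sem_timedwait() in the WAIT_*/NOT_WAIT_* macros below)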
static const struct timespec* abstime(struct timespec &ts, long ms) {
  if (clock_gettime(CLOCK_REALTIME, &ts) == -1) {
    ceph_abort();
  }
  ts.tv_nsec += ms * 1000000;
  ts.tv_sec += ts.tv_nsec / 1000000000;
  ts.tv_nsec %= 1000000000;
  return &ts;
}

/* Basic locking */

TEST(LibCephFS, BasicRecordLocking) {
  struct ceph_mount_info *cmount = NULL;
  STARTUP_CEPH();

  char c_file[1024];
  sprintf(c_file, "recordlock_test_%d", getpid());
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  int rc;
  struct flock lock1, lock2;
  UserPerm *perms = ceph_mount_perms(cmount);

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, perms);
  ASSERT_EQ(rc, 0);

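  // Note: the 4th argument of ceph_ll_setlk()/ceph_ll_getlk() (42 and 43 below)
  // is an opaque lock-owner token, so the two requests conflict even though they
  // go through the same Fh and process; the last argument of ceph_ll_setlk()
  // selects blocking behaviour (false makes a conflicting request fail with
  // -EAGAIN instead of waiting).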
  // write lock twice
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));

  lock2.l_type = F_WRLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 0;
  lock2.l_len = 1024;
  lock2.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));

  // Now try a conflicting read lock
  lock2.l_type = F_RDLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 100;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock2, 43, false));

  // Now do a getlk
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_WRLCK);
  ASSERT_EQ(lock2.l_start, 0);
  ASSERT_EQ(lock2.l_len, 1024);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Extend the range of the write lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 1024;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));

  // Now do a getlk
  lock2.l_type = F_RDLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 100;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_WRLCK);
  ASSERT_EQ(lock2.l_start, 0);
  ASSERT_EQ(lock2.l_len, 2048);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Now release part of the range
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 512;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));

  // Now do a getlk to check 1st part
  lock2.l_type = F_RDLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 100;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_WRLCK);
  ASSERT_EQ(lock2.l_start, 0);
  ASSERT_EQ(lock2.l_len, 512);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Now do a getlk to check 2nd part
  lock2.l_type = F_RDLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 2000;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_WRLCK);
  ASSERT_EQ(lock2.l_start, 1536);
  ASSERT_EQ(lock2.l_len, 512);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Now do a getlk to check released part
  lock2.l_type = F_RDLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 512;
  lock2.l_len = 1024;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_UNLCK);
  ASSERT_EQ(lock2.l_start, 512);
  ASSERT_EQ(lock2.l_len, 1024);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Now downgrade the 1st part of the lock
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 512;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));

  // Now do a getlk to check 1st part
  lock2.l_type = F_WRLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 100;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_RDLCK);
  ASSERT_EQ(lock2.l_start, 0);
  ASSERT_EQ(lock2.l_len, 512);
  ASSERT_EQ(lock2.l_pid, getpid());

  // Now upgrade the 1st part of the lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 512;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, 42, false));

  // Now do a getlk to check 1st part
  lock2.l_type = F_WRLCK;
  lock2.l_whence = SEEK_SET;
  lock2.l_start = 100;
  lock2.l_len = 100;
  lock2.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_getlk(cmount, fh, &lock2, 43));
  ASSERT_EQ(lock2.l_type, F_WRLCK);
  ASSERT_EQ(lock2.l_start, 0);
  ASSERT_EQ(lock2.l_len, 512);
  ASSERT_EQ(lock2.l_pid, getpid());

  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
  CLEANUP_CEPH();
}

/* Locking in different threads */

// Used by ConcurrentLocking test
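// The sem[]/semReply[] pairs implement a simple ping-pong handshake between the
// main test body and its worker(s).  sem_init(pshared) is called with 0 by the
// thread-based tests and with 1 by the fork()-based tests, where this struct
// lives in MAP_SHARED memory.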
struct str_ConcurrentRecordLocking {
  const char *file;
  struct ceph_mount_info *cmount;  // !NULL if shared
  sem_t sem[2];
  sem_t semReply[2];
  void sem_init(int pshared) {
    ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
    ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
    ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
    ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
  }
  void sem_destroy() {
    ASSERT_EQ(0, ::sem_destroy(&sem[0]));
    ASSERT_EQ(0, ::sem_destroy(&sem[1]));
    ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
    ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
  }
};

// Wakeup main (for (N) steps)
#define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[n%2]))
// Wait for main to wake us up (for (RN) steps)
#define WAIT_MAIN(n) \
  ASSERT_EQ(0, sem_timedwait(&s.semReply[n%2], abstime(ts, waitSlowMs)))

// Wakeup worker (for (RN) steps)
#define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[n%2]))
// Wait for worker to wake us up (for (N) steps)
#define WAIT_WORKER(n) \
  ASSERT_EQ(0, sem_timedwait(&s.sem[n%2], abstime(ts, waitSlowMs)))
// Worker shall not wake us up (for (N) steps)
#define NOT_WAIT_WORKER(n) \
  ASSERT_EQ(-1, sem_timedwait(&s.sem[n%2], abstime(ts, waitMs)))

// Perform an operation twice (once per worker)
#define TWICE(EXPR) do { \
    EXPR; \
    EXPR; \
  } while(0)

/* Locking in different threads */

// Used by ConcurrentLocking test
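// Worker side of the handshake: steps marked (N) notify main via PING_MAIN(),
// steps marked (RN) wait for main's go-ahead via WAIT_MAIN().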
static void thread_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
  struct ceph_mount_info *const cmount = s.cmount;
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  struct flock lock1;
  int rc;
  struct timespec ts;

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
  ASSERT_EQ(rc, 0);

  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  PING_MAIN(1); // (1)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
  PING_MAIN(2); // (2)

  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  PING_MAIN(3); // (3)

  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
  PING_MAIN(4); // (4)

  WAIT_MAIN(1); // (R1)
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  PING_MAIN(5); // (5)

  WAIT_MAIN(2); // (R2)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));
  PING_MAIN(6); // (6)

  WAIT_MAIN(3); // (R3)
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  PING_MAIN(7); // (7)

  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
}

// Used by ConcurrentRecordLocking test
static void* thread_ConcurrentRecordLocking_(void *arg) {
  str_ConcurrentRecordLocking *const s =
    reinterpret_cast<str_ConcurrentRecordLocking*>(arg);
  thread_ConcurrentRecordLocking(*s);
  return NULL;
}

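// Scenario: main takes a write lock before starting the worker thread.  The
// worker's non-blocking attempt fails (1) and its blocking attempt succeeds
// only once main unlocks (2); the two sides then trade shared and exclusive
// locks through the numbered steps below.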
TEST(LibCephFS, ConcurrentRecordLocking) {
  const pid_t mypid = getpid();
  struct ceph_mount_info *cmount;
  STARTUP_CEPH();

  char c_file[1024];
  sprintf(c_file, "recordlock_test_%d", mypid);
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  struct flock lock1;
  int rc;
  UserPerm *perms = ceph_mount_perms(cmount);

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, perms);
  ASSERT_EQ(rc, 0);

  // Lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));

  // Start locker thread
  pthread_t thread;
  struct timespec ts;
  str_ConcurrentRecordLocking s = { c_file, cmount };
  s.sem_init(0);
  ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentRecordLocking_, &s));
  // Synchronization point with thread (failure: thread is dead)
  WAIT_WORKER(1); // (1)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(2); // (2)

  // Unlock
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Shall have lock
  // Synchronization point with thread (failure: thread is dead)
  WAIT_WORKER(2); // (2)

  // Synchronization point with thread (failure: thread is dead)
  WAIT_WORKER(3); // (3)

  // Wait for thread to share lock
  WAIT_WORKER(4); // (4)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Wake up thread to unlock shared lock
  PING_WORKER(1); // (R1)
  WAIT_WORKER(5); // (5)

  // Now we can lock exclusively
  // Upgrade to exclusive lock (as per POSIX)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));

  // Wake up thread to lock shared lock
  PING_WORKER(2); // (R2)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(6); // (6)

  // Release lock; thread will get it
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  WAIT_WORKER(6); // (6)

  // We no longer have the lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Wake up thread to unlock exclusive lock
  PING_WORKER(3); // (R3)
  WAIT_WORKER(7); // (7)

  // We can lock it again
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Cleanup
  void *retval = (void*) (uintptr_t) -1;
  ASSERT_EQ(0, pthread_join(thread, &retval));
  ASSERT_EQ(NULL, retval);
  s.sem_destroy();
  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
  CLEANUP_CEPH();
}

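// Same scenario as ConcurrentRecordLocking, but with two worker threads racing
// for the same lock; TWICE() repeats each synchronization step once per worker.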
TEST(LibCephFS, ThreesomeRecordLocking) {
  const pid_t mypid = getpid();
  struct ceph_mount_info *cmount;
  STARTUP_CEPH();

  char c_file[1024];
  sprintf(c_file, "recordlock_test_%d", mypid);
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  struct flock lock1;
  int rc;
  UserPerm *perms = ceph_mount_perms(cmount);

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, perms);
  ASSERT_EQ(rc, 0);

  // Lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));

  // Start locker thread
  pthread_t thread[2];
  struct timespec ts;
  str_ConcurrentRecordLocking s = { c_file, cmount };
  s.sem_init(0);
  ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentRecordLocking_, &s));
  ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentRecordLocking_, &s));
  // Synchronization point with thread (failure: thread is dead)
  TWICE(WAIT_WORKER(1)); // (1)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(2); // (2)

  // Unlock
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Shall have lock
  TWICE(// Synchronization point with thread (failure: thread is dead)
        WAIT_WORKER(2); // (2)

        // Synchronization point with thread (failure: thread is dead)
        WAIT_WORKER(3)); // (3)

  // Wait for thread to share lock
  TWICE(WAIT_WORKER(4)); // (4)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Wake up thread to unlock shared lock
  TWICE(PING_WORKER(1); // (R1)
        WAIT_WORKER(5)); // (5)

  // Now we can lock exclusively
  // Upgrade to exclusive lock (as per POSIX)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), true));

  TWICE( // Wake up thread to lock shared lock
        PING_WORKER(2); // (R2)

        // Shall not have lock immediately
        NOT_WAIT_WORKER(6)); // (6)

  // Release lock; thread will get it
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  TWICE(WAIT_WORKER(6); // (6)

        // We no longer have the lock
        lock1.l_type = F_WRLCK;
        lock1.l_whence = SEEK_SET;
        lock1.l_start = 0;
        lock1.l_len = 1024;
        lock1.l_pid = getpid();
        ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
        lock1.l_type = F_RDLCK;
        lock1.l_whence = SEEK_SET;
        lock1.l_start = 0;
        lock1.l_len = 1024;
        lock1.l_pid = getpid();
        ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

        // Wake up thread to unlock exclusive lock
        PING_WORKER(3); // (R3)
        WAIT_WORKER(7); // (7)
        );

  // We can lock it again
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

  // Cleanup
  void *retval = (void*) (uintptr_t) -1;
  ASSERT_EQ(0, pthread_join(thread[0], &retval));
  ASSERT_EQ(NULL, retval);
  ASSERT_EQ(0, pthread_join(thread[1], &retval));
  ASSERT_EQ(NULL, retval);
  s.sem_destroy();
  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
  CLEANUP_CEPH();
}

/* Locking in different processes */

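// The fork()-based tests shadow the global waitMs (10 ms) with a larger local
// value, since inter-process synchronization is slower than inter-thread.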
#define PROCESS_SLOW_MS() \
  static const long waitMs = 100; \
  (void) waitMs

// Used by ConcurrentLocking test
static void process_ConcurrentRecordLocking(str_ConcurrentRecordLocking& s) {
  const pid_t mypid = getpid();
  PROCESS_SLOW_MS();

  struct ceph_mount_info *cmount = NULL;
  struct timespec ts;
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  int rc;
  struct flock lock1;

  STARTUP_CEPH();
  s.cmount = cmount;

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, s.file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, ceph_mount_perms(cmount));
  ASSERT_EQ(rc, 0);

  WAIT_MAIN(1); // (R1)

  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  PING_MAIN(1); // (1)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
  PING_MAIN(2); // (2)

  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  PING_MAIN(3); // (3)

  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
  PING_MAIN(4); // (4)

  WAIT_MAIN(2); // (R2)
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  PING_MAIN(5); // (5)

  WAIT_MAIN(3); // (R3)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));
  PING_MAIN(6); // (6)

  WAIT_MAIN(4); // (R4)
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  PING_MAIN(7); // (7)

  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  CLEANUP_CEPH();

  s.sem_destroy();
  exit(EXIT_SUCCESS);
}

// Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
TEST(LibCephFS, DISABLED_InterProcessRecordLocking) {
  PROCESS_SLOW_MS();
  // Process synchronization
  char c_file[1024];
  const pid_t mypid = getpid();
  sprintf(c_file, "recordlock_test_%d", mypid);
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  struct flock lock1;
  int rc;

  // Note: the semaphores MUST be on a shared memory segment
  str_ConcurrentRecordLocking *const shs =
    reinterpret_cast<str_ConcurrentRecordLocking*>
    (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
          -1, 0));
  str_ConcurrentRecordLocking &s = *shs;
  s.file = c_file;
  s.sem_init(1);

  // Start locker process
  const pid_t pid = fork();
  ASSERT_GE(pid, 0);
  if (pid == 0) {
    process_ConcurrentRecordLocking(s);
    exit(EXIT_FAILURE);
  }

  struct timespec ts;
  struct ceph_mount_info *cmount;
  STARTUP_CEPH();
  UserPerm *perms = ceph_mount_perms(cmount);

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, perms);
  ASSERT_EQ(rc, 0);

  // Lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));

  // Synchronization point with process (failure: process is dead)
  PING_WORKER(1); // (R1)
  WAIT_WORKER(1); // (1)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(2); // (2)

  // Unlock
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Shall have lock
  // Synchronization point with process (failure: process is dead)
  WAIT_WORKER(2); // (2)

  // Synchronization point with process (failure: process is dead)
  WAIT_WORKER(3); // (3)

  // Wait for process to share lock
  WAIT_WORKER(4); // (4)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Wake up process to unlock shared lock
  PING_WORKER(2); // (R2)
  WAIT_WORKER(5); // (5)

  // Now we can lock exclusively
  // Upgrade to exclusive lock (as per POSIX)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));

  // Wake up process to lock shared lock
  PING_WORKER(3); // (R3)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(6); // (6)

  // Release lock; process will get it
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  WAIT_WORKER(6); // (6)

  // We no longer have the lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Wake up process to unlock exclusive lock
  PING_WORKER(4); // (R4)
  WAIT_WORKER(7); // (7)

  // We can lock it again
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Wait pid
  int status;
  ASSERT_EQ(pid, waitpid(pid, &status, 0));
  ASSERT_EQ(EXIT_SUCCESS, status);

  // Cleanup
  s.sem_destroy();
  ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
  CLEANUP_CEPH();
}

// Disabled because of fork() issues (http://tracker.ceph.com/issues/16556)
TEST(LibCephFS, DISABLED_ThreesomeInterProcessRecordLocking) {
  PROCESS_SLOW_MS();
  // Process synchronization
  char c_file[1024];
  const pid_t mypid = getpid();
  sprintf(c_file, "recordlock_test_%d", mypid);
  Fh *fh = NULL;
  Inode *root = NULL, *inode = NULL;
  struct ceph_statx stx;
  struct flock lock1;
  int rc;

  // Note: the semaphores MUST be on a shared memory segment
  str_ConcurrentRecordLocking *const shs =
    reinterpret_cast<str_ConcurrentRecordLocking*>
    (mmap(0, sizeof(*shs), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
          -1, 0));
  str_ConcurrentRecordLocking &s = *shs;
  s.file = c_file;
  s.sem_init(1);

  // Start locker processes
  pid_t pid[2];
  pid[0] = fork();
  ASSERT_GE(pid[0], 0);
  if (pid[0] == 0) {
    process_ConcurrentRecordLocking(s);
    exit(EXIT_FAILURE);
  }
  pid[1] = fork();
  ASSERT_GE(pid[1], 0);
  if (pid[1] == 0) {
    process_ConcurrentRecordLocking(s);
    exit(EXIT_FAILURE);
  }

  struct timespec ts;
  struct ceph_mount_info *cmount;
  STARTUP_CEPH();

  // Get the root inode
  rc = ceph_ll_lookup_root(cmount, &root);
  ASSERT_EQ(rc, 0);

  // Get the inode and Fh corresponding to c_file
  UserPerm *perms = ceph_mount_perms(cmount);
  rc = ceph_ll_create(cmount, root, c_file, fileMode, O_RDWR | O_CREAT,
                      &inode, &fh, &stx, 0, 0, perms);
  ASSERT_EQ(rc, 0);

  // Lock
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));

  // Synchronization point with process (failure: process is dead)
  TWICE(PING_WORKER(1)); // (R1)
  TWICE(WAIT_WORKER(1)); // (1)

  // Shall not have lock immediately
  NOT_WAIT_WORKER(2); // (2)

  // Unlock
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Shall have lock
  TWICE(// Synchronization point with process (failure: process is dead)
        WAIT_WORKER(2); // (2)

        // Synchronization point with process (failure: process is dead)
        WAIT_WORKER(3)); // (3)

  // Wait for process to share lock
  TWICE(WAIT_WORKER(4)); // (4)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  lock1.l_type = F_RDLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Wake up process to unlock shared lock
  TWICE(PING_WORKER(2); // (R2)
        WAIT_WORKER(5)); // (5)

  // Now we can lock exclusively
  // Upgrade to exclusive lock (as per POSIX)
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, true));

  TWICE( // Wake up process to lock shared lock
        PING_WORKER(3); // (R3)

        // Shall not have lock immediately
        NOT_WAIT_WORKER(6)); // (6)

  // Release lock; process will get it
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  TWICE(WAIT_WORKER(6); // (6)

        // We no longer have the lock
        lock1.l_type = F_WRLCK;
        lock1.l_whence = SEEK_SET;
        lock1.l_start = 0;
        lock1.l_len = 1024;
        lock1.l_pid = getpid();
        ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));
        lock1.l_type = F_RDLCK;
        lock1.l_whence = SEEK_SET;
        lock1.l_start = 0;
        lock1.l_len = 1024;
        lock1.l_pid = getpid();
        ASSERT_EQ(-EAGAIN, ceph_ll_setlk(cmount, fh, &lock1, ceph_pthread_self(), false));

        // Wake up process to unlock exclusive lock
        PING_WORKER(4); // (R4)
        WAIT_WORKER(7); // (7)
        );

  // We can lock it again
  lock1.l_type = F_WRLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));
  lock1.l_type = F_UNLCK;
  lock1.l_whence = SEEK_SET;
  lock1.l_start = 0;
  lock1.l_len = 1024;
  lock1.l_pid = getpid();
  ASSERT_EQ(0, ceph_ll_setlk(cmount, fh, &lock1, mypid, false));

  // Wait pids
  int status;
  ASSERT_EQ(pid[0], waitpid(pid[0], &status, 0));
  ASSERT_EQ(EXIT_SUCCESS, status);
  ASSERT_EQ(pid[1], waitpid(pid[1], &status, 0));
  ASSERT_EQ(EXIT_SUCCESS, status);

  // Cleanup
  s.sem_destroy();
  ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
  ASSERT_EQ(0, ceph_ll_close(cmount, fh));
  ASSERT_EQ(0, ceph_ll_unlink(cmount, root, c_file, perms));
  CLEANUP_CEPH();
}