// ceph/src/rocksdb/util/thread_local_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <thread>
7 #include <atomic>
8 #include <string>
9
10 #include "port/port.h"
11 #include "rocksdb/env.h"
12 #include "test_util/sync_point.h"
13 #include "test_util/testharness.h"
14 #include "test_util/testutil.h"
15 #include "util/autovector.h"
16 #include "util/thread_local.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 class ThreadLocalTest : public testing::Test {
21 public:
22 ThreadLocalTest() : env_(Env::Default()) {}
23
24 Env* env_;
25 };
26
27 namespace {
28
29 struct Params {
30 Params(port::Mutex* m, port::CondVar* c, int* u, int n,
31 UnrefHandler handler = nullptr)
32 : mu(m),
33 cv(c),
34 unref(u),
35 total(n),
36 started(0),
37 completed(0),
38 doWrite(false),
39 tls1(handler),
40 tls2(nullptr) {}
41
42 port::Mutex* mu;
43 port::CondVar* cv;
44 int* unref;
45 int total;
46 int started;
47 int completed;
48 bool doWrite;
49 ThreadLocalPtr tls1;
50 ThreadLocalPtr* tls2;
51 };
52
53 class IDChecker : public ThreadLocalPtr {
54 public:
55 static uint32_t PeekId() {
56 return TEST_PeekId();
57 }
58 };
59
60 } // anonymous namespace
61
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
// Checks that each ThreadLocalPtr instance claims a fresh id, and that
// ids released by destroyed instances are recycled through a free queue
// before new ids are minted.
TEST_F(ThreadLocalTest, UniqueIdTest) {
  port::Mutex mu;
  port::CondVar cv(&mu);

  // Earlier tests may have consumed ids; measure everything relative to
  // the current head of the id space.
  uint32_t base_id = IDChecker::PeekId();
  // New ThreadLocal instance bumps id by 1
  {
    // Id used 0
    Params p1(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), base_id + 1u);
    // Id used 1
    Params p2(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), base_id + 2u);
    // Id used 2
    Params p3(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), base_id + 3u);
    // Id used 3
    Params p4(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), base_id + 4u);
  }
  // id 3, 2, 1, 0 are in the free queue in order
  ASSERT_EQ(IDChecker::PeekId(), base_id + 0u);

  // pick up 0
  Params p1(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), base_id + 1u);
  // pick up 1
  Params* p2 = new Params(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), base_id + 2u);
  // pick up 2
  Params p3(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), base_id + 3u);
  // return id 1 to the free queue
  delete p2;
  ASSERT_EQ(IDChecker::PeekId(), base_id + 1u);
  // Now we have 3, 1 in queue
  // pick up 1
  Params p4(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), base_id + 3u);
  // pick up 3
  Params p5(&mu, &cv, nullptr, 1u);
  // next new id
  ASSERT_EQ(IDChecker::PeekId(), base_id + 4u);
  // After exit, id sequence in queue:
  // 3, 1, 2, 0
}
#endif  // __clang_analyzer__
111
112 TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
113 // global id list carries over 3, 1, 2, 0
114 uint32_t base_id = IDChecker::PeekId();
115
116 port::Mutex mu;
117 port::CondVar cv(&mu);
118 Params p(&mu, &cv, nullptr, 1);
119 ThreadLocalPtr tls2;
120 p.tls2 = &tls2;
121
122 ASSERT_GT(IDChecker::PeekId(), base_id);
123 base_id = IDChecker::PeekId();
124
125 auto func = [](void* ptr) {
126 auto& params = *static_cast<Params*>(ptr);
127
128 ASSERT_TRUE(params.tls1.Get() == nullptr);
129 params.tls1.Reset(reinterpret_cast<int*>(1));
130 ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(1));
131 params.tls1.Reset(reinterpret_cast<int*>(2));
132 ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(2));
133
134 ASSERT_TRUE(params.tls2->Get() == nullptr);
135 params.tls2->Reset(reinterpret_cast<int*>(1));
136 ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(1));
137 params.tls2->Reset(reinterpret_cast<int*>(2));
138 ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(2));
139
140 params.mu->Lock();
141 ++(params.completed);
142 params.cv->SignalAll();
143 params.mu->Unlock();
144 };
145
146 for (int iter = 0; iter < 1024; ++iter) {
147 ASSERT_EQ(IDChecker::PeekId(), base_id);
148 // Another new thread, read/write should not see value from previous thread
149 env_->StartThread(func, static_cast<void*>(&p));
150 mu.Lock();
151 while (p.completed != iter + 1) {
152 cv.Wait();
153 }
154 mu.Unlock();
155 ASSERT_EQ(IDChecker::PeekId(), base_id);
156 }
157 }
158
// Two groups of 16 threads hammer the same two ThreadLocalPtr instances
// concurrently (one group read-only, one read/write); each thread must
// only ever observe the values it stored itself.
TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) {
  // global id list carries over 3, 1, 2, 0
  uint32_t base_id = IDChecker::PeekId();

  ThreadLocalPtr tls2;
  port::Mutex mu1;
  port::CondVar cv1(&mu1);
  Params p1(&mu1, &cv1, nullptr, 16);
  p1.tls2 = &tls2;

  port::Mutex mu2;
  port::CondVar cv2(&mu2);
  Params p2(&mu2, &cv2, nullptr, 16);
  p2.doWrite = true;
  p2.tls2 = &tls2;

  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    // Start barrier: wait until every thread of this group has started so
    // the reads and writes below genuinely overlap in time.
    p.mu->Lock();
    // size_t matches the pointer width the per-thread value is cast to.
    size_t own = ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    // Let write threads write a different value from the read threads
    if (p.doWrite) {
      own += 8192;
    }

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    auto* env = Env::Default();
    auto start = env->NowMicros();

    p.tls1.Reset(reinterpret_cast<size_t*>(own));
    p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
    // Loop for 1 second
    while (env->NowMicros() - start < 1000 * 1000) {
      for (int iter = 0; iter < 100000; ++iter) {
        // Whatever other threads do, this thread must keep seeing exactly
        // the values it stored.
        ASSERT_TRUE(p.tls1.Get() == reinterpret_cast<size_t*>(own));
        ASSERT_TRUE(p.tls2->Get() == reinterpret_cast<size_t*>(own + 1));
        if (p.doWrite) {
          p.tls1.Reset(reinterpret_cast<size_t*>(own));
          p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
        }
      }
    }

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();
    p.mu->Unlock();
  };

  // Initiate 2 instances: one keeps writing and one keeps reading.
  // The read instance should not see data from the write instance.
  // Each thread local copy of the value is also different from the
  // others.
  for (int th = 0; th < p1.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&p1));
  }
  for (int th = 0; th < p2.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&p2));
  }

  mu1.Lock();
  while (p1.completed != p1.total) {
    cv1.Wait();
  }
  mu1.Unlock();

  mu2.Lock();
  while (p2.completed != p2.total) {
    cv2.Wait();
  }
  mu2.Unlock();

  ASSERT_EQ(IDChecker::PeekId(), base_id + 3u);
}
244
// Exercises the UnrefHandler in three scenarios: never-accessed slots
// (no calls), thread exit (one call per accessed instance per thread),
// and instance destruction (one call per thread with a live value).
TEST_F(ThreadLocalTest, Unref) {
  auto unref = [](void* ptr) {
    // The stored value is the Params itself; bump its external counter.
    auto& p = *static_cast<Params*>(ptr);
    p.mu->Lock();
    ++(*p.unref);
    p.mu->Unlock();
  };

  // Case 0: no unref triggered if ThreadLocalPtr is never accessed
  auto func0 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    // Start barrier only; tls1/tls2 are deliberately never touched.
    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  // Thread counts 1, 2, 4, ..., 128.
  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func0, static_cast<void*>(&p));
    }
    env_->WaitForJoin();
    ASSERT_EQ(unref_count, 0);
  }

  // Case 1: unref triggered by thread exit
  auto func1 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    // Reset twice: the handler should still fire once per instance at
    // thread exit, not once per Reset.
    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    ThreadLocalPtr tls2(unref);
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = &tls2;

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func1, static_cast<void*>(&p));
    }

    env_->WaitForJoin();

    // N threads x 2 ThreadLocal instance cleanup on thread exit
    ASSERT_EQ(unref_count, 2 * p.total);
  }

  // Case 2: unref triggered by ThreadLocal instance destruction
  auto func2 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread, so the threads are still
    // alive when the main thread destroys tls2 below.
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = new ThreadLocalPtr(unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func2, static_cast<void*>(&p));
    }

    // Wait for all threads to finish using Params
    mu.Lock();
    while (p.completed != p.total) {
      cv.Wait();
    }
    mu.Unlock();

    // Now destroy one ThreadLocal instance while all threads are alive
    delete p.tls2;
    p.tls2 = nullptr;
    // instance destroy for N threads
    ASSERT_EQ(unref_count, p.total);

    // Signal to exit
    mu.Lock();
    p.completed = 0;
    cv.SignalAll();
    mu.Unlock();
    env_->WaitForJoin();
    // additional N threads exit unref for the left instance
    ASSERT_EQ(unref_count, 2 * p.total);
  }
}
385
386 TEST_F(ThreadLocalTest, Swap) {
387 ThreadLocalPtr tls;
388 tls.Reset(reinterpret_cast<void*>(1));
389 ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(nullptr)), 1);
390 ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(2)) == nullptr);
391 ASSERT_EQ(reinterpret_cast<int64_t>(tls.Get()), 2);
392 ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(reinterpret_cast<void*>(3))), 2);
393 }
394
// Scrape() must detach every thread's value from an instance so that
// neither thread exit nor instance destruction invokes the unref handler
// afterwards.
TEST_F(ThreadLocalTest, Scrape) {
  auto unref = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);
    p.mu->Lock();
    ++(*p.unref);
    p.mu->Unlock();
  };

  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    // Populate both instances (twice, to mirror the Unref test).
    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread, so the main thread can
    // scrape while the values are still live.
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  // Thread counts 1, 2, 4, ..., 128.
  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = new ThreadLocalPtr(unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func, static_cast<void*>(&p));
    }

    // Wait for all threads to finish using Params
    mu.Lock();
    while (p.completed != p.total) {
      cv.Wait();
    }
    mu.Unlock();

    ASSERT_EQ(unref_count, 0);

    // Scrape all thread local data. No unref at thread
    // exit or ThreadLocalPtr destruction
    autovector<void*> ptrs;
    p.tls1.Scrape(&ptrs, nullptr);
    p.tls2->Scrape(&ptrs, nullptr);
    delete p.tls2;
    // Signal to exit
    mu.Lock();
    p.completed = 0;
    cv.SignalAll();
    mu.Unlock();
    env_->WaitForJoin();

    // Scraped slots were emptied, so no handler ever fired.
    ASSERT_EQ(unref_count, 0);
  }
}
462
// Fold() must visit the value stored by every live thread exactly once.
TEST_F(ThreadLocalTest, Fold) {
  auto unref = [](void* ptr) {
    // Per-thread values are heap-allocated atomics; free them on cleanup.
    delete static_cast<std::atomic<int64_t>*>(ptr);
  };
  static const int kNumThreads = 16;
  static const int kItersPerThread = 10;
  port::Mutex mu;
  port::CondVar cv(&mu);
  Params params(&mu, &cv, nullptr, kNumThreads, unref);
  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);
    ASSERT_TRUE(p.tls1.Get() == nullptr);
    // Each thread owns its own counter and increments it kItersPerThread
    // times.
    p.tls1.Reset(new std::atomic<int64_t>(0));

    for (int i = 0; i < kItersPerThread; ++i) {
      static_cast<std::atomic<int64_t>*>(p.tls1.Get())->fetch_add(1);
    }

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread, so every per-thread value
    // is still alive when Fold() runs below.
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 0; th < params.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&params));
  }

  // Wait for all threads to finish using Params
  mu.Lock();
  while (params.completed != params.total) {
    cv.Wait();
  }
  mu.Unlock();

  // Verify Fold() behavior: summing every thread's counter must account
  // for all increments performed above.
  int64_t sum = 0;
  params.tls1.Fold(
      [](void* ptr, void* res) {
        auto sum_ptr = static_cast<int64_t*>(res);
        *sum_ptr += static_cast<std::atomic<int64_t>*>(ptr)->load();
      },
      &sum);
  ASSERT_EQ(sum, kNumThreads * kItersPerThread);

  // Signal to exit
  mu.Lock();
  params.completed = 0;
  cv.SignalAll();
  mu.Unlock();
  env_->WaitForJoin();
}
520
521 TEST_F(ThreadLocalTest, CompareAndSwap) {
522 ThreadLocalPtr tls;
523 ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
524 void* expected = reinterpret_cast<void*>(1);
525 // Swap in 2
526 ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
527 expected = reinterpret_cast<void*>(100);
528 // Fail Swap, still 2
529 ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
530 ASSERT_EQ(expected, reinterpret_cast<void*>(2));
531 // Swap in 3
532 expected = reinterpret_cast<void*>(2);
533 ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
534 ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
535 }
536
537 namespace {
538
539 void* AccessThreadLocal(void* /*arg*/) {
540 TEST_SYNC_POINT("AccessThreadLocal:Start");
541 ThreadLocalPtr tlp;
542 tlp.Reset(new std::string("hello RocksDB"));
543 TEST_SYNC_POINT("AccessThreadLocal:End");
544 return nullptr;
545 }
546
547 } // namespace
548
// The following test is disabled as it requires manual steps to run it
// correctly.
//
// Currently we have no way to access SyncPoint w/o ASAN error when the
// child thread dies after the main thread dies. So if you manually enable
// this test and only see an ASAN error on SyncPoint, it means you pass the
// test.
TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) {
  // Order the sync points so the detached thread starts before this test
  // ends, and only touches its ThreadLocalPtr after PosixEnv teardown.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"AccessThreadLocal:Start", "MainThreadDiesFirst:End"},
       {"PosixEnv::~PosixEnv():End", "AccessThreadLocal:End"}});

  // Triggers the initialization of singletons.
  Env::Default();

#ifndef ROCKSDB_LITE
  try {
#endif  // ROCKSDB_LITE
    ROCKSDB_NAMESPACE::port::Thread th(&AccessThreadLocal, nullptr);
    th.detach();
    TEST_SYNC_POINT("MainThreadDiesFirst:End");
#ifndef ROCKSDB_LITE
  } catch (const std::system_error& ex) {
    // Thread creation can fail under resource pressure; report and fail.
    std::cerr << "Start thread: " << ex.code() << std::endl;
    FAIL();
  }
#endif  // ROCKSDB_LITE
}
577
578 } // namespace ROCKSDB_NAMESPACE
579
580 int main(int argc, char** argv) {
581 ::testing::InitGoogleTest(&argc, argv);
582 return RUN_ALL_TESTS();
583 }