// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <thread>
#include <atomic>
#include <string>

#include "rocksdb/env.h"
#include "port/port.h"
#include "util/autovector.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/thread_local.h"

namespace rocksdb {

class ThreadLocalTest : public testing::Test {
 public:
  ThreadLocalTest() : env_(Env::Default()) {}

  Env* env_;
};

namespace {

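// Params bundles the mutex, condition variable, and counters shared by the
// worker threads in these tests, together with the ThreadLocalPtr instances
// under test (tls1 is a member; tls2 points to an instance supplied by the
// test).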
struct Params {
  Params(port::Mutex* m, port::CondVar* c, int* u, int n,
         UnrefHandler handler = nullptr)
      : mu(m),
        cv(c),
        unref(u),
        total(n),
        started(0),
        completed(0),
        doWrite(false),
        tls1(handler),
        tls2(nullptr) {}

  port::Mutex* mu;
  port::CondVar* cv;
  int* unref;
  int total;
  int started;
  int completed;
  bool doWrite;
  ThreadLocalPtr tls1;
  ThreadLocalPtr* tls2;
};

class IDChecker : public ThreadLocalPtr {
 public:
  static uint32_t PeekId() { return TEST_PeekId(); }
};

}  // anonymous namespace

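// PeekId() reports the id that the next ThreadLocalPtr instance will be
// assigned; ids of destroyed instances are recycled, which is what the
// assertions in UniqueIdTest below exercise.
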
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
TEST_F(ThreadLocalTest, UniqueIdTest) {
  port::Mutex mu;
  port::CondVar cv(&mu);

  ASSERT_EQ(IDChecker::PeekId(), 0u);
  // A new ThreadLocalPtr instance bumps the id by 1
  {
    // Id used 0
    Params p1(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), 1u);
    // Id used 1
    Params p2(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), 2u);
    // Id used 2
    Params p3(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), 3u);
    // Id used 3
    Params p4(&mu, &cv, nullptr, 1u);
    ASSERT_EQ(IDChecker::PeekId(), 4u);
  }
  // Ids 3, 2, 1, 0 are in the free queue in order
  ASSERT_EQ(IDChecker::PeekId(), 0u);

  // pick up 0
  Params p1(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), 1u);
  // pick up 1
  Params* p2 = new Params(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), 2u);
  // pick up 2
  Params p3(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), 3u);
  // return id 1
  delete p2;
  ASSERT_EQ(IDChecker::PeekId(), 1u);
  // Now we have 3, 1 in the queue
  // pick up 1
  Params p4(&mu, &cv, nullptr, 1u);
  ASSERT_EQ(IDChecker::PeekId(), 3u);
  // pick up 3
  Params p5(&mu, &cv, nullptr, 1u);
  // next new id
  ASSERT_EQ(IDChecker::PeekId(), 4u);
  // After exit, the id sequence in the queue is:
  // 3, 1, 2, 0
}
#endif  // __clang_analyzer__

TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
  // global id list carries over 3, 1, 2, 0
  ASSERT_EQ(IDChecker::PeekId(), 0u);

  port::Mutex mu;
  port::CondVar cv(&mu);
  Params p(&mu, &cv, nullptr, 1);
  ThreadLocalPtr tls2;
  p.tls2 = &tls2;

  auto func = [](void* ptr) {
    auto& params = *static_cast<Params*>(ptr);

    ASSERT_TRUE(params.tls1.Get() == nullptr);
    params.tls1.Reset(reinterpret_cast<int*>(1));
    ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(1));
    params.tls1.Reset(reinterpret_cast<int*>(2));
    ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(2));

    ASSERT_TRUE(params.tls2->Get() == nullptr);
    params.tls2->Reset(reinterpret_cast<int*>(1));
    ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(1));
    params.tls2->Reset(reinterpret_cast<int*>(2));
    ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(2));

    params.mu->Lock();
    ++(params.completed);
    params.cv->SignalAll();
    params.mu->Unlock();
  };

  for (int iter = 0; iter < 1024; ++iter) {
    ASSERT_EQ(IDChecker::PeekId(), 1u);
    // Another new thread, read/write should not see value from previous thread
    env_->StartThread(func, static_cast<void*>(&p));
    mu.Lock();
    while (p.completed != iter + 1) {
      cv.Wait();
    }
    mu.Unlock();
    ASSERT_EQ(IDChecker::PeekId(), 1u);
  }
}

TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) {
  // global id list carries over 3, 1, 2, 0
  ASSERT_EQ(IDChecker::PeekId(), 0u);

  ThreadLocalPtr tls2;
  port::Mutex mu1;
  port::CondVar cv1(&mu1);
  Params p1(&mu1, &cv1, nullptr, 16);
  p1.tls2 = &tls2;

  port::Mutex mu2;
  port::CondVar cv2(&mu2);
  Params p2(&mu2, &cv2, nullptr, 16);
  p2.doWrite = true;
  p2.tls2 = &tls2;

  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    // size_t matches the size of the pointer type we cast to below.
    size_t own = ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    // Let write threads write a different value from the read threads
    if (p.doWrite) {
      own += 8192;
    }

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    auto* env = Env::Default();
    auto start = env->NowMicros();

    p.tls1.Reset(reinterpret_cast<size_t*>(own));
    p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
    // Loop for 1 second
    while (env->NowMicros() - start < 1000 * 1000) {
      for (int iter = 0; iter < 100000; ++iter) {
        ASSERT_TRUE(p.tls1.Get() == reinterpret_cast<size_t*>(own));
        ASSERT_TRUE(p.tls2->Get() == reinterpret_cast<size_t*>(own + 1));
        if (p.doWrite) {
          p.tls1.Reset(reinterpret_cast<size_t*>(own));
          p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
        }
      }
    }

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();
    p.mu->Unlock();
  };

  // Start 2 instances: one keeps writing and one keeps reading.
  // The read instance should not see data from the write instance.
  // Each thread's local copy of the value is also different from the
  // others.
  for (int th = 0; th < p1.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&p1));
  }
  for (int th = 0; th < p2.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&p2));
  }

  mu1.Lock();
  while (p1.completed != p1.total) {
    cv1.Wait();
  }
  mu1.Unlock();

  mu2.Lock();
  while (p2.completed != p2.total) {
    cv2.Wait();
  }
  mu2.Unlock();

  ASSERT_EQ(IDChecker::PeekId(), 3u);
}

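// Unref exercises the UnrefHandler in three scenarios: a ThreadLocalPtr that
// is never accessed (Case 0), cleanup at thread exit (Case 1), and cleanup at
// ThreadLocalPtr destruction (Case 2).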
TEST_F(ThreadLocalTest, Unref) {
  ASSERT_EQ(IDChecker::PeekId(), 0u);

  auto unref = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);
    p.mu->Lock();
    ++(*p.unref);
    p.mu->Unlock();
  };

  // Case 0: no unref triggered if ThreadLocalPtr is never accessed
  auto func0 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func0, static_cast<void*>(&p));
    }
    env_->WaitForJoin();
    ASSERT_EQ(unref_count, 0);
  }

  // Case 1: unref triggered by thread exit
  auto func1 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    ThreadLocalPtr tls2(unref);
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = &tls2;

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func1, static_cast<void*>(&p));
    }

    env_->WaitForJoin();

    // N threads x 2 ThreadLocalPtr instances cleaned up on thread exit
    ASSERT_EQ(unref_count, 2 * p.total);
  }

  // Case 2: unref triggered by ThreadLocalPtr instance destruction
  auto func2 = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    p.mu->Lock();
    ++(p.started);
    p.cv->SignalAll();
    while (p.started != p.total) {
      p.cv->Wait();
    }
    p.mu->Unlock();

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = new ThreadLocalPtr(unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func2, static_cast<void*>(&p));
    }

    // Wait for all threads to finish using Params
    mu.Lock();
    while (p.completed != p.total) {
      cv.Wait();
    }
    mu.Unlock();

    // Now destroy one ThreadLocalPtr instance
    delete p.tls2;
    p.tls2 = nullptr;
    // Instance destruction unrefs for N threads
    ASSERT_EQ(unref_count, p.total);

    // Signal to exit
    mu.Lock();
    p.completed = 0;
    cv.SignalAll();
    mu.Unlock();
    env_->WaitForJoin();
    // Thread exit adds another N unrefs for the remaining instance
    ASSERT_EQ(unref_count, 2 * p.total);
  }
}

TEST_F(ThreadLocalTest, Swap) {
  ThreadLocalPtr tls;
  tls.Reset(reinterpret_cast<void*>(1));
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(nullptr)), 1);
  ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(2)) == nullptr);
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Get()), 2);
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(reinterpret_cast<void*>(3))), 2);
}

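// Scrape() is expected to collect every thread's stored pointer into the
// autovector and replace each entry (here with nullptr), so no unref fires
// afterwards at thread exit or ThreadLocalPtr destruction.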
TEST_F(ThreadLocalTest, Scrape) {
  auto unref = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);
    p.mu->Lock();
    ++(*p.unref);
    p.mu->Unlock();
  };

  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);

    ASSERT_TRUE(p.tls1.Get() == nullptr);
    ASSERT_TRUE(p.tls2->Get() == nullptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.tls1.Reset(ptr);
    p.tls2->Reset(ptr);

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 1; th <= 128; th += th) {
    port::Mutex mu;
    port::CondVar cv(&mu);
    int unref_count = 0;
    Params p(&mu, &cv, &unref_count, th, unref);
    p.tls2 = new ThreadLocalPtr(unref);

    for (int i = 0; i < p.total; ++i) {
      env_->StartThread(func, static_cast<void*>(&p));
    }

    // Wait for all threads to finish using Params
    mu.Lock();
    while (p.completed != p.total) {
      cv.Wait();
    }
    mu.Unlock();

    ASSERT_EQ(unref_count, 0);

    // Scrape all thread local data. No unref at thread
    // exit or ThreadLocalPtr destruction
    autovector<void*> ptrs;
    p.tls1.Scrape(&ptrs, nullptr);
    p.tls2->Scrape(&ptrs, nullptr);
    delete p.tls2;
    // Signal to exit
    mu.Lock();
    p.completed = 0;
    cv.SignalAll();
    mu.Unlock();
    env_->WaitForJoin();

    ASSERT_EQ(unref_count, 0);
  }
}

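// Fold() visits the pointer stored by each thread and folds it into the
// accumulator passed as the second argument; here every thread stores an
// atomic counter and the fold sums their values.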
TEST_F(ThreadLocalTest, Fold) {
  auto unref = [](void* ptr) {
    delete static_cast<std::atomic<int64_t>*>(ptr);
  };
  static const int kNumThreads = 16;
  static const int kItersPerThread = 10;
  port::Mutex mu;
  port::CondVar cv(&mu);
  Params params(&mu, &cv, nullptr, kNumThreads, unref);
  auto func = [](void* ptr) {
    auto& p = *static_cast<Params*>(ptr);
    ASSERT_TRUE(p.tls1.Get() == nullptr);
    p.tls1.Reset(new std::atomic<int64_t>(0));

    for (int i = 0; i < kItersPerThread; ++i) {
      static_cast<std::atomic<int64_t>*>(p.tls1.Get())->fetch_add(1);
    }

    p.mu->Lock();
    ++(p.completed);
    p.cv->SignalAll();

    // Waiting for instruction to exit thread
    while (p.completed != 0) {
      p.cv->Wait();
    }
    p.mu->Unlock();
  };

  for (int th = 0; th < params.total; ++th) {
    env_->StartThread(func, static_cast<void*>(&params));
  }

  // Wait for all threads to finish using Params
  mu.Lock();
  while (params.completed != params.total) {
    cv.Wait();
  }
  mu.Unlock();

  // Verify Fold() behavior
  int64_t sum = 0;
  params.tls1.Fold(
      [](void* ptr, void* res) {
        auto sum_ptr = static_cast<int64_t*>(res);
        *sum_ptr += static_cast<std::atomic<int64_t>*>(ptr)->load();
      },
      &sum);
  ASSERT_EQ(sum, kNumThreads * kItersPerThread);

  // Signal to exit
  mu.Lock();
  params.completed = 0;
  cv.SignalAll();
  mu.Unlock();
  env_->WaitForJoin();
}

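// CompareAndSwap() mirrors compare_exchange semantics: on failure the stored
// value is left unchanged and `expected` is updated to the current value, as
// the assertions below show.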
TEST_F(ThreadLocalTest, CompareAndSwap) {
  ThreadLocalPtr tls;
  ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
  void* expected = reinterpret_cast<void*>(1);
  // Swap in 2
  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
  expected = reinterpret_cast<void*>(100);
  // Fail Swap, still 2
  ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
  ASSERT_EQ(expected, reinterpret_cast<void*>(2));
  // Swap in 3
  expected = reinterpret_cast<void*>(2);
  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
  ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
}

namespace {

void* AccessThreadLocal(void* /*arg*/) {
  TEST_SYNC_POINT("AccessThreadLocal:Start");
  ThreadLocalPtr tlp;
  tlp.Reset(new std::string("hello RocksDB"));
  TEST_SYNC_POINT("AccessThreadLocal:End");
  return nullptr;
}

}  // namespace

// The following test is disabled as it requires manual steps to run it
// correctly.
//
// Currently we have no way to access SyncPoint w/o an ASAN error when the
// child thread dies after the main thread dies. So if you manually enable
// this test and only see an ASAN error on SyncPoint, it means you passed
// the test.
TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) {
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"AccessThreadLocal:Start", "MainThreadDiesFirst:End"},
       {"PosixEnv::~PosixEnv():End", "AccessThreadLocal:End"}});

  // Triggers the initialization of singletons.
  Env::Default();

#ifndef ROCKSDB_LITE
  try {
#endif  // ROCKSDB_LITE
    rocksdb::port::Thread th(&AccessThreadLocal, nullptr);
    th.detach();
    TEST_SYNC_POINT("MainThreadDiesFirst:End");
#ifndef ROCKSDB_LITE
  } catch (const std::system_error& ex) {
    std::cerr << "Start thread: " << ex.code() << std::endl;
    FAIL();
  }
#endif  // ROCKSDB_LITE
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}