1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
11 #include "monitoring/instrumented_mutex.h"
12 #include "port/port.h"
13 #include "rocksdb/system_clock.h"
14 #include "util/mutexlock.h"
16 namespace ROCKSDB_NAMESPACE
{
18 // Simple wrapper around port::Thread that supports calling a callback every
19 // X seconds. If you pass in 0, then it will call your callback repeatedly
21 class RepeatableThread
{
23 RepeatableThread(std::function
<void()> function
,
24 const std::string
& thread_name
, SystemClock
* clock
,
25 uint64_t delay_us
, uint64_t initial_delay_us
= 0)
26 : function_(function
),
27 thread_name_("rocksdb:" + thread_name
),
30 initial_delay_us_(initial_delay_us
),
38 thread_([this] { thread(); }) {
43 InstrumentedMutexLock
l(&mutex_
);
48 cond_var_
.SignalAll();
53 bool IsRunning() { return running_
; }
55 ~RepeatableThread() { cancel(); }
58 // Wait until RepeatableThread starting waiting, call the optional callback,
59 // then wait for one run of RepeatableThread. Tests can use provide a
60 // custom clock object to mock time, and use the callback here to bump current
61 // time and trigger RepeatableThread. See repeatable_thread_test for example.
63 // Note: only support one caller of this method.
64 void TEST_WaitForRun(std::function
<void()> callback
= nullptr) {
65 InstrumentedMutexLock
l(&mutex_
);
69 uint64_t prev_count
= run_count_
;
70 if (callback
!= nullptr) {
73 cond_var_
.SignalAll();
74 while (!(run_count_
> prev_count
)) {
81 bool wait(uint64_t delay
) {
82 InstrumentedMutexLock
l(&mutex_
);
83 if (running_
&& delay
> 0) {
84 uint64_t wait_until
= clock_
->NowMicros() + delay
;
87 cond_var_
.SignalAll();
90 cond_var_
.TimedWait(wait_until
);
91 if (clock_
->NowMicros() >= wait_until
) {
103 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
104 #if __GLIBC_PREREQ(2, 12)
106 auto thread_handle
= thread_
.native_handle();
107 int ret
__attribute__((__unused__
)) =
108 pthread_setname_np(thread_handle
, thread_name_
.c_str());
113 assert(delay_us_
> 0);
114 if (!wait(initial_delay_us_
)) {
121 InstrumentedMutexLock
l(&mutex_
);
123 cond_var_
.SignalAll();
126 } while (wait(delay_us_
));
129 const std::function
<void()> function_
;
130 const std::string thread_name_
;
132 const uint64_t delay_us_
;
133 const uint64_t initial_delay_us_
;
135 // Mutex lock should be held when accessing running_, waiting_
137 InstrumentedMutex mutex_
;
138 InstrumentedCondVar cond_var_
;
141 // RepeatableThread waiting for timeout.
143 // Times function_ had run.
146 port::Thread thread_
;
149 } // namespace ROCKSDB_NAMESPACE