]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/repeatable_thread.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / repeatable_thread.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #pragma once
7
8 #include <functional>
9 #include <string>
10
11 #include "monitoring/instrumented_mutex.h"
12 #include "port/port.h"
13 #include "rocksdb/system_clock.h"
14 #include "util/mutexlock.h"
15
16 namespace ROCKSDB_NAMESPACE {
17
18 // Simple wrapper around port::Thread that supports calling a callback every
19 // X seconds. If you pass in 0, then it will call your callback repeatedly
20 // without delay.
21 class RepeatableThread {
22 public:
23 RepeatableThread(std::function<void()> function,
24 const std::string& thread_name, SystemClock* clock,
25 uint64_t delay_us, uint64_t initial_delay_us = 0)
26 : function_(function),
27 thread_name_("rocksdb:" + thread_name),
28 clock_(clock),
29 delay_us_(delay_us),
30 initial_delay_us_(initial_delay_us),
31 mutex_(clock),
32 cond_var_(&mutex_),
33 running_(true),
34 #ifndef NDEBUG
35 waiting_(false),
36 run_count_(0),
37 #endif
38 thread_([this] { thread(); }) {
39 }
40
41 void cancel() {
42 {
43 InstrumentedMutexLock l(&mutex_);
44 if (!running_) {
45 return;
46 }
47 running_ = false;
48 cond_var_.SignalAll();
49 }
50 thread_.join();
51 }
52
53 bool IsRunning() { return running_; }
54
55 ~RepeatableThread() { cancel(); }
56
57 #ifndef NDEBUG
58 // Wait until RepeatableThread starting waiting, call the optional callback,
59 // then wait for one run of RepeatableThread. Tests can use provide a
60 // custom clock object to mock time, and use the callback here to bump current
61 // time and trigger RepeatableThread. See repeatable_thread_test for example.
62 //
63 // Note: only support one caller of this method.
64 void TEST_WaitForRun(std::function<void()> callback = nullptr) {
65 InstrumentedMutexLock l(&mutex_);
66 while (!waiting_) {
67 cond_var_.Wait();
68 }
69 uint64_t prev_count = run_count_;
70 if (callback != nullptr) {
71 callback();
72 }
73 cond_var_.SignalAll();
74 while (!(run_count_ > prev_count)) {
75 cond_var_.Wait();
76 }
77 }
78 #endif
79
80 private:
81 bool wait(uint64_t delay) {
82 InstrumentedMutexLock l(&mutex_);
83 if (running_ && delay > 0) {
84 uint64_t wait_until = clock_->NowMicros() + delay;
85 #ifndef NDEBUG
86 waiting_ = true;
87 cond_var_.SignalAll();
88 #endif
89 while (running_) {
90 cond_var_.TimedWait(wait_until);
91 if (clock_->NowMicros() >= wait_until) {
92 break;
93 }
94 }
95 #ifndef NDEBUG
96 waiting_ = false;
97 #endif
98 }
99 return running_;
100 }
101
102 void thread() {
103 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
104 #if __GLIBC_PREREQ(2, 12)
105 // Set thread name.
106 auto thread_handle = thread_.native_handle();
107 int ret __attribute__((__unused__)) =
108 pthread_setname_np(thread_handle, thread_name_.c_str());
109 assert(ret == 0);
110 #endif
111 #endif
112
113 assert(delay_us_ > 0);
114 if (!wait(initial_delay_us_)) {
115 return;
116 }
117 do {
118 function_();
119 #ifndef NDEBUG
120 {
121 InstrumentedMutexLock l(&mutex_);
122 run_count_++;
123 cond_var_.SignalAll();
124 }
125 #endif
126 } while (wait(delay_us_));
127 }
128
129 const std::function<void()> function_;
130 const std::string thread_name_;
131 SystemClock* clock_;
132 const uint64_t delay_us_;
133 const uint64_t initial_delay_us_;
134
135 // Mutex lock should be held when accessing running_, waiting_
136 // and run_count_.
137 InstrumentedMutex mutex_;
138 InstrumentedCondVar cond_var_;
139 bool running_;
140 #ifndef NDEBUG
141 // RepeatableThread waiting for timeout.
142 bool waiting_;
143 // Times function_ had run.
144 uint64_t run_count_;
145 #endif
146 port::Thread thread_;
147 };
148
149 } // namespace ROCKSDB_NAMESPACE