// Source: git.proxmox.com mirror of ceph.git — ceph/src/rocksdb/util/threadpool_imp.cc
// (RocksDB subtree-ish sources imported for ceph 12.0.3)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "util/threadpool_imp.h"
11
12 #include "monitoring/thread_status_util.h"
13 #include "port/port.h"
14
15 #ifndef OS_WIN
16 # include <unistd.h>
17 #endif
18
19 #ifdef OS_LINUX
20 # include <sys/syscall.h>
21 #endif
22
#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdio>
#include <cstring>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
30
31 namespace rocksdb {
32
33 void ThreadPoolImpl::PthreadCall(const char* label, int result) {
34 if (result != 0) {
35 fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
36 abort();
37 }
38 }
39
// Pimpl for ThreadPoolImpl: owns the job queue, the worker threads, and all
// synchronization state. Mutable state below is guarded by mu_ unless noted
// otherwise. NOTE(review): member declaration order is mirrored by the
// constructor's init list — keep them in sync.
struct ThreadPoolImpl::Impl {

  Impl();
  ~Impl();

  // Ask all workers to exit and join them. When wait_for_jobs_to_complete is
  // true, workers drain the queue before exiting; otherwise pending jobs are
  // dropped.
  void JoinThreads(bool wait_for_jobs_to_complete);

  // Change the thread limit: growth is always applied, reduction only when
  // allow_reduce is true.
  void SetBackgroundThreadsInternal(int num, bool allow_reduce);

  // Lock-free snapshot of the queue length, for stats reporting.
  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  // Request that workers drop to the idle I/O priority class (Linux only;
  // applied lazily inside BGThread).
  void LowerIOPriority();

  void WakeUpAllThreads() {
    bgsignal_.notify_all();
  }

  // Worker loop executed by each background thread.
  void BGThread(size_t thread_id);

  // Spawn threads until bgthreads_.size() reaches total_threads_limit_.
  void StartBGThreads();

  // Enqueue a job. |unschedule| (optional) runs only if the job is later
  // removed via UnSchedule(tag) before executing.
  void Submit(std::function<void()>&& schedule,
              std::function<void()>&& unschedule, void* tag);

  // Remove all queued (not yet running) jobs whose tag matches |arg|;
  // returns the number removed.
  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  // True when more worker threads exist than the current limit allows.
  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that is added last, even if there are
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  // True when this thread's id falls beyond the current limit.
  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This would allow its member-thread to know its priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

private:

  // Thread entry point handed to port::Thread (see BGThreadMetadata).
  static void* BGThreadWrapper(void* arg);

  bool low_io_priority_;    // workers should switch to idle I/O class
  Env::Priority priority_;  // pool priority, reported to thread-status
  Env* env_;                // host Env, used for thread-status registration

  int total_threads_limit_;     // target number of worker threads
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
  bool exit_all_threads_;       // set by JoinThreads to stop workers
  bool wait_for_jobs_to_complete_;  // drain queue before exiting workers

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue queue_;

  std::mutex mu_;
  std::condition_variable bgsignal_;
  std::vector<port::Thread> bgthreads_;
};
121
122
// Defaults: a single LOW-priority worker, normal I/O priority, empty queue.
// Initializer order deliberately mirrors the member declaration order in
// the struct above.
inline
ThreadPoolImpl::Impl::Impl()
    :
      low_io_priority_(false),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(1),
      queue_len_(),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {
}
138
139 inline
140 ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
141
142 void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
143
144 std::unique_lock<std::mutex> lock(mu_);
145 assert(!exit_all_threads_);
146
147 wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
148 exit_all_threads_ = true;
149
150 lock.unlock();
151
152 bgsignal_.notify_all();
153
154 for (auto& th : bgthreads_) {
155 th.join();
156 }
157
158 bgthreads_.clear();
159
160 exit_all_threads_ = false;
161 wait_for_jobs_to_complete_ = false;
162 }
163
164 inline
165 void ThreadPoolImpl::Impl::LowerIOPriority() {
166 std::lock_guard<std::mutex> lock(mu_);
167 low_io_priority_ = true;
168 }
169
170
// Worker loop: waits for a job (or a termination condition), then runs the
// job with the pool mutex released.
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  // Tracks whether *this* thread has already lowered its own I/O class, so
  // the syscall is issued at most once per change of low_io_priority_.
  bool low_io_priority = false;
  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    // An excessive thread (id >= limit) keeps sleeping even if the queue is
    // non-empty, until it becomes the *last* one — only the last thread in
    // bgthreads_ is allowed to exit, so threads terminate in reverse
    // creation order.
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
      bgsignal_.wait(lock);
    }

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely

      // Exit immediately, unless we were asked to drain the queue and
      // there is still work left.
      if (!wait_for_jobs_to_complete_ ||
          queue_.empty()) {
        break;
      }
    }

    if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive thread in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
    }

    // Dequeue the next job and publish the new length for GetQueueLen().
    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    // Only act when the pool-wide setting differs from what this thread
    // last applied; read under the lock, syscall issued after unlock.
    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    lock.unlock();

#ifdef OS_LINUX
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/schedule
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
#endif
    // Run the job outside the lock.
    func();
  }
}
239
// Helper struct for passing arguments when creating threads. Heap-allocated
// by StartBGThreads and deleted by BGThreadWrapper once unpacked.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;  // owning pool (non-owning pointer)
  size_t thread_id_;  // ordinal of the new thread within bgthreads_
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};
247
248 void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
249 BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
250 size_t thread_id = meta->thread_id_;
251 ThreadPoolImpl::Impl* tp = meta->thread_pool_;
252 #ifdef ROCKSDB_USING_THREAD_STATUS
253 // for thread-status
254 ThreadStatusUtil::RegisterThread(
255 tp->GetHostEnv(), (tp->GetThreadPriority() == Env::Priority::HIGH
256 ? ThreadStatus::HIGH_PRIORITY
257 : ThreadStatus::LOW_PRIORITY));
258 #endif
259 delete meta;
260 tp->BGThread(thread_id);
261 #ifdef ROCKSDB_USING_THREAD_STATUS
262 ThreadStatusUtil::UnregisterThread();
263 #endif
264 return nullptr;
265 }
266
267 void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
268 bool allow_reduce) {
269 std::unique_lock<std::mutex> lock(mu_);
270 if (exit_all_threads_) {
271 lock.unlock();
272 return;
273 }
274 if (num > total_threads_limit_ ||
275 (num < total_threads_limit_ && allow_reduce)) {
276 total_threads_limit_ = std::max(1, num);
277 WakeUpAllThreads();
278 StartBGThreads();
279 }
280 }
281
282 void ThreadPoolImpl::Impl::StartBGThreads() {
283 // Start background thread if necessary
284 while ((int)bgthreads_.size() < total_threads_limit_) {
285
286 port::Thread p_t(&BGThreadWrapper,
287 new BGThreadMetadata(this, bgthreads_.size()));
288
289 // Set the thread name to aid debugging
290 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
291 #if __GLIBC_PREREQ(2, 12)
292 auto th_handle = p_t.native_handle();
293 char name_buf[16];
294 snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt,
295 bgthreads_.size());
296 name_buf[sizeof name_buf - 1] = '\0';
297 pthread_setname_np(th_handle, name_buf);
298 #endif
299 #endif
300 bgthreads_.push_back(std::move(p_t));
301 }
302 }
303
304 void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
305 std::function<void()>&& unschedule, void* tag) {
306
307 std::lock_guard<std::mutex> lock(mu_);
308
309 if (exit_all_threads_) {
310 return;
311 }
312
313 StartBGThreads();
314
315 // Add to priority queue
316 queue_.push_back(BGItem());
317
318 auto& item = queue_.back();
319 item.tag = tag;
320 item.function = std::move(schedule);
321 item.unschedFunction = std::move(unschedule);
322
323 queue_len_.store(static_cast<unsigned int>(queue_.size()),
324 std::memory_order_relaxed);
325
326 if (!HasExcessiveThread()) {
327 // Wake up at least one waiting thread.
328 bgsignal_.notify_one();
329 } else {
330 // Need to wake up all threads to make sure the one woken
331 // up is not the one to terminate.
332 WakeUpAllThreads();
333 }
334 }
335
336 int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
337 int count = 0;
338
339 std::vector<std::function<void()>> candidates;
340 {
341 std::lock_guard<std::mutex> lock(mu_);
342
343 // Remove from priority queue
344 BGQueue::iterator it = queue_.begin();
345 while (it != queue_.end()) {
346 if (arg == (*it).tag) {
347 if (it->unschedFunction) {
348 candidates.push_back(std::move(it->unschedFunction));
349 }
350 it = queue_.erase(it);
351 count++;
352 } else {
353 ++it;
354 }
355 }
356 queue_len_.store(static_cast<unsigned int>(queue_.size()),
357 std::memory_order_relaxed);
358 }
359
360
361 // Run unschedule functions outside the mutex
362 for (auto& f : candidates) {
363 f();
364 }
365
366 return count;
367 }
368
// Construct the pool with a freshly allocated Impl (defaults: one worker,
// Env::LOW priority — see Impl::Impl above).
ThreadPoolImpl::ThreadPoolImpl() :
  impl_(new Impl()) {
}
372
373
// Out-of-line destructor. NOTE(review): presumably required here because
// impl_ is a smart pointer to Impl, which is complete only in this
// translation unit — confirm against threadpool_imp.h.
ThreadPoolImpl::~ThreadPoolImpl() {
}
376
// Join all workers without draining the queue; pending jobs are dropped.
void ThreadPoolImpl::JoinAllThreads() {
  impl_->JoinThreads(false);
}
380
// Set the worker-thread limit, allowing both growth and reduction.
void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}
384
// Lock-free snapshot of the number of queued (not yet running) jobs.
unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}
388
// Drain the queue (workers finish all pending jobs), then join all workers.
void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}
392
// Request idle I/O priority for workers (Linux only; applied lazily).
void ThreadPoolImpl::LowerIOPriority() {
  impl_->LowerIOPriority();
}
396
// Raise the worker-thread limit to |num| if it is currently lower;
// never reduces the limit.
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}
400
401 void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
402 auto copy(job);
403 impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
404 }
405
406
// Moving overload: the callable is moved into the queue; no unschedule
// callback and no tag.
void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}
410
411 void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
412 void* tag, void(*unschedFunction)(void* arg)) {
413
414 std::function<void()> fn = [arg, function] { function(arg); };
415
416 std::function<void()> unfn;
417 if (unschedFunction != nullptr) {
418 auto uf = [arg, unschedFunction] { unschedFunction(arg); };
419 unfn = std::move(uf);
420 }
421
422 impl_->Submit(std::move(fn), std::move(unfn), tag);
423 }
424
// Cancel all queued jobs submitted with this tag; returns how many were
// removed (jobs already running are unaffected).
int ThreadPoolImpl::UnSchedule(void* arg) {
  return impl_->UnSchedule(arg);
}
428
// Record the host Env; used when registering workers with thread-status.
void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
430
// Return the host Env previously set with SetHostEnv (nullptr by default).
Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
432
// Return the thread priority.
// This would allow its member-thread to know its priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}
438
// Set the thread priority.
// (Affects how new workers register with thread-status; see BGThreadWrapper.)
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}
443
444 ThreadPool* NewThreadPool(int num_threads) {
445 ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
446 thread_pool->SetBackgroundThreads(num_threads);
447 return thread_pool;
448 }
449
450 } // namespace rocksdb