// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/threadpool_imp.h"

#ifndef OS_WIN
#include <unistd.h>
#endif

#ifdef OS_LINUX
#include <sys/resource.h>
#include <sys/syscall.h>
#endif

#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str());
    abort();
  }
}

struct ThreadPoolImpl::Impl {
  Impl();
  ~Impl();

  void JoinThreads(bool wait_for_jobs_to_complete);

  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  void LowerIOPriority();

  void LowerCPUPriority(CpuPriority pri);

  void WakeUpAllThreads() { bgsignal_.notify_all(); }

  void BGThread(size_t thread_id);

  void StartBGThreads();

  void Submit(std::function<void()>&& schedule,
              std::function<void()>&& unschedule, void* tag);

  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that was added last, even if there is
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This allows member threads to query their own priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

  int ReserveThreads(int threads_to_be_reserved) {
    std::unique_lock<std::mutex> lock(mu_);
    // We can reserve at most num_waiting_threads_ in total, so the number of
    // threads actually reserved may be fewer than requested. In rare cases,
    // num_waiting_threads_ can drop below reserved_threads_ (because of
    // SetBackgroundThreadsInternal or the last excessive thread exiting); if
    // that happens, no additional threads can be reserved.
    int reserved_threads_in_success =
        std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
                 threads_to_be_reserved);
    reserved_threads_ += reserved_threads_in_success;
    return reserved_threads_in_success;
  }

  int ReleaseThreads(int threads_to_be_released) {
    std::unique_lock<std::mutex> lock(mu_);
    // We cannot release more than reserved_threads_
    int released_threads_in_success =
        std::min(reserved_threads_, threads_to_be_released);
    reserved_threads_ -= released_threads_in_success;
    WakeUpAllThreads();
    return released_threads_in_success;
  }

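  // A minimal usage sketch (illustrative only, not part of this file): a
  // caller that wants workers dedicated to it could do
  //   int granted = pool.ReserveThreads(2);  // may grant 0, 1, or 2
  //   ...                                    // work that relies on the reservation
  //   pool.ReleaseThreads(granted);          // release exactly what was granted
  // Reserved threads are kept waiting in BGThread() and will not pick up
  // queued work until released.
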
 private:
  static void BGThreadWrapper(void* arg);

  bool low_io_priority_;
  CpuPriority cpu_priority_;
  Env::Priority priority_;
  Env* env_;

  int total_threads_limit_;
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
  // Number of reserved threads, managed by ReserveThreads(..) and
  // ReleaseThreads(..). If num_waiting_threads_ is no larger than
  // reserved_threads_, waiting threads stay blocked to honor the
  // reservation.
  int reserved_threads_;
  // Number of waiting threads (the maximum number of threads that can be
  // reserved). In rare cases, num_waiting_threads_ can be less than
  // reserved_threads_ because of SetBackgroundThreadsInternal or the last
  // excessive thread exiting.
  int num_waiting_threads_;
  bool exit_all_threads_;
  bool wait_for_jobs_to_complete_;

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue queue_;

  std::mutex mu_;
  std::condition_variable bgsignal_;
  std::vector<port::Thread> bgthreads_;
};

inline ThreadPoolImpl::Impl::Impl()
    : low_io_priority_(false),
      cpu_priority_(CpuPriority::kNormal),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      reserved_threads_(0),
      num_waiting_threads_(0),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {}

inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  exit_all_threads_ = true;
  // prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  total_threads_limit_ = 0;
  reserved_threads_ = 0;
  num_waiting_threads_ = 0;

  lock.unlock();

  bgsignal_.notify_all();

  for (auto& th : bgthreads_) {
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}

inline void ThreadPoolImpl::Impl::LowerIOPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_io_priority_ = true;
}

inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
  std::lock_guard<std::mutex> lock(mu_);
  cpu_priority_ = pri;
}

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  CpuPriority current_cpu_priority = CpuPriority::kNormal;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    // Increase num_waiting_threads_ once this thread has started waiting
    num_waiting_threads_++;

    TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
    TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
    // When exit_all_threads_ is false and the current thread is not the last
    // excessive thread, it may block for any of three reasons:
    // 1) the queue is empty;
    // 2) it is an excessive thread (but not the last one);
    // 3) the number of waiting threads is not greater than the number of
    //    reserved threads (i.e., no threads are available because the
    //    reservation is fully used).
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id) ||
            num_waiting_threads_ <= reserved_threads_)) {
      bgsignal_.wait(lock);
    }
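    // (Note: condition_variable::wait() can wake spuriously; the predicate
    // loop above re-checks every exit/eligibility condition after each
    // wake-up, so a spurious wake-up simply goes back to waiting.)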
    // Decrease num_waiting_threads_ once the thread is not waiting
    num_waiting_threads_--;

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely

      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
        break;
      }
    } else if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive threads in the reverse order of
      // generation time. But not when `exit_all_threads_ == true`,
      // otherwise `JoinThreads()` could try to `join()` a `detach()`ed
      // thread.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();
      if (HasExcessiveThread()) {
        // There is still at least one more excessive thread to terminate.
        WakeUpAllThreads();
      }
      TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
                          thread_id);
      TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
      break;
    }

    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    CpuPriority cpu_priority = cpu_priority_;
    lock.unlock();

    if (cpu_priority < current_cpu_priority) {
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
                               &current_cpu_priority);
      // 0 means current thread.
      port::SetCpuPriority(0, cpu_priority);
      current_cpu_priority = cpu_priority;
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
                               &current_cpu_priority);
    }
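    // (Note: current_cpu_priority starts at kNormal and the check above only
    // ever applies a strictly lower value, so a thread's CPU priority is
    // lowered monotonically and never raised back.)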

#ifdef OS_LINUX
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put the thread into the IOPRIO_CLASS_IDLE class (lowest).
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change the scheduler:
      //   echo cfq > /sys/block/<device_name>/queue/scheduler
      // Tunables to consider:
      //   /sys/block/<device_name>/queue/slice_idle
      //   /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
#endif
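    // (Illustrative note: IOPRIO_PRIO_VALUE(3, 0) selects class 3,
    // IOPRIO_CLASS_IDLE, the same effect as running the thread under
    // `ionice -c 3`.)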

    TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
                             &priority_);

    func();
  }
}

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;
  size_t thread_id_;  // Index of the thread in the pool's thread vector.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};

void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // Initialize it here because the compiler cannot see that it is never
  // used uninitialized.
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      assert(false);
      return;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return;
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
                                                        bool allow_reduce) {
  std::lock_guard<std::mutex> lock(mu_);
  if (exit_all_threads_) {
    return;
  }
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}

int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::unique_lock<std::mutex> lock(mu_);
  return total_threads_limit_;
}

void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background thread if necessary
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
                     new BGThreadMetadata(this, bgthreads_.size()));

// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}
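// (Note: the resulting names look like "rocksdb:low" or "rocksdb:high"; on
// Linux, pthread_setname_np limits names to 15 characters plus the
// terminator, which all of these fit within.)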

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
                                  std::function<void()>&& unschedule,
                                  void* tag) {
  std::lock_guard<std::mutex> lock(mu_);

  if (exit_all_threads_) {
    return;
  }

  StartBGThreads();

  // Add to the queue
  queue_.push_back(BGItem());
  TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
                   std::memory_order_relaxed);

  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else {
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}

int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  int count = 0;

  std::vector<std::function<void()>> candidates;
  {
    std::lock_guard<std::mutex> lock(mu_);

    // Remove from the queue
    BGQueue::iterator it = queue_.begin();
    while (it != queue_.end()) {
      if (arg == (*it).tag) {
        if (it->unschedFunction) {
          candidates.push_back(std::move(it->unschedFunction));
        }
        it = queue_.erase(it);
        count++;
      } else {
        ++it;
      }
    }
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);
  }

  // Run unschedule functions outside the mutex, so a callback that re-enters
  // the pool (and takes mu_) cannot deadlock.
  for (auto& f : candidates) {
    f();
  }

  return count;
}

ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {}

ThreadPoolImpl::~ThreadPoolImpl() {}

void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); }

void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); }

void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
  impl_->LowerCPUPriority(pri);
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}

void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}
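
// A minimal usage sketch (illustrative only, using just the functions
// defined in this file):
//   ThreadPoolImpl pool;
//   pool.SetBackgroundThreads(4);          // spin up four workers
//   pool.SubmitJob([] { /* background work */ });
//   pool.WaitForJobsAndJoinAllThreads();   // drain the queue, then join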

void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
                              void* tag, void (*unschedFunction)(void* arg)) {
  if (unschedFunction == nullptr) {
    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  } else {
    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
                  tag);
  }
}

int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); }

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority.
// This allows member threads to query their own priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}

// Reserve a specific number of threads, preventing them from running other
// functions. The number of threads actually reserved may be fewer than
// requested.
int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
  return impl_->ReserveThreads(threads_to_be_reserved);
}

// Release a specific number of reserved threads.
int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
  return impl_->ReleaseThreads(threads_to_be_released);
}

ThreadPool* NewThreadPool(int num_threads) {
  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  thread_pool->SetBackgroundThreads(num_threads);
  return thread_pool;
}
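
// Usage sketch (illustrative only): the caller owns the returned pool and
// should join its threads before destroying it, since Impl's destructor
// asserts that no background threads remain:
//   ThreadPool* tp = NewThreadPool(8);
//   tp->SubmitJob([] { /* work */ });
//   tp->WaitForJobsAndJoinAllThreads();
//   delete tp;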

}  // namespace ROCKSDB_NAMESPACE