1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "util/threadpool_imp.h"
12 #include "monitoring/thread_status_util.h"
13 #include "port/port.h"
20 # include <sys/syscall.h>
21 # include <sys/resource.h>
27 #include <condition_variable>
34 namespace ROCKSDB_NAMESPACE
{
// Reports a pthread-style call result: prints `label` together with the
// strerror() text for `result` to stderr.
// NOTE(review): as visible here the message is printed unconditionally,
// even when `result` is 0 (success). A `result != 0` guard and the closing
// brace(s) do not appear in this view -- confirm against the full file.
36 void ThreadPoolImpl::PthreadCall(const char* label
, int result
) {
38 fprintf(stderr
, "pthread %s: %s\n", label
, strerror(result
));
// Private implementation behind ThreadPoolImpl. It holds the FIFO job queue,
// the worker thread handles, and the settings shared between workers and
// callers. Most methods defined below take mu_ before touching the queue or
// the thread list. queue_len_ is an atomic mirror of queue_.size() so that
// stats can be read without the lock.
// NOTE(review): several members used by the method definitions are not
// visible in this view: mu_, queue_, env_, the BGItem struct header
// (including its `tag` field), and the closing `};`. Confirm they exist.
43 struct ThreadPoolImpl::Impl
{
// Signals all workers to exit and waits for them.
48 void JoinThreads(bool wait_for_jobs_to_complete
);
// Sets the worker limit to `num`. Lowering the limit requires allow_reduce.
50 void SetBackgroundThreadsInternal(int num
, bool allow_reduce
);
51 int GetBackgroundThreads();
// Approximate number of queued jobs. This is a relaxed load, meant for
// stats reporting only and not for synchronization.
53 unsigned int GetQueueLen() const {
54 return queue_len_
.load(std::memory_order_relaxed
);
57 void LowerIOPriority();
59 void LowerCPUPriority();
// Wakes every worker blocked on bgsignal_.
61 void WakeUpAllThreads() {
62 bgsignal_
.notify_all();
// Worker loop for the thread whose index in bgthreads_ is `thread_id`.
65 void BGThread(size_t thread_id
);
// Spawns workers until bgthreads_.size() reaches total_threads_limit_.
67 void StartBGThreads();
// Enqueues `schedule`. If the job is removed via UnSchedule(tag) before it
// runs, `unschedule` is invoked instead.
69 void Submit(std::function
<void()>&& schedule
,
70 std::function
<void()>&& unschedule
, void* tag
);
// Removes every queued job whose tag equals `arg`.
72 int UnSchedule(void* arg
);
74 void SetHostEnv(Env
* env
) { env_
= env
; }
76 Env
* GetHostEnv() const { return env_
; }
// True when more worker handles exist than the current limit allows, for
// example after the limit was reduced.
78 bool HasExcessiveThread() const {
79 return static_cast<int>(bgthreads_
.size()) > total_threads_limit_
;
82 // Return true iff the current thread is the excessive thread to terminate.
83 // Always terminate the running thread that is added last, even if there are
84 // more than one thread to terminate.
// (HasExcessiveThread() is checked first, so size() - 1 is never evaluated
// on an empty vector here.)
85 bool IsLastExcessiveThread(size_t thread_id
) const {
86 return HasExcessiveThread() && thread_id
== bgthreads_
.size() - 1;
// True when `thread_id` is at or beyond the current limit. Such a worker
// keeps waiting instead of picking up jobs (see the wait loop in BGThread).
89 bool IsExcessiveThread(size_t thread_id
) const {
90 return static_cast<int>(thread_id
) >= total_threads_limit_
;
93 // Return the thread priority.
94 // This would allow its member-thread to know its priority.
95 Env::Priority
GetThreadPriority() const { return priority_
; }
97 // Set the thread priority.
98 void SetThreadPriority(Env::Priority priority
) { priority_
= priority
; }
// Thread entry point passed to port::Thread. `arg` is a heap-allocated
// BGThreadMetadata.
101 static void BGThreadWrapper(void* arg
);
// Pool-wide requests set by LowerIOPriority()/LowerCPUPriority(). Each
// worker applies them to itself lazily in BGThread().
103 bool low_io_priority_
;
104 bool low_cpu_priority_
;
// Priority class of this pool. Used to name threads and to pick their
// thread-status type.
105 Env::Priority priority_
;
// Desired number of worker threads.
108 int total_threads_limit_
;
109 std::atomic_uint queue_len_
; // Queue length. Used for stats reporting
// Shutdown flags, set by JoinThreads() and read by workers in BGThread().
110 bool exit_all_threads_
;
111 bool wait_for_jobs_to_complete_
;
113 // Entry per Schedule()/Submit() call
// `function` is the job body. `unschedFunction` may be empty; it is run by
// UnSchedule() if the job is removed before it executes.
116 std::function
<void()> function
;
117 std::function
<void()> unschedFunction
;
// FIFO of pending jobs: Submit() appends, BGThread() takes the front.
120 using BGQueue
= std::deque
<BGItem
>;
// Notified when work arrives, or when workers must re-check exit and limit
// state.
124 std::condition_variable bgsignal_
;
// One handle per worker. A worker's index in this vector is its thread_id.
125 std::vector
<port::Thread
> bgthreads_
;
// Initial pool state: no priority lowering requested, a thread limit of 0,
// and no shutdown in progress.
// NOTE(review): the `:` that starts the initializer list, the initializers
// for priority_, queue_len_ and env_, and the constructor body are not
// visible in this view. Confirm against the full file.
130 ThreadPoolImpl::Impl::Impl()
132 low_io_priority_(false),
133 low_cpu_priority_(false),
136 total_threads_limit_(0),
138 exit_all_threads_(false),
139 wait_for_jobs_to_complete_(false),
147 ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_
.size() == 0U); }
// Shuts the pool down. Under mu_ it records wait_for_jobs_to_complete, marks
// all workers for exit and drops the thread limit to 0. It then wakes every
// worker and iterates over bgthreads_ (the loop body is not visible here).
// When wait_for_jobs_to_complete is true, workers presumably drain queued
// jobs before exiting (see the wait_for_jobs_to_complete_ check in
// BGThread) -- confirm.
// Must not be called while another shutdown is in progress (asserted).
149 void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete
) {
151 std::unique_lock
<std::mutex
> lock(mu_
);
152 assert(!exit_all_threads_
);
154 wait_for_jobs_to_complete_
= wait_for_jobs_to_complete
;
155 exit_all_threads_
= true;
156 // prevent threads from being recreated right after they're joined, in case
157 // the user is concurrently submitting jobs.
158 total_threads_limit_
= 0;
// NOTE(review): in this view mu_ is still held from here on. Workers
// re-acquire mu_ when they wake in BGThread(), so joining them while holding
// the lock would deadlock. Confirm that an unlock precedes the loop below.
162 bgsignal_
.notify_all();
164 for (auto& th
: bgthreads_
) {
// Clear the shutdown flags after the workers are gone. Presumably this lets
// the pool be started again later -- confirm with callers.
170 exit_all_threads_
= false;
171 wait_for_jobs_to_complete_
= false;
// Requests that workers lower their I/O priority. Only the pool-wide flag is
// set here (under mu_). Each worker applies the change to itself before
// running its next job in BGThread().
175 void ThreadPoolImpl::Impl::LowerIOPriority() {
176 std::lock_guard
<std::mutex
> lock(mu_
);
177 low_io_priority_
= true;
// Requests that workers lower their CPU priority. Only the pool-wide flag is
// set here (under mu_). Each worker applies the change to itself before
// running its next job in BGThread().
181 void ThreadPoolImpl::Impl::LowerCPUPriority() {
182 std::lock_guard
<std::mutex
> lock(mu_
);
183 low_cpu_priority_
= true;
// Worker loop for the thread whose index in bgthreads_ is `thread_id`.
// Under mu_, it waits on bgsignal_ until one of these holds:
//   - the pool is shutting down,
//   - this thread is the last excessive one, or
//   - a job is available and this thread is within the limit.
// If this thread is the last excessive one, it detaches and removes its own
// handle. Otherwise it dequeues the front job, first applying any requested
// CPU / I/O priority lowering to itself.
// NOTE(review): the enclosing loop, the exit/return paths, the unlock, and
// the actual invocation of `func` are not visible in this view.
186 void ThreadPoolImpl::Impl::BGThread(size_t thread_id
) {
// Whether *this* thread has already lowered its own priorities. Compared
// against the pool-wide low_*_priority_ requests before each job.
187 bool low_io_priority
= false;
188 bool low_cpu_priority
= false;
191 // Wait until there is an item that is ready to run
192 std::unique_lock
<std::mutex
> lock(mu_
);
193 // Stop waiting if the thread needs to do work or needs to terminate.
194 while (!exit_all_threads_
&& !IsLastExcessiveThread(thread_id
) &&
195 (queue_
.empty() || IsExcessiveThread(thread_id
))) {
196 bgsignal_
.wait(lock
);
199 if (exit_all_threads_
) { // mechanism to let BG threads exit safely
201 if (!wait_for_jobs_to_complete_
||
207 if (IsLastExcessiveThread(thread_id
)) {
208 // Current thread is the last generated one and is excessive.
209 // We always terminate excessive threads in reverse order of creation.
// A thread cannot join itself, so it detaches its own handle and drops it
// from bgthreads_.
211 auto& terminating_thread
= bgthreads_
.back();
212 terminating_thread
.detach();
213 bgthreads_
.pop_back();
215 if (HasExcessiveThread()) {
216 // There is still at least one more excessive thread to terminate.
// Take ownership of the front job's callable and publish the new length.
222 auto func
= std::move(queue_
.front().function
);
225 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
226 std::memory_order_relaxed
);
// True when a lowering has been requested pool-wide that this thread has not
// applied yet.
228 bool decrease_io_priority
= (low_io_priority
!= low_io_priority_
);
229 bool decrease_cpu_priority
= (low_cpu_priority
!= low_cpu_priority_
);
233 if (decrease_cpu_priority
) {
// NOTE(review): only the comment and the flag update are visible in this
// branch. The call that actually lowers CPU priority is not shown here.
238 // Lowest priority possible.
240 low_cpu_priority
= true;
243 if (decrease_io_priority
) {
244 #define IOPRIO_CLASS_SHIFT (13)
245 #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
246 // Request I/O scheduling class IOPRIO_CLASS_IDLE (3, the lowest).
247 // These system calls only have an effect when used in conjunction
248 // with an I/O scheduler that supports I/O priorities. As at
249 // kernel 2.6.17 the only such scheduler is the Completely
250 // Fair Queuing (CFQ) I/O scheduler.
251 // To change scheduler:
252 // echo cfq > /sys/block/<device_name>/queue/scheduler
253 // Tunables to consider:
254 // /sys/block/<device_name>/queue/slice_idle
255 // /sys/block/<device_name>/queue/slice_sync
// NOTE(review): ioprio_set(2) takes (which, who, ioprio), but the `who`
// argument is not visible in this call. Also, IOPRIO_PRIO_VALUE does not
// parenthesize its `data` parameter.
256 syscall(SYS_ioprio_set
, 1, // IOPRIO_WHO_PROCESS
258 IOPRIO_PRIO_VALUE(3, 0));
259 low_io_priority
= true;
// These casts suggest the priority code above may be compiled out on some
// platforms (the surrounding conditionals are not visible here).
262 (void)decrease_io_priority
; // avoid 'unused variable' error
263 (void)decrease_cpu_priority
;
269 // Helper struct for passing arguments when creating threads.
// StartBGThreads() allocates one with `new` per spawned thread and passes it
// to BGThreadWrapper() as a void*.
270 struct BGThreadMetadata
{
// Pool that owns the thread.
271 ThreadPoolImpl::Impl
* thread_pool_
;
272 size_t thread_id_
; // Index of this thread in bgthreads_ at creation time.
273 BGThreadMetadata(ThreadPoolImpl::Impl
* thread_pool
, size_t thread_id
)
274 : thread_pool_(thread_pool
), thread_id_(thread_id
) {}
// Thread entry point used by StartBGThreads(). It unpacks the
// BGThreadMetadata passed as `arg`, registers the thread's status type when
// ROCKSDB_USING_THREAD_STATUS is defined, runs the worker loop, and then
// unregisters the thread.
// NOTE(review): several things are not visible in this view:
//   - a `delete meta`, although StartBGThreads() allocates the metadata
//     with `new`; confirm it is freed;
//   - the `break` after each case; without them every case falls through
//     and thread_type ends up as USER;
//   - the matching #endif lines.
277 void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg
) {
278 BGThreadMetadata
* meta
= reinterpret_cast<BGThreadMetadata
*>(arg
);
279 size_t thread_id
= meta
->thread_id_
;
280 ThreadPoolImpl::Impl
* tp
= meta
->thread_pool_
;
281 #ifdef ROCKSDB_USING_THREAD_STATUS
282 // initialize it because compiler isn't good enough to see we don't use it
284 ThreadStatus::ThreadType thread_type
= ThreadStatus::NUM_THREAD_TYPES
;
// Map the pool's Env::Priority onto the matching ThreadStatus type.
285 switch (tp
->GetThreadPriority()) {
286 case Env::Priority::HIGH
:
287 thread_type
= ThreadStatus::HIGH_PRIORITY
;
289 case Env::Priority::LOW
:
290 thread_type
= ThreadStatus::LOW_PRIORITY
;
292 case Env::Priority::BOTTOM
:
293 thread_type
= ThreadStatus::BOTTOM_PRIORITY
;
295 case Env::Priority::USER
:
296 thread_type
= ThreadStatus::USER
;
298 case Env::Priority::TOTAL
:
302 assert(thread_type
!= ThreadStatus::NUM_THREAD_TYPES
);
303 ThreadStatusUtil::RegisterThread(tp
->GetHostEnv(), thread_type
);
// Run the worker loop for this thread's slot.
306 tp
->BGThread(thread_id
);
307 #ifdef ROCKSDB_USING_THREAD_STATUS
308 ThreadStatusUtil::UnregisterThread();
// Updates the desired worker count under mu_. The limit grows whenever `num`
// is larger, but shrinks only if allow_reduce is true. Negative values are
// clamped to 0.
// NOTE(review): some parts are not visible in this view:
//   - the `bool allow_reduce` parameter in this signature;
//   - the body of the exit_all_threads_ branch (presumably an early return
//     during shutdown);
//   - any follow-up work after the limit changes (e.g. starting or waking
//     threads).
313 void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num
,
315 std::lock_guard
<std::mutex
> lock(mu_
);
316 if (exit_all_threads_
) {
319 if (num
> total_threads_limit_
||
320 (num
< total_threads_limit_
&& allow_reduce
)) {
321 total_threads_limit_
= std::max(0, num
);
// Returns the configured thread limit, read under mu_. This can differ from
// the number of live workers: bgthreads_ may temporarily exceed the limit
// after it is reduced.
327 int ThreadPoolImpl::Impl::GetBackgroundThreads() {
328 std::unique_lock
<std::mutex
> lock(mu_
);
329 return total_threads_limit_
;
// Spawns workers until bgthreads_.size() reaches total_threads_limit_. Each
// new thread receives its future index in bgthreads_ as its thread_id.
// No lock is taken here; presumably the caller holds mu_ -- confirm at the
// call sites.
332 void ThreadPoolImpl::Impl::StartBGThreads() {
333 // Start background thread if necessary
334 while ((int)bgthreads_
.size() < total_threads_limit_
) {
// The metadata is heap-allocated and handed to BGThreadWrapper().
336 port::Thread
p_t(&BGThreadWrapper
,
337 new BGThreadMetadata(this, bgthreads_
.size()));
339 // Set the thread name to aid debugging
// Name format: "rocksdb:" + lower-cased priority name + thread index.
// NOTE(review): Linux limits thread names to 16 bytes including the
// terminating NUL, and pthread_setname_np() fails with ERANGE for longer
// names. Long combinations (e.g. a 6-letter priority name plus a two-digit
// index) exceed that, and the return value is ignored here.
340 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
341 #if __GLIBC_PREREQ(2, 12)
342 auto th_handle
= p_t
.native_handle();
343 std::string thread_priority
= Env::PriorityToString(GetThreadPriority());
344 std::ostringstream thread_name_stream
;
345 thread_name_stream
<< "rocksdb:";
346 for (char c
: thread_priority
) {
347 thread_name_stream
<< static_cast<char>(tolower(c
));
349 thread_name_stream
<< bgthreads_
.size();
350 pthread_setname_np(th_handle
, thread_name_stream
.str().c_str());
// Hand ownership of the thread handle to the pool.
353 bgthreads_
.push_back(std::move(p_t
));
// Enqueues a job under mu_. `schedule` is the work to run. `unschedule`
// (possibly empty) is the callback UnSchedule(tag) runs if the job is removed
// before executing. After enqueueing, it publishes the new queue length and
// wakes worker(s).
// NOTE(review): some parts are not visible in this view:
//   - an assignment of `tag` to the new item, although UnSchedule() matches
//     queue entries on their tag;
//   - the body of the exit_all_threads_ branch;
//   - the else-branch that wakes all threads.
// Confirm against the full file.
357 void ThreadPoolImpl::Impl::Submit(std::function
<void()>&& schedule
,
358 std::function
<void()>&& unschedule
, void* tag
) {
360 std::lock_guard
<std::mutex
> lock(mu_
);
362 if (exit_all_threads_
) {
368 // Append to the back of the FIFO job queue.
369 queue_
.push_back(BGItem());
371 auto& item
= queue_
.back();
373 item
.function
= std::move(schedule
);
374 item
.unschedFunction
= std::move(unschedule
);
376 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
377 std::memory_order_relaxed
);
379 if (!HasExcessiveThread()) {
380 // Wake up at least one waiting thread.
381 bgsignal_
.notify_one();
383 // Need to wake up all threads to make sure the one woken
384 // up is not the one to terminate.
// Removes every queued job whose tag equals `arg`. Non-empty unschedule
// callbacks of the removed jobs are collected under mu_ and, per the comment
// below, run after the mutex is released.
// NOTE(review): some parts are not visible in this view:
//   - the iterator advance for non-matching entries (as shown, `it` would
//     never move past a non-matching entry);
//   - the value returned to the caller;
//   - the scope that releases `lock` before the final loop.
389 int ThreadPoolImpl::Impl::UnSchedule(void* arg
) {
// Callbacks to run once the lock is no longer held.
392 std::vector
<std::function
<void()>> candidates
;
394 std::lock_guard
<std::mutex
> lock(mu_
);
396 // Remove matching entries from the job queue.
397 BGQueue::iterator it
= queue_
.begin();
398 while (it
!= queue_
.end()) {
399 if (arg
== (*it
).tag
) {
400 if (it
->unschedFunction
) {
401 candidates
.push_back(std::move(it
->unschedFunction
));
403 it
= queue_
.erase(it
);
409 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
410 std::memory_order_relaxed
);
414 // Run unschedule functions outside the mutex
415 for (auto& f
: candidates
) {
// Constructs the public pool object.
// NOTE(review): the initializer list and body are not visible in this view.
// Presumably they create impl_ -- confirm.
422 ThreadPoolImpl::ThreadPoolImpl() :
// Destroys the public pool object.
// NOTE(review): the destructor body is not visible in this view.
427 ThreadPoolImpl::~ThreadPoolImpl() {
// Shuts down all workers without asking them to finish queued jobs first
// (JoinThreads(false)).
430 void ThreadPoolImpl::JoinAllThreads() {
431 impl_
->JoinThreads(false);
// Sets the worker limit to `num`, allowing it to shrink (allow_reduce =
// true).
434 void ThreadPoolImpl::SetBackgroundThreads(int num
) {
435 impl_
->SetBackgroundThreadsInternal(num
, true);
// Returns the configured worker limit (not necessarily the number of live
// threads).
438 int ThreadPoolImpl::GetBackgroundThreads() {
439 return impl_
->GetBackgroundThreads();
// Returns the approximate number of queued jobs. This is a relaxed atomic
// read intended for stats.
442 unsigned int ThreadPoolImpl::GetQueueLen() const {
443 return impl_
->GetQueueLen();
// Shuts down all workers, asking them to complete queued jobs first
// (JoinThreads(true)).
446 void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
447 impl_
->JoinThreads(true);
// Requests that workers lower their I/O priority. Each worker applies this
// lazily before its next job.
450 void ThreadPoolImpl::LowerIOPriority() {
451 impl_
->LowerIOPriority();
// Requests that workers lower their CPU priority. Each worker applies this
// lazily before its next job.
454 void ThreadPoolImpl::LowerCPUPriority() {
455 impl_
->LowerCPUPriority();
// Raises the worker limit to `num` if it is currently lower. It never
// shrinks the limit (allow_reduce = false).
458 void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num
) {
459 impl_
->SetBackgroundThreadsInternal(num
, false);
// Submits `job` with an empty unschedule callback and a null tag.
// NOTE(review): `copy` is not declared in the visible lines. Presumably a
// local copy of `job` is made first so it can be moved into Submit() --
// confirm against the full file.
462 void ThreadPoolImpl::SubmitJob(const std::function
<void()>& job
) {
464 impl_
->Submit(std::move(copy
), std::function
<void()>(), nullptr);
// Rvalue overload: moves `job` into the queue with an empty unschedule
// callback and a null tag.
468 void ThreadPoolImpl::SubmitJob(std::function
<void()>&& job
) {
469 impl_
->Submit(std::move(job
), std::function
<void()>(), nullptr);
// Function-pointer entry point: queues function(arg) to run on a pool
// thread, tagged with `tag`. If unschedFunction is non-null,
// unschedFunction(arg) is registered as the callback UnSchedule(tag) runs
// when it removes the job before execution.
// NOTE(review): in this view the second Submit() call is not inside an
// `else`, and its final `tag);` argument is missing -- confirm against the
// full file.
472 void ThreadPoolImpl::Schedule(void(*function
)(void* arg1
), void* arg
,
473 void* tag
, void(*unschedFunction
)(void* arg
)) {
474 if (unschedFunction
== nullptr) {
475 impl_
->Submit(std::bind(function
, arg
), std::function
<void()>(), tag
);
477 impl_
->Submit(std::bind(function
, arg
), std::bind(unschedFunction
, arg
),
// Removes all queued jobs tagged with `arg` and forwards Impl's result.
482 int ThreadPoolImpl::UnSchedule(void* arg
) {
483 return impl_
->UnSchedule(arg
);
486 void ThreadPoolImpl::SetHostEnv(Env
* env
) { impl_
->SetHostEnv(env
); }
488 Env
* ThreadPoolImpl::GetHostEnv() const { return impl_
->GetHostEnv(); }
490 // Return the thread priority.
491 // This would allow its member-thread to know its priority.
// Forwards to Impl::GetThreadPriority(). The value is read without taking
// mu_.
492 Env::Priority
ThreadPoolImpl::GetThreadPriority() const {
493 return impl_
->GetThreadPriority();
496 // Set the thread priority.
// Forwards to Impl::SetThreadPriority(). The value is written without taking
// mu_; it is used when naming new threads and registering their status type.
497 void ThreadPoolImpl::SetThreadPriority(Env::Priority priority
) {
498 impl_
->SetThreadPriority(priority
);
// Factory: allocates a ThreadPoolImpl with `new` and sets its worker limit
// to num_threads. The caller presumably takes ownership of the returned
// pool -- confirm.
// NOTE(review): the return statement is not visible in this view.
501 ThreadPool
* NewThreadPool(int num_threads
) {
502 ThreadPoolImpl
* thread_pool
= new ThreadPoolImpl();
503 thread_pool
->SetBackgroundThreads(num_threads
);
507 } // namespace ROCKSDB_NAMESPACE