1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "util/threadpool_imp.h"
12 #include "monitoring/thread_status_util.h"
13 #include "port/port.h"
20 # include <sys/syscall.h>
25 #include <condition_variable>
#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>
33 void ThreadPoolImpl::PthreadCall(const char* label
, int result
) {
35 fprintf(stderr
, "pthread %s: %s\n", label
, strerror(result
));
40 struct ThreadPoolImpl::Impl
// NOTE(review): this file looks like a garbled/partial extraction — many
// lines are missing and original file line numbers are fused into the
// text. Comments below describe only what the visible fragments show.
{
// Stop background threads; when wait_for_jobs_to_complete is true the
// queued jobs are drained first (see out-of-line definition below).
45 void JoinThreads(bool wait_for_jobs_to_complete
);
// Change the background-thread limit; shrinking only when allow_reduce.
47 void SetBackgroundThreadsInternal(int num
, bool allow_reduce
);
// Relaxed read of the queued-job counter — stats only, may be stale.
49 unsigned int GetQueueLen() const {
50 return queue_len_
.load(std::memory_order_relaxed
);
// Request low IO priority for workers (definition below).
53 void LowerIOPriority();
// Wake every waiting worker, e.g. after the thread limit changes.
55 void WakeUpAllThreads() {
56 bgsignal_
.notify_all();
// Main loop executed by each background worker (definition below).
59 void BGThread(size_t thread_id
);
// Spawn workers until bgthreads_ reaches total_threads_limit_.
61 void StartBGThreads();
// Enqueue a job plus an optional unschedule callback keyed by tag.
63 void Submit(std::function
<void()>&& schedule
,
64 std::function
<void()>&& unschedule
, void* tag
);
// Remove pending jobs whose tag matches arg; returns how many (see def).
66 int UnSchedule(void* arg
);
// Store/fetch the owning Env (non-owning pointer).
68 void SetHostEnv(Env
* env
) { env_
= env
; }
70 Env
* GetHostEnv() const { return env_
; }
// True when more threads exist than the current limit allows.
72 bool HasExcessiveThread() const {
73 return static_cast<int>(bgthreads_
.size()) > total_threads_limit_
;
76 // Return true iff the current thread is the excessive thread to terminate.
77 // Always terminate the running thread that is added last, even if there are
78 // more than one thread to terminate.
79 bool IsLastExcessiveThread(size_t thread_id
) const {
80 return HasExcessiveThread() && thread_id
== bgthreads_
.size() - 1;
// True when this thread's index is at or beyond the thread limit.
83 bool IsExcessiveThread(size_t thread_id
) const {
84 return static_cast<int>(thread_id
) >= total_threads_limit_
;
87 // Return the thread priority.
88 // This would allow its member-thread to know its priority.
89 Env::Priority
GetThreadPriority() const { return priority_
; }
91 // Set the thread priority.
92 void SetThreadPriority(Env::Priority priority
) { priority_
= priority
; }
// Thread entry point; arg is a heap-allocated BGThreadMetadata*.
96 static void* BGThreadWrapper(void* arg
);
// Set once LowerIOPriority() has been requested; read by BGThread().
98 bool low_io_priority_
;
99 Env::Priority priority_
;
// Upper bound on worker count; compared against bgthreads_.size().
102 int total_threads_limit_
;
103 std::atomic_uint queue_len_
; // Queue length. Used for stats reporting
// Set while JoinThreads() is tearing the pool down.
104 bool exit_all_threads_
;
105 bool wait_for_jobs_to_complete_
;
107 // Entry per Schedule()/Submit() call
// NOTE(review): the BGItem struct header and its tag field are not
// visible here — presumably lost in extraction; two members survive.
110 std::function
<void()> function
;
111 std::function
<void()> unschedFunction
;
114 using BGQueue
= std::deque
<BGItem
>;
// Condition variable workers wait on; its mutex (mu_, referenced by the
// member functions below) is not visible in this fragment.
118 std::condition_variable bgsignal_
;
119 std::vector
<port::Thread
> bgthreads_
;
124 ThreadPoolImpl::Impl::Impl()
// NOTE(review): the initializer list is truncated by extraction — only
// four member initializers survive; the remaining initializers and the
// constructor body are missing from this view.
126 low_io_priority_(false),
129 total_threads_limit_(1),
// Pool starts in running state: not shutting down, not draining.
131 exit_all_threads_(false),
132 wait_for_jobs_to_complete_(false),
140 ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_
.size() == 0U); }
142 void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete
) {
// Hold the pool mutex for the whole shutdown state transition.
144 std::unique_lock
<std::mutex
> lock(mu_
);
// Must not already be shutting down — JoinThreads is not reentrant.
145 assert(!exit_all_threads_
);
147 wait_for_jobs_to_complete_
= wait_for_jobs_to_complete
;
// Tell every worker's wait loop to exit.
148 exit_all_threads_
= true;
152 bgsignal_
.notify_all();
// NOTE(review): the loop body (presumably th.join()) and the statements
// between notify_all() and this loop are missing from the visible text.
154 for (auto& th
: bgthreads_
) {
// Reset flags so the pool could be restarted after threads are gone.
160 exit_all_threads_
= false;
161 wait_for_jobs_to_complete_
= false;
165 void ThreadPoolImpl::Impl::LowerIOPriority() {
166 std::lock_guard
<std::mutex
> lock(mu_
);
167 low_io_priority_
= true;
171 void ThreadPoolImpl::Impl::BGThread(size_t thread_id
) {
// Tracks whether THIS thread has already dropped its IO priority.
172 bool low_io_priority
= false;
// NOTE(review): the enclosing worker loop header appears to be lost in
// extraction; the fragments below read like its body.
174 // Wait until there is an item that is ready to run
175 std::unique_lock
<std::mutex
> lock(mu_
);
176 // Stop waiting if the thread needs to do work or needs to terminate.
177 while (!exit_all_threads_
&& !IsLastExcessiveThread(thread_id
) &&
178 (queue_
.empty() || IsExcessiveThread(thread_id
))) {
179 bgsignal_
.wait(lock
);
182 if (exit_all_threads_
) { // mechanism to let BG threads exit safely
// NOTE(review): the rest of this condition and the statement it guards
// (presumably an exit path) are missing from the visible text.
184 if(!wait_for_jobs_to_complete_
||
190 if (IsLastExcessiveThread(thread_id
)) {
191 // Current thread is the last generated one and is excessive.
192 // We always terminate excessive thread in the reverse order of
// generation (the tail of the comment above was lost in extraction).
194 auto& terminating_thread
= bgthreads_
.back();
195 terminating_thread
.detach();
196 bgthreads_
.pop_back();
198 if (HasExcessiveThread()) {
199 // There is still at least more excessive thread to terminate.
// Take the next job off the queue; the pop_front/tag handling that
// presumably follows is not visible here.
205 auto func
= std::move(queue_
.front().function
);
// Publish the new queue length for relaxed GetQueueLen() readers.
208 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
209 std::memory_order_relaxed
);
// Apply the IO-priority drop exactly once per thread, lazily.
211 bool decrease_io_priority
= (low_io_priority
!= low_io_priority_
);
// NOTE(review): an OS-specific preprocessor guard presumably wraps this
// Linux-only syscall block; it is not visible in the extraction.
215 if (decrease_io_priority
) {
216 #define IOPRIO_CLASS_SHIFT (13)
217 #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
218 // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
219 // These system calls only have an effect when used in conjunction
220 // with an I/O scheduler that supports I/O priorities. As at
221 // kernel 2.6.17 the only such scheduler is the Completely
222 // Fair Queuing (CFQ) I/O scheduler.
223 // To change scheduler:
224 // echo cfq > /sys/block/<device_name>/queue/schedule
225 // Tunables to consider:
226 // /sys/block/<device_name>/queue/slice_idle
227 // /sys/block/<device_name>/queue/slice_sync
228 syscall(SYS_ioprio_set
, 1, // IOPRIO_WHO_PROCESS
230 IOPRIO_PRIO_VALUE(3, 0));
231 low_io_priority
= true;
// Non-Linux branch: silence the unused-variable warning.
234 (void)decrease_io_priority
; // avoid 'unused variable' error
240 // Helper struct for passing arguments when creating threads.
241 struct BGThreadMetadata
{
242 ThreadPoolImpl::Impl
* thread_pool_
;
243 size_t thread_id_
; // Thread count in the thread.
244 BGThreadMetadata(ThreadPoolImpl::Impl
* thread_pool
, size_t thread_id
)
245 : thread_pool_(thread_pool
), thread_id_(thread_id
) {}
248 void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg
) {
249 BGThreadMetadata
* meta
= reinterpret_cast<BGThreadMetadata
*>(arg
);
250 size_t thread_id
= meta
->thread_id_
;
251 ThreadPoolImpl::Impl
* tp
= meta
->thread_pool_
;
252 #ifdef ROCKSDB_USING_THREAD_STATUS
254 ThreadStatusUtil::RegisterThread(
255 tp
->GetHostEnv(), (tp
->GetThreadPriority() == Env::Priority::HIGH
256 ? ThreadStatus::HIGH_PRIORITY
257 : ThreadStatus::LOW_PRIORITY
));
260 tp
->BGThread(thread_id
);
261 #ifdef ROCKSDB_USING_THREAD_STATUS
262 ThreadStatusUtil::UnregisterThread();
267 void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num
,
269 std::unique_lock
<std::mutex
> lock(mu_
);
270 if (exit_all_threads_
) {
274 if (num
> total_threads_limit_
||
275 (num
< total_threads_limit_
&& allow_reduce
)) {
276 total_threads_limit_
= std::max(1, num
);
282 void ThreadPoolImpl::Impl::StartBGThreads() {
283 // Start background thread if necessary
284 while ((int)bgthreads_
.size() < total_threads_limit_
) {
286 port::Thread
p_t(&BGThreadWrapper
,
287 new BGThreadMetadata(this, bgthreads_
.size()));
289 // Set the thread name to aid debugging
290 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
291 #if __GLIBC_PREREQ(2, 12)
292 auto th_handle
= p_t
.native_handle();
294 snprintf(name_buf
, sizeof name_buf
, "rocksdb:bg%" ROCKSDB_PRIszt
,
296 name_buf
[sizeof name_buf
- 1] = '\0';
297 pthread_setname_np(th_handle
, name_buf
);
300 bgthreads_
.push_back(std::move(p_t
));
304 void ThreadPoolImpl::Impl::Submit(std::function
<void()>&& schedule
,
305 std::function
<void()>&& unschedule
, void* tag
) {
307 std::lock_guard
<std::mutex
> lock(mu_
);
309 if (exit_all_threads_
) {
315 // Add to priority queue
316 queue_
.push_back(BGItem());
318 auto& item
= queue_
.back();
320 item
.function
= std::move(schedule
);
321 item
.unschedFunction
= std::move(unschedule
);
323 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
324 std::memory_order_relaxed
);
326 if (!HasExcessiveThread()) {
327 // Wake up at least one waiting thread.
328 bgsignal_
.notify_one();
330 // Need to wake up all threads to make sure the one woken
331 // up is not the one to terminate.
336 int ThreadPoolImpl::Impl::UnSchedule(void* arg
) {
339 std::vector
<std::function
<void()>> candidates
;
341 std::lock_guard
<std::mutex
> lock(mu_
);
343 // Remove from priority queue
344 BGQueue::iterator it
= queue_
.begin();
345 while (it
!= queue_
.end()) {
346 if (arg
== (*it
).tag
) {
347 if (it
->unschedFunction
) {
348 candidates
.push_back(std::move(it
->unschedFunction
));
350 it
= queue_
.erase(it
);
356 queue_len_
.store(static_cast<unsigned int>(queue_
.size()),
357 std::memory_order_relaxed
);
361 // Run unschedule functions outside the mutex
362 for (auto& f
: candidates
) {
369 ThreadPoolImpl::ThreadPoolImpl() :
374 ThreadPoolImpl::~ThreadPoolImpl() {
377 void ThreadPoolImpl::JoinAllThreads() {
378 impl_
->JoinThreads(false);
381 void ThreadPoolImpl::SetBackgroundThreads(int num
) {
382 impl_
->SetBackgroundThreadsInternal(num
, true);
385 unsigned int ThreadPoolImpl::GetQueueLen() const {
386 return impl_
->GetQueueLen();
389 void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
390 impl_
->JoinThreads(true);
393 void ThreadPoolImpl::LowerIOPriority() {
394 impl_
->LowerIOPriority();
397 void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num
) {
398 impl_
->SetBackgroundThreadsInternal(num
, false);
401 void ThreadPoolImpl::SubmitJob(const std::function
<void()>& job
) {
403 impl_
->Submit(std::move(copy
), std::function
<void()>(), nullptr);
407 void ThreadPoolImpl::SubmitJob(std::function
<void()>&& job
) {
408 impl_
->Submit(std::move(job
), std::function
<void()>(), nullptr);
411 void ThreadPoolImpl::Schedule(void(*function
)(void* arg1
), void* arg
,
412 void* tag
, void(*unschedFunction
)(void* arg
)) {
414 std::function
<void()> fn
= [arg
, function
] { function(arg
); };
416 std::function
<void()> unfn
;
417 if (unschedFunction
!= nullptr) {
418 auto uf
= [arg
, unschedFunction
] { unschedFunction(arg
); };
419 unfn
= std::move(uf
);
422 impl_
->Submit(std::move(fn
), std::move(unfn
), tag
);
425 int ThreadPoolImpl::UnSchedule(void* arg
) {
426 return impl_
->UnSchedule(arg
);
429 void ThreadPoolImpl::SetHostEnv(Env
* env
) { impl_
->SetHostEnv(env
); }
431 Env
* ThreadPoolImpl::GetHostEnv() const { return impl_
->GetHostEnv(); }
433 // Return the thread priority.
434 // This would allow its member-thread to know its priority.
435 Env::Priority
ThreadPoolImpl::GetThreadPriority() const {
436 return impl_
->GetThreadPriority();
439 // Set the thread priority.
440 void ThreadPoolImpl::SetThreadPriority(Env::Priority priority
) {
441 impl_
->SetThreadPriority(priority
);
444 ThreadPool
* NewThreadPool(int num_threads
) {
445 ThreadPoolImpl
* thread_pool
= new ThreadPoolImpl();
446 thread_pool
->SetBackgroundThreads(num_threads
);
450 } // namespace rocksdb