// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/threadpool_imp.h"

#include "monitoring/thread_status_util.h"
#include "port/port.h"

#ifndef OS_WIN
#include <unistd.h>
#endif

#ifdef OS_LINUX
#include <sys/syscall.h>
#include <sys/resource.h>
#endif

#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>

namespace ROCKSDB_NAMESPACE {

void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  if (result != 0) {
    fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
    abort();
  }
}
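//
// PthreadCall() is a fail-fast wrapper: pthread functions report failures
// through their return value (an errno code) rather than through errno
// itself, so the code is formatted with strerror() and the process aborts.
// A minimal usage sketch (illustrative only; `thread_main` is hypothetical):
//
//   extern void* thread_main(void*);
//   pthread_t tid;
//   ThreadPoolImpl::PthreadCall(
//       "create", pthread_create(&tid, nullptr, &thread_main, nullptr));
//   ThreadPoolImpl::PthreadCall("join", pthread_join(tid, nullptr));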

struct ThreadPoolImpl::Impl {

  Impl();
  ~Impl();

  void JoinThreads(bool wait_for_jobs_to_complete);

  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  void LowerIOPriority();

  void LowerCPUPriority();

  void WakeUpAllThreads() {
    bgsignal_.notify_all();
  }

  void BGThread(size_t thread_id);

  void StartBGThreads();

  void Submit(std::function<void()>&& schedule,
              std::function<void()>&& unschedule, void* tag);

  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that was added last, even if there is
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This allows a member thread to know the priority of its pool.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

 private:
  static void BGThreadWrapper(void* arg);

  bool low_io_priority_;
  bool low_cpu_priority_;
  Env::Priority priority_;
  Env* env_;

  int total_threads_limit_;
  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting.
  bool exit_all_threads_;
  bool wait_for_jobs_to_complete_;

  // One entry per Schedule()/Submit() call.
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue queue_;

  std::mutex mu_;
  std::condition_variable bgsignal_;
  std::vector<port::Thread> bgthreads_;
};
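//
// The three predicates above implement shrink-on-demand: when the thread
// limit is lowered, threads whose index is >= total_threads_limit_ become
// "excessive" and retire themselves newest-first. A worked trace under the
// assumption of 4 live threads and a new limit of 2 (illustrative only):
//
//   bgthreads_.size() == 4, total_threads_limit_ == 2
//   HasExcessiveThread()     == true
//   IsExcessiveThread(2)     == true   // waits; exits on a later wakeup
//   IsLastExcessiveThread(3) == true   // only the newest thread exits now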

inline
ThreadPoolImpl::Impl::Impl()
    : low_io_priority_(false),
      low_cpu_priority_(false),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {
}

inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }

void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {

  std::unique_lock<std::mutex> lock(mu_);
  assert(!exit_all_threads_);

  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
  exit_all_threads_ = true;
  // Prevent threads from being recreated right after they're joined, in case
  // the user is concurrently submitting jobs.
  total_threads_limit_ = 0;

  lock.unlock();

  bgsignal_.notify_all();

  for (auto& th : bgthreads_) {
    th.join();
  }

  bgthreads_.clear();

  exit_all_threads_ = false;
  wait_for_jobs_to_complete_ = false;
}
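//
// JoinThreads() is the standard shutdown handshake for a condition-variable
// pool: set the exit flag under the mutex, drop the lock, notify everyone,
// then join. Distilled to its core (a sketch of the pattern, not a
// replacement for the code above):
//
//   {
//     std::lock_guard<std::mutex> g(mu_);
//     exit_all_threads_ = true;
//   }
//   bgsignal_.notify_all();
//   for (auto& th : bgthreads_) th.join();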

inline
void ThreadPoolImpl::Impl::LowerIOPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_io_priority_ = true;
}

inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
  std::lock_guard<std::mutex> lock(mu_);
  low_cpu_priority_ = true;
}
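//
// Both Lower*Priority() calls are one-way latches: they only set a flag
// under the mutex. The actual setpriority()/ioprio_set() system calls are
// made lazily by each worker in BGThread(), before it runs its next job:
//
//   pool.LowerIOPriority();   // public wrapper, defined later in this file
//   pool.LowerCPUPriority();  // same; there is no way to raise them back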

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  bool low_io_priority = false;
  bool low_cpu_priority = false;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
      bgsignal_.wait(lock);
    }

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
        break;
      }
    }

    if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive threads in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least one more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
    }

    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
    lock.unlock();

#ifdef OS_LINUX
    if (decrease_cpu_priority) {
      setpriority(
          PRIO_PROCESS,
          // Current thread.
          0,
          // Lowest priority possible.
          19);
      low_cpu_priority = true;
    }

    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put the thread into the IOPRIO_CLASS_IDLE class (lowest).
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As of
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change the scheduler:
      //   echo cfq > /sys/block/<device_name>/queue/scheduler
      // Tunables to consider:
      //   /sys/block/<device_name>/queue/slice_idle
      //   /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
    (void)decrease_cpu_priority;
#endif
    func();
  }
}
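//
// The hand-written wait loop at the top of BGThread() is equivalent to the
// predicate form of std::condition_variable::wait; restated as a sketch:
//
//   bgsignal_.wait(lock, [&] {
//     return exit_all_threads_ || IsLastExcessiveThread(thread_id) ||
//            !(queue_.empty() || IsExcessiveThread(thread_id));
//   });
//
// i.e. a thread wakes up to exit, to retire as the last excessive thread,
// or because there is work and it is not an excessive thread.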

// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;
  size_t thread_id_;  // Index of the thread in the pool.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};
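//
// Ownership note: StartBGThreads() below allocates one BGThreadMetadata per
// thread with `new`, and BGThreadWrapper() deletes it after copying out the
// fields, so the metadata safely outlives the thread-creation call:
//
//   port::Thread t(&BGThreadWrapper, new BGThreadMetadata(this, id));
//   // ... later, inside the new thread:
//   delete meta;  // after thread_id_ and thread_pool_ have been copied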

void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // Initialize the thread type, since the compiler cannot see that it is
  // never used uninitialized.
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      assert(false);
      return;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return;
}

void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
                                                        bool allow_reduce) {
  std::lock_guard<std::mutex> lock(mu_);
  if (exit_all_threads_) {
    return;
  }
  if (num > total_threads_limit_ ||
      (num < total_threads_limit_ && allow_reduce)) {
    total_threads_limit_ = std::max(0, num);
    WakeUpAllThreads();
    StartBGThreads();
  }
}
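//
// The allow_reduce flag is what distinguishes the two public entry points
// (both defined later in this file):
//
//   pool.SetBackgroundThreads(2);          // allow_reduce=true: may shrink
//   pool.IncBackgroundThreadsIfNeeded(4);  // allow_reduce=false: only grows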

int ThreadPoolImpl::Impl::GetBackgroundThreads() {
  std::unique_lock<std::mutex> lock(mu_);
  return total_threads_limit_;
}

void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background threads if necessary.
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
    port::Thread p_t(&BGThreadWrapper,
                     new BGThreadMetadata(this, bgthreads_.size()));

// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
    thread_name_stream << bgthreads_.size();
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}
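//
// With the glibc >= 2.12 branch compiled in, the workers show up in tools
// such as `top -H` with names like "rocksdb:low0" or "rocksdb:high1"
// (lowercased priority string followed by the thread index). Note that on
// Linux, pthread_setname_np() limits names to 15 characters plus the
// terminating NUL; longer names fail with ERANGE rather than being
// truncated. An equivalent plain-C sketch (thread_index is hypothetical):
//
//   char buf[16];
//   snprintf(buf, sizeof(buf), "rocksdb:%s%zu", "low", thread_index);
//   pthread_setname_np(handle, buf);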

void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
                                  std::function<void()>&& unschedule,
                                  void* tag) {
  std::lock_guard<std::mutex> lock(mu_);

  if (exit_all_threads_) {
    return;
  }

  StartBGThreads();

  // Add to the queue.
  queue_.push_back(BGItem());

  auto& item = queue_.back();
  item.tag = tag;
  item.function = std::move(schedule);
  item.unschedFunction = std::move(unschedule);

  queue_len_.store(static_cast<unsigned int>(queue_.size()),
                   std::memory_order_relaxed);

  if (!HasExcessiveThread()) {
    // Wake up at least one waiting thread.
    bgsignal_.notify_one();
  } else {
    // Need to wake up all threads to make sure the one woken
    // up is not the one to terminate.
    WakeUpAllThreads();
  }
}

int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
  int count = 0;

  std::vector<std::function<void()>> candidates;
  {
    std::lock_guard<std::mutex> lock(mu_);

    // Remove matching entries from the queue.
    BGQueue::iterator it = queue_.begin();
    while (it != queue_.end()) {
      if (arg == (*it).tag) {
        if (it->unschedFunction) {
          candidates.push_back(std::move(it->unschedFunction));
        }
        it = queue_.erase(it);
        count++;
      } else {
        ++it;
      }
    }
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);
  }

  // Run the unschedule functions outside the mutex.
  for (auto& f : candidates) {
    f();
  }

  return count;
}
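//
// Tag-based cancellation sketch (illustrative; DoWork and DoCleanup are
// hypothetical): jobs submitted with the same tag can be pulled back out of
// the queue before a worker picks them up, and each removed job's
// unschedule callback runs outside the mutex.
//
//   static char tag;  // any stable address can serve as a tag
//   impl.Submit([] { DoWork(); }, [] { DoCleanup(); }, &tag);
//   int cancelled = impl.UnSchedule(&tag);  // 1 if the job had not started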

ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {
}

ThreadPoolImpl::~ThreadPoolImpl() {
}

void ThreadPoolImpl::JoinAllThreads() {
  impl_->JoinThreads(false);
}

void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

void ThreadPoolImpl::LowerIOPriority() {
  impl_->LowerIOPriority();
}

void ThreadPoolImpl::LowerCPUPriority() {
  impl_->LowerCPUPriority();
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}

void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}

void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
                              void* tag, void (*unschedFunction)(void* arg)) {
  if (unschedFunction == nullptr) {
    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
  } else {
    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
                  tag);
  }
}
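//
// C-style usage sketch for Schedule() (illustrative; MyJob, MyAbort and ctx
// are hypothetical):
//
//   static void MyJob(void* arg) { /* do work with arg */ }
//   static void MyAbort(void* arg) { /* runs only if unscheduled */ }
//   pool.Schedule(&MyJob, &ctx, /*tag=*/&ctx, &MyAbort);
//   pool.UnSchedule(&ctx);  // runs MyAbort(&ctx) if MyJob hasn't started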

int ThreadPoolImpl::UnSchedule(void* arg) {
  return impl_->UnSchedule(arg);
}

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority.
// This allows a member thread to know the priority of its pool.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}

ThreadPool* NewThreadPool(int num_threads) {
  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
  thread_pool->SetBackgroundThreads(num_threads);
  return thread_pool;
}
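//
// End-to-end usage sketch through the public ThreadPool interface
// (illustrative only):
//
//   std::unique_ptr<ThreadPool> pool(NewThreadPool(4));
//   for (int i = 0; i < 8; i++) {
//     pool->SubmitJob([i] { /* background work item i */ });
//   }
//   pool->WaitForJobsAndJoinAllThreads();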

}  // namespace ROCKSDB_NAMESPACE