]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "util/threadpool_imp.h" | |
11 | ||
12 | #include "monitoring/thread_status_util.h" | |
13 | #include "port/port.h" | |
14 | ||
15 | #ifndef OS_WIN | |
16 | # include <unistd.h> | |
17 | #endif | |
18 | ||
19 | #ifdef OS_LINUX | |
20 | # include <sys/syscall.h> | |
11fdf7f2 | 21 | # include <sys/resource.h> |
7c673cae FG |
22 | #endif |
23 | ||
11fdf7f2 | 24 | #include <stdlib.h> |
7c673cae FG |
25 | #include <algorithm> |
26 | #include <atomic> | |
27 | #include <condition_variable> | |
f67539c2 | 28 | #include <deque> |
7c673cae | 29 | #include <mutex> |
11fdf7f2 | 30 | #include <sstream> |
7c673cae FG |
31 | #include <thread> |
32 | #include <vector> | |
33 | ||
f67539c2 | 34 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
35 | |
36 | void ThreadPoolImpl::PthreadCall(const char* label, int result) { | |
37 | if (result != 0) { | |
38 | fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); | |
39 | abort(); | |
40 | } | |
41 | } | |
42 | ||
43 | struct ThreadPoolImpl::Impl { | |
44 | ||
45 | Impl(); | |
46 | ~Impl(); | |
47 | ||
48 | void JoinThreads(bool wait_for_jobs_to_complete); | |
49 | ||
50 | void SetBackgroundThreadsInternal(int num, bool allow_reduce); | |
11fdf7f2 | 51 | int GetBackgroundThreads(); |
7c673cae FG |
52 | |
53 | unsigned int GetQueueLen() const { | |
54 | return queue_len_.load(std::memory_order_relaxed); | |
55 | } | |
56 | ||
57 | void LowerIOPriority(); | |
58 | ||
11fdf7f2 TL |
59 | void LowerCPUPriority(); |
60 | ||
7c673cae FG |
61 | void WakeUpAllThreads() { |
62 | bgsignal_.notify_all(); | |
63 | } | |
64 | ||
65 | void BGThread(size_t thread_id); | |
66 | ||
67 | void StartBGThreads(); | |
68 | ||
69 | void Submit(std::function<void()>&& schedule, | |
70 | std::function<void()>&& unschedule, void* tag); | |
71 | ||
72 | int UnSchedule(void* arg); | |
73 | ||
74 | void SetHostEnv(Env* env) { env_ = env; } | |
75 | ||
76 | Env* GetHostEnv() const { return env_; } | |
77 | ||
78 | bool HasExcessiveThread() const { | |
79 | return static_cast<int>(bgthreads_.size()) > total_threads_limit_; | |
80 | } | |
81 | ||
82 | // Return true iff the current thread is the excessive thread to terminate. | |
83 | // Always terminate the running thread that is added last, even if there are | |
84 | // more than one thread to terminate. | |
85 | bool IsLastExcessiveThread(size_t thread_id) const { | |
86 | return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; | |
87 | } | |
88 | ||
89 | bool IsExcessiveThread(size_t thread_id) const { | |
90 | return static_cast<int>(thread_id) >= total_threads_limit_; | |
91 | } | |
92 | ||
93 | // Return the thread priority. | |
94 | // This would allow its member-thread to know its priority. | |
95 | Env::Priority GetThreadPriority() const { return priority_; } | |
96 | ||
97 | // Set the thread priority. | |
98 | void SetThreadPriority(Env::Priority priority) { priority_ = priority; } | |
99 | ||
100 | private: | |
f67539c2 TL |
101 | static void BGThreadWrapper(void* arg); |
102 | ||
103 | bool low_io_priority_; | |
104 | bool low_cpu_priority_; | |
105 | Env::Priority priority_; | |
106 | Env* env_; | |
107 | ||
108 | int total_threads_limit_; | |
109 | std::atomic_uint queue_len_; // Queue length. Used for stats reporting | |
110 | bool exit_all_threads_; | |
111 | bool wait_for_jobs_to_complete_; | |
112 | ||
113 | // Entry per Schedule()/Submit() call | |
114 | struct BGItem { | |
115 | void* tag = nullptr; | |
116 | std::function<void()> function; | |
117 | std::function<void()> unschedFunction; | |
7c673cae FG |
118 | }; |
119 | ||
120 | using BGQueue = std::deque<BGItem>; | |
121 | BGQueue queue_; | |
122 | ||
123 | std::mutex mu_; | |
124 | std::condition_variable bgsignal_; | |
125 | std::vector<port::Thread> bgthreads_; | |
126 | }; | |
127 | ||
128 | ||
129 | inline | |
130 | ThreadPoolImpl::Impl::Impl() | |
11fdf7f2 | 131 | : |
7c673cae | 132 | low_io_priority_(false), |
11fdf7f2 | 133 | low_cpu_priority_(false), |
7c673cae FG |
134 | priority_(Env::LOW), |
135 | env_(nullptr), | |
11fdf7f2 | 136 | total_threads_limit_(0), |
7c673cae FG |
137 | queue_len_(), |
138 | exit_all_threads_(false), | |
139 | wait_for_jobs_to_complete_(false), | |
140 | queue_(), | |
141 | mu_(), | |
142 | bgsignal_(), | |
143 | bgthreads_() { | |
144 | } | |
145 | ||
146 | inline | |
147 | ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } | |
148 | ||
149 | void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { | |
150 | ||
151 | std::unique_lock<std::mutex> lock(mu_); | |
152 | assert(!exit_all_threads_); | |
153 | ||
154 | wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; | |
155 | exit_all_threads_ = true; | |
11fdf7f2 TL |
156 | // prevent threads from being recreated right after they're joined, in case |
157 | // the user is concurrently submitting jobs. | |
158 | total_threads_limit_ = 0; | |
7c673cae FG |
159 | |
160 | lock.unlock(); | |
161 | ||
162 | bgsignal_.notify_all(); | |
163 | ||
164 | for (auto& th : bgthreads_) { | |
165 | th.join(); | |
166 | } | |
167 | ||
168 | bgthreads_.clear(); | |
169 | ||
170 | exit_all_threads_ = false; | |
171 | wait_for_jobs_to_complete_ = false; | |
172 | } | |
173 | ||
174 | inline | |
175 | void ThreadPoolImpl::Impl::LowerIOPriority() { | |
176 | std::lock_guard<std::mutex> lock(mu_); | |
177 | low_io_priority_ = true; | |
178 | } | |
179 | ||
11fdf7f2 TL |
180 | inline |
181 | void ThreadPoolImpl::Impl::LowerCPUPriority() { | |
182 | std::lock_guard<std::mutex> lock(mu_); | |
183 | low_cpu_priority_ = true; | |
184 | } | |
7c673cae FG |
185 | |
186 | void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { | |
187 | bool low_io_priority = false; | |
11fdf7f2 TL |
188 | bool low_cpu_priority = false; |
189 | ||
7c673cae | 190 | while (true) { |
494da23a | 191 | // Wait until there is an item that is ready to run |
7c673cae FG |
192 | std::unique_lock<std::mutex> lock(mu_); |
193 | // Stop waiting if the thread needs to do work or needs to terminate. | |
194 | while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && | |
195 | (queue_.empty() || IsExcessiveThread(thread_id))) { | |
196 | bgsignal_.wait(lock); | |
197 | } | |
198 | ||
199 | if (exit_all_threads_) { // mechanism to let BG threads exit safely | |
200 | ||
494da23a | 201 | if (!wait_for_jobs_to_complete_ || |
7c673cae FG |
202 | queue_.empty()) { |
203 | break; | |
204 | } | |
205 | } | |
206 | ||
207 | if (IsLastExcessiveThread(thread_id)) { | |
208 | // Current thread is the last generated one and is excessive. | |
209 | // We always terminate excessive thread in the reverse order of | |
210 | // generation time. | |
211 | auto& terminating_thread = bgthreads_.back(); | |
212 | terminating_thread.detach(); | |
213 | bgthreads_.pop_back(); | |
214 | ||
215 | if (HasExcessiveThread()) { | |
216 | // There is still at least more excessive thread to terminate. | |
217 | WakeUpAllThreads(); | |
218 | } | |
219 | break; | |
220 | } | |
221 | ||
222 | auto func = std::move(queue_.front().function); | |
223 | queue_.pop_front(); | |
224 | ||
225 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
226 | std::memory_order_relaxed); | |
227 | ||
228 | bool decrease_io_priority = (low_io_priority != low_io_priority_); | |
11fdf7f2 | 229 | bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_); |
7c673cae FG |
230 | lock.unlock(); |
231 | ||
232 | #ifdef OS_LINUX | |
11fdf7f2 TL |
233 | if (decrease_cpu_priority) { |
234 | setpriority( | |
235 | PRIO_PROCESS, | |
236 | // Current thread. | |
237 | 0, | |
238 | // Lowest priority possible. | |
239 | 19); | |
240 | low_cpu_priority = true; | |
241 | } | |
242 | ||
7c673cae FG |
243 | if (decrease_io_priority) { |
244 | #define IOPRIO_CLASS_SHIFT (13) | |
245 | #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) | |
246 | // Put schedule into IOPRIO_CLASS_IDLE class (lowest) | |
247 | // These system calls only have an effect when used in conjunction | |
248 | // with an I/O scheduler that supports I/O priorities. As at | |
249 | // kernel 2.6.17 the only such scheduler is the Completely | |
250 | // Fair Queuing (CFQ) I/O scheduler. | |
251 | // To change scheduler: | |
252 | // echo cfq > /sys/block/<device_name>/queue/schedule | |
253 | // Tunables to consider: | |
254 | // /sys/block/<device_name>/queue/slice_idle | |
255 | // /sys/block/<device_name>/queue/slice_sync | |
256 | syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS | |
257 | 0, // current thread | |
258 | IOPRIO_PRIO_VALUE(3, 0)); | |
259 | low_io_priority = true; | |
260 | } | |
261 | #else | |
262 | (void)decrease_io_priority; // avoid 'unused variable' error | |
11fdf7f2 | 263 | (void)decrease_cpu_priority; |
7c673cae FG |
264 | #endif |
265 | func(); | |
266 | } | |
267 | } | |
268 | ||
269 | // Helper struct for passing arguments when creating threads. | |
270 | struct BGThreadMetadata { | |
271 | ThreadPoolImpl::Impl* thread_pool_; | |
272 | size_t thread_id_; // Thread count in the thread. | |
273 | BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id) | |
274 | : thread_pool_(thread_pool), thread_id_(thread_id) {} | |
275 | }; | |
276 | ||
f67539c2 | 277 | void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { |
7c673cae FG |
278 | BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); |
279 | size_t thread_id = meta->thread_id_; | |
280 | ThreadPoolImpl::Impl* tp = meta->thread_pool_; | |
281 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
11fdf7f2 TL |
282 | // initialize it because compiler isn't good enough to see we don't use it |
283 | // uninitialized | |
284 | ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES; | |
285 | switch (tp->GetThreadPriority()) { | |
286 | case Env::Priority::HIGH: | |
287 | thread_type = ThreadStatus::HIGH_PRIORITY; | |
288 | break; | |
289 | case Env::Priority::LOW: | |
290 | thread_type = ThreadStatus::LOW_PRIORITY; | |
291 | break; | |
292 | case Env::Priority::BOTTOM: | |
293 | thread_type = ThreadStatus::BOTTOM_PRIORITY; | |
294 | break; | |
494da23a TL |
295 | case Env::Priority::USER: |
296 | thread_type = ThreadStatus::USER; | |
297 | break; | |
11fdf7f2 TL |
298 | case Env::Priority::TOTAL: |
299 | assert(false); | |
f67539c2 | 300 | return; |
11fdf7f2 TL |
301 | } |
302 | assert(thread_type != ThreadStatus::NUM_THREAD_TYPES); | |
303 | ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type); | |
7c673cae FG |
304 | #endif |
305 | delete meta; | |
306 | tp->BGThread(thread_id); | |
307 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
308 | ThreadStatusUtil::UnregisterThread(); | |
309 | #endif | |
f67539c2 | 310 | return; |
7c673cae FG |
311 | } |
312 | ||
313 | void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, | |
314 | bool allow_reduce) { | |
f67539c2 | 315 | std::lock_guard<std::mutex> lock(mu_); |
7c673cae | 316 | if (exit_all_threads_) { |
7c673cae FG |
317 | return; |
318 | } | |
319 | if (num > total_threads_limit_ || | |
320 | (num < total_threads_limit_ && allow_reduce)) { | |
11fdf7f2 | 321 | total_threads_limit_ = std::max(0, num); |
7c673cae FG |
322 | WakeUpAllThreads(); |
323 | StartBGThreads(); | |
324 | } | |
325 | } | |
326 | ||
11fdf7f2 TL |
327 | int ThreadPoolImpl::Impl::GetBackgroundThreads() { |
328 | std::unique_lock<std::mutex> lock(mu_); | |
329 | return total_threads_limit_; | |
330 | } | |
331 | ||
7c673cae FG |
332 | void ThreadPoolImpl::Impl::StartBGThreads() { |
333 | // Start background thread if necessary | |
334 | while ((int)bgthreads_.size() < total_threads_limit_) { | |
335 | ||
336 | port::Thread p_t(&BGThreadWrapper, | |
337 | new BGThreadMetadata(this, bgthreads_.size())); | |
338 | ||
339 | // Set the thread name to aid debugging | |
340 | #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) | |
341 | #if __GLIBC_PREREQ(2, 12) | |
342 | auto th_handle = p_t.native_handle(); | |
11fdf7f2 TL |
343 | std::string thread_priority = Env::PriorityToString(GetThreadPriority()); |
344 | std::ostringstream thread_name_stream; | |
345 | thread_name_stream << "rocksdb:"; | |
346 | for (char c : thread_priority) { | |
347 | thread_name_stream << static_cast<char>(tolower(c)); | |
348 | } | |
349 | thread_name_stream << bgthreads_.size(); | |
350 | pthread_setname_np(th_handle, thread_name_stream.str().c_str()); | |
7c673cae FG |
351 | #endif |
352 | #endif | |
353 | bgthreads_.push_back(std::move(p_t)); | |
354 | } | |
355 | } | |
356 | ||
357 | void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule, | |
358 | std::function<void()>&& unschedule, void* tag) { | |
359 | ||
360 | std::lock_guard<std::mutex> lock(mu_); | |
361 | ||
362 | if (exit_all_threads_) { | |
363 | return; | |
364 | } | |
365 | ||
366 | StartBGThreads(); | |
367 | ||
368 | // Add to priority queue | |
369 | queue_.push_back(BGItem()); | |
370 | ||
371 | auto& item = queue_.back(); | |
372 | item.tag = tag; | |
373 | item.function = std::move(schedule); | |
374 | item.unschedFunction = std::move(unschedule); | |
375 | ||
376 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
377 | std::memory_order_relaxed); | |
378 | ||
379 | if (!HasExcessiveThread()) { | |
380 | // Wake up at least one waiting thread. | |
381 | bgsignal_.notify_one(); | |
382 | } else { | |
383 | // Need to wake up all threads to make sure the one woken | |
384 | // up is not the one to terminate. | |
385 | WakeUpAllThreads(); | |
386 | } | |
387 | } | |
388 | ||
389 | int ThreadPoolImpl::Impl::UnSchedule(void* arg) { | |
390 | int count = 0; | |
391 | ||
392 | std::vector<std::function<void()>> candidates; | |
393 | { | |
394 | std::lock_guard<std::mutex> lock(mu_); | |
395 | ||
396 | // Remove from priority queue | |
397 | BGQueue::iterator it = queue_.begin(); | |
398 | while (it != queue_.end()) { | |
399 | if (arg == (*it).tag) { | |
400 | if (it->unschedFunction) { | |
401 | candidates.push_back(std::move(it->unschedFunction)); | |
402 | } | |
403 | it = queue_.erase(it); | |
404 | count++; | |
405 | } else { | |
406 | ++it; | |
407 | } | |
408 | } | |
409 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
410 | std::memory_order_relaxed); | |
411 | } | |
412 | ||
413 | ||
414 | // Run unschedule functions outside the mutex | |
415 | for (auto& f : candidates) { | |
416 | f(); | |
417 | } | |
418 | ||
419 | return count; | |
420 | } | |
421 | ||
11fdf7f2 | 422 | ThreadPoolImpl::ThreadPoolImpl() : |
7c673cae FG |
423 | impl_(new Impl()) { |
424 | } | |
425 | ||
426 | ||
427 | ThreadPoolImpl::~ThreadPoolImpl() { | |
428 | } | |
429 | ||
430 | void ThreadPoolImpl::JoinAllThreads() { | |
431 | impl_->JoinThreads(false); | |
432 | } | |
433 | ||
434 | void ThreadPoolImpl::SetBackgroundThreads(int num) { | |
435 | impl_->SetBackgroundThreadsInternal(num, true); | |
436 | } | |
437 | ||
11fdf7f2 TL |
438 | int ThreadPoolImpl::GetBackgroundThreads() { |
439 | return impl_->GetBackgroundThreads(); | |
440 | } | |
441 | ||
7c673cae FG |
442 | unsigned int ThreadPoolImpl::GetQueueLen() const { |
443 | return impl_->GetQueueLen(); | |
444 | } | |
445 | ||
446 | void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() { | |
447 | impl_->JoinThreads(true); | |
448 | } | |
449 | ||
450 | void ThreadPoolImpl::LowerIOPriority() { | |
451 | impl_->LowerIOPriority(); | |
452 | } | |
453 | ||
11fdf7f2 TL |
454 | void ThreadPoolImpl::LowerCPUPriority() { |
455 | impl_->LowerCPUPriority(); | |
456 | } | |
457 | ||
7c673cae FG |
458 | void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { |
459 | impl_->SetBackgroundThreadsInternal(num, false); | |
460 | } | |
461 | ||
462 | void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) { | |
463 | auto copy(job); | |
464 | impl_->Submit(std::move(copy), std::function<void()>(), nullptr); | |
465 | } | |
466 | ||
467 | ||
468 | void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) { | |
469 | impl_->Submit(std::move(job), std::function<void()>(), nullptr); | |
470 | } | |
471 | ||
472 | void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg, | |
473 | void* tag, void(*unschedFunction)(void* arg)) { | |
11fdf7f2 TL |
474 | if (unschedFunction == nullptr) { |
475 | impl_->Submit(std::bind(function, arg), std::function<void()>(), tag); | |
476 | } else { | |
477 | impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), | |
478 | tag); | |
7c673cae | 479 | } |
7c673cae FG |
480 | } |
481 | ||
482 | int ThreadPoolImpl::UnSchedule(void* arg) { | |
483 | return impl_->UnSchedule(arg); | |
484 | } | |
485 | ||
486 | void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); } | |
487 | ||
488 | Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } | |
489 | ||
490 | // Return the thread priority. | |
491 | // This would allow its member-thread to know its priority. | |
492 | Env::Priority ThreadPoolImpl::GetThreadPriority() const { | |
493 | return impl_->GetThreadPriority(); | |
494 | } | |
495 | ||
496 | // Set the thread priority. | |
497 | void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { | |
498 | impl_->SetThreadPriority(priority); | |
499 | } | |
500 | ||
501 | ThreadPool* NewThreadPool(int num_threads) { | |
502 | ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); | |
503 | thread_pool->SetBackgroundThreads(num_threads); | |
504 | return thread_pool; | |
505 | } | |
506 | ||
f67539c2 | 507 | } // namespace ROCKSDB_NAMESPACE |