]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "util/threadpool_imp.h" | |
11 | ||
12 | #include "monitoring/thread_status_util.h" | |
13 | #include "port/port.h" | |
14 | ||
15 | #ifndef OS_WIN | |
16 | # include <unistd.h> | |
17 | #endif | |
18 | ||
19 | #ifdef OS_LINUX | |
20 | # include <sys/syscall.h> | |
11fdf7f2 | 21 | # include <sys/resource.h> |
7c673cae FG |
22 | #endif |
23 | ||
#include <stdlib.h>

#include <cassert>
#include <cstdio>
#include <cstring>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
32 | ||
33 | namespace rocksdb { | |
34 | ||
35 | void ThreadPoolImpl::PthreadCall(const char* label, int result) { | |
36 | if (result != 0) { | |
37 | fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); | |
38 | abort(); | |
39 | } | |
40 | } | |
41 | ||
// Private implementation (pimpl) of ThreadPoolImpl. Owns the job queue, the
// background threads, and all synchronization state. Unless noted otherwise,
// all mutable state is guarded by mu_.
struct ThreadPoolImpl::Impl {

  Impl();
  ~Impl();

  // Signal all workers to exit and join them. If wait_for_jobs_to_complete
  // is true, workers drain the queue before exiting.
  void JoinThreads(bool wait_for_jobs_to_complete);

  // Adjust the target thread count. Growing always takes effect; shrinking
  // only when allow_reduce is true.
  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  int GetBackgroundThreads();

  // Lock-free read of the queue length; approximate, for stats reporting.
  unsigned int GetQueueLen() const {
    return queue_len_.load(std::memory_order_relaxed);
  }

  // Ask worker threads to drop to idle I/O priority (effective on Linux).
  void LowerIOPriority();

  // Ask worker threads to drop to lowest CPU (nice) priority (Linux only).
  void LowerCPUPriority();

  void WakeUpAllThreads() {
    bgsignal_.notify_all();
  }

  // Main loop executed by each background worker thread.
  void BGThread(size_t thread_id);

  // Spawn worker threads until bgthreads_.size() reaches
  // total_threads_limit_. Caller must hold mu_.
  void StartBGThreads();

  // Enqueue a job. `unschedule` (may be empty) is invoked if the job is
  // cancelled via UnSchedule(tag) before it runs.
  void Submit(std::function<void()>&& schedule,
              std::function<void()>&& unschedule, void* tag);

  // Remove all queued (not yet running) jobs whose tag equals arg; returns
  // the number removed.
  int UnSchedule(void* arg);

  void SetHostEnv(Env* env) { env_ = env; }

  Env* GetHostEnv() const { return env_; }

  // True when more worker threads exist than the current limit allows.
  bool HasExcessiveThread() const {
    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  }

  // Return true iff the current thread is the excessive thread to terminate.
  // Always terminate the running thread that is added last, even if there are
  // more than one thread to terminate.
  bool IsLastExcessiveThread(size_t thread_id) const {
    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  }

  // True when this thread's id is at or beyond the configured limit.
  bool IsExcessiveThread(size_t thread_id) const {
    return static_cast<int>(thread_id) >= total_threads_limit_;
  }

  // Return the thread priority.
  // This would allow its member-thread to know its priority.
  Env::Priority GetThreadPriority() const { return priority_; }

  // Set the thread priority.
  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

private:

  // Thread entry point handed to port::Thread. arg is a heap-allocated
  // BGThreadMetadata which the wrapper deletes.
  static void* BGThreadWrapper(void* arg);

  bool low_io_priority_;    // workers should switch to idle I/O class
  bool low_cpu_priority_;   // workers should switch to lowest nice level
  Env::Priority priority_;  // pool priority (LOW/HIGH/...)
  Env* env_;                // host environment; not owned

  int total_threads_limit_;         // target number of worker threads
  std::atomic_uint queue_len_;      // Queue length. Used for stats reporting
  bool exit_all_threads_;           // set while joining; workers must exit
  bool wait_for_jobs_to_complete_;  // drain queue before exiting

  // Entry per Schedule()/Submit() call
  struct BGItem {
    void* tag = nullptr;
    std::function<void()> function;
    std::function<void()> unschedFunction;
  };

  using BGQueue = std::deque<BGItem>;
  BGQueue queue_;

  std::mutex mu_;
  std::condition_variable bgsignal_;
  std::vector<port::Thread> bgthreads_;
};
127 | ||
128 | ||
// Default-construct an empty pool: no worker threads, thread limit 0,
// normal I/O/CPU priority, pool priority LOW, no host Env attached.
inline
ThreadPoolImpl::Impl::Impl()
    :
      low_io_priority_(false),
      low_cpu_priority_(false),
      priority_(Env::LOW),
      env_(nullptr),
      total_threads_limit_(0),
      queue_len_(),
      exit_all_threads_(false),
      wait_for_jobs_to_complete_(false),
      queue_(),
      mu_(),
      bgsignal_(),
      bgthreads_() {
}
145 | ||
// All worker threads must already have been joined (via JoinThreads)
// before the pool implementation is destroyed.
inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
148 | ||
149 | void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { | |
150 | ||
151 | std::unique_lock<std::mutex> lock(mu_); | |
152 | assert(!exit_all_threads_); | |
153 | ||
154 | wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; | |
155 | exit_all_threads_ = true; | |
11fdf7f2 TL |
156 | // prevent threads from being recreated right after they're joined, in case |
157 | // the user is concurrently submitting jobs. | |
158 | total_threads_limit_ = 0; | |
7c673cae FG |
159 | |
160 | lock.unlock(); | |
161 | ||
162 | bgsignal_.notify_all(); | |
163 | ||
164 | for (auto& th : bgthreads_) { | |
165 | th.join(); | |
166 | } | |
167 | ||
168 | bgthreads_.clear(); | |
169 | ||
170 | exit_all_threads_ = false; | |
171 | wait_for_jobs_to_complete_ = false; | |
172 | } | |
173 | ||
174 | inline | |
175 | void ThreadPoolImpl::Impl::LowerIOPriority() { | |
176 | std::lock_guard<std::mutex> lock(mu_); | |
177 | low_io_priority_ = true; | |
178 | } | |
179 | ||
11fdf7f2 TL |
180 | inline |
181 | void ThreadPoolImpl::Impl::LowerCPUPriority() { | |
182 | std::lock_guard<std::mutex> lock(mu_); | |
183 | low_cpu_priority_ = true; | |
184 | } | |
7c673cae FG |
185 | |
// Worker main loop: repeatedly wait for a job, lower this thread's OS
// priorities on first demand, run the job outside the lock, and exit when
// asked (shutdown) or when this thread becomes excessive after a limit
// reduction.
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
  // Per-thread cache of whether *this* thread already lowered its OS
  // priorities, so the syscalls are issued at most once each.
  bool low_io_priority = false;
  bool low_cpu_priority = false;

  while (true) {
    // Wait until there is an item that is ready to run
    std::unique_lock<std::mutex> lock(mu_);
    // Stop waiting if the thread needs to do work or needs to terminate.
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
           (queue_.empty() || IsExcessiveThread(thread_id))) {
      bgsignal_.wait(lock);
    }

    if (exit_all_threads_) {  // mechanism to let BG threads exit safely

      // Keep draining jobs when a graceful shutdown was requested;
      // otherwise exit immediately even with work still queued.
      if (!wait_for_jobs_to_complete_ ||
          queue_.empty()) {
        break;
      }
    }

    if (IsLastExcessiveThread(thread_id)) {
      // Current thread is the last generated one and is excessive.
      // We always terminate excessive thread in the reverse order of
      // generation time.
      auto& terminating_thread = bgthreads_.back();
      terminating_thread.detach();
      bgthreads_.pop_back();

      if (HasExcessiveThread()) {
        // There is still at least more excessive thread to terminate.
        WakeUpAllThreads();
      }
      break;
    }

    // Pop the next job and publish the new queue length for stats.
    auto func = std::move(queue_.front().function);
    queue_.pop_front();

    queue_len_.store(static_cast<unsigned int>(queue_.size()),
                     std::memory_order_relaxed);

    // Snapshot the priority requests under the lock, then drop the lock
    // before doing syscalls or running user code.
    bool decrease_io_priority = (low_io_priority != low_io_priority_);
    bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
    lock.unlock();

#ifdef OS_LINUX
    if (decrease_cpu_priority) {
      setpriority(
          PRIO_PROCESS,
          // Current thread.
          0,
          // Lowest priority possible.
          19);
      low_cpu_priority = true;
    }

    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/schedule
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
    (void)decrease_cpu_priority;
#endif
    // Run the job with no locks held.
    func();
  }
}
268 | ||
// Helper struct for passing arguments when creating threads.
// Heap-allocated by StartBGThreads and deleted by BGThreadWrapper once the
// fields have been copied out.
struct BGThreadMetadata {
  ThreadPoolImpl::Impl* thread_pool_;  // owning pool; not owned here
  size_t thread_id_;  // Thread count in the thread.
  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
      : thread_pool_(thread_pool), thread_id_(thread_id) {}
};
276 | ||
// Raw thread entry point: unpack the metadata, optionally register the
// thread with the thread-status tracker, run the worker loop, and clean up.
void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
  BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
  size_t thread_id = meta->thread_id_;
  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS
  // initialize it because compiler isn't good enough to see we don't use it
  // uninitialized
  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
  // Map the pool's Env priority onto the thread-status type.
  switch (tp->GetThreadPriority()) {
    case Env::Priority::HIGH:
      thread_type = ThreadStatus::HIGH_PRIORITY;
      break;
    case Env::Priority::LOW:
      thread_type = ThreadStatus::LOW_PRIORITY;
      break;
    case Env::Priority::BOTTOM:
      thread_type = ThreadStatus::BOTTOM_PRIORITY;
      break;
    case Env::Priority::USER:
      thread_type = ThreadStatus::USER;
      break;
    case Env::Priority::TOTAL:
      // TOTAL is a sentinel, never a real pool priority.
      assert(false);
      return nullptr;
  }
  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#endif
  // Metadata was heap-allocated by StartBGThreads; free it before entering
  // the (potentially long-lived) worker loop.
  delete meta;
  tp->BGThread(thread_id);
#ifdef ROCKSDB_USING_THREAD_STATUS
  ThreadStatusUtil::UnregisterThread();
#endif
  return nullptr;
}
312 | ||
313 | void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, | |
314 | bool allow_reduce) { | |
315 | std::unique_lock<std::mutex> lock(mu_); | |
316 | if (exit_all_threads_) { | |
317 | lock.unlock(); | |
318 | return; | |
319 | } | |
320 | if (num > total_threads_limit_ || | |
321 | (num < total_threads_limit_ && allow_reduce)) { | |
11fdf7f2 | 322 | total_threads_limit_ = std::max(0, num); |
7c673cae FG |
323 | WakeUpAllThreads(); |
324 | StartBGThreads(); | |
325 | } | |
326 | } | |
327 | ||
11fdf7f2 TL |
328 | int ThreadPoolImpl::Impl::GetBackgroundThreads() { |
329 | std::unique_lock<std::mutex> lock(mu_); | |
330 | return total_threads_limit_; | |
331 | } | |
332 | ||
7c673cae FG |
// Create worker threads until the pool has total_threads_limit_ of them.
// Callers hold mu_, so the size check and push_back are race-free.
void ThreadPoolImpl::Impl::StartBGThreads() {
  // Start background thread if necessary
  while ((int)bgthreads_.size() < total_threads_limit_) {

    // Metadata is deleted by BGThreadWrapper after it copies the fields.
    port::Thread p_t(&BGThreadWrapper,
                     new BGThreadMetadata(this, bgthreads_.size()));

    // Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
    auto th_handle = p_t.native_handle();
    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
    std::ostringstream thread_name_stream;
    thread_name_stream << "rocksdb:";
    // Lower-case the priority string, e.g. "rocksdb:low0".
    for (char c : thread_priority) {
      thread_name_stream << static_cast<char>(tolower(c));
    }
    thread_name_stream << bgthreads_.size();
    // NOTE(review): pthread_setname_np silently fails (ERANGE) for names
    // longer than 15 chars; long priority strings + large indices could hit
    // that limit — confirm acceptable.
    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
#endif
#endif
    bgthreads_.push_back(std::move(p_t));
  }
}
357 | ||
358 | void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule, | |
359 | std::function<void()>&& unschedule, void* tag) { | |
360 | ||
361 | std::lock_guard<std::mutex> lock(mu_); | |
362 | ||
363 | if (exit_all_threads_) { | |
364 | return; | |
365 | } | |
366 | ||
367 | StartBGThreads(); | |
368 | ||
369 | // Add to priority queue | |
370 | queue_.push_back(BGItem()); | |
371 | ||
372 | auto& item = queue_.back(); | |
373 | item.tag = tag; | |
374 | item.function = std::move(schedule); | |
375 | item.unschedFunction = std::move(unschedule); | |
376 | ||
377 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
378 | std::memory_order_relaxed); | |
379 | ||
380 | if (!HasExcessiveThread()) { | |
381 | // Wake up at least one waiting thread. | |
382 | bgsignal_.notify_one(); | |
383 | } else { | |
384 | // Need to wake up all threads to make sure the one woken | |
385 | // up is not the one to terminate. | |
386 | WakeUpAllThreads(); | |
387 | } | |
388 | } | |
389 | ||
390 | int ThreadPoolImpl::Impl::UnSchedule(void* arg) { | |
391 | int count = 0; | |
392 | ||
393 | std::vector<std::function<void()>> candidates; | |
394 | { | |
395 | std::lock_guard<std::mutex> lock(mu_); | |
396 | ||
397 | // Remove from priority queue | |
398 | BGQueue::iterator it = queue_.begin(); | |
399 | while (it != queue_.end()) { | |
400 | if (arg == (*it).tag) { | |
401 | if (it->unschedFunction) { | |
402 | candidates.push_back(std::move(it->unschedFunction)); | |
403 | } | |
404 | it = queue_.erase(it); | |
405 | count++; | |
406 | } else { | |
407 | ++it; | |
408 | } | |
409 | } | |
410 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
411 | std::memory_order_relaxed); | |
412 | } | |
413 | ||
414 | ||
415 | // Run unschedule functions outside the mutex | |
416 | for (auto& f : candidates) { | |
417 | f(); | |
418 | } | |
419 | ||
420 | return count; | |
421 | } | |
422 | ||
// Construct an empty pool; worker threads are created lazily by
// SetBackgroundThreads()/SubmitJob().
ThreadPoolImpl::ThreadPoolImpl() :
  impl_(new Impl()) {
}
426 | ||
427 | ||
// Out-of-line so Impl's destructor is visible here (pimpl idiom).
ThreadPoolImpl::~ThreadPoolImpl() {
}
430 | ||
// Join all workers without waiting for queued jobs to run.
void ThreadPoolImpl::JoinAllThreads() {
  impl_->JoinThreads(false);
}

// Set the thread limit; may shrink the pool.
void ThreadPoolImpl::SetBackgroundThreads(int num) {
  impl_->SetBackgroundThreadsInternal(num, true);
}

// Configured thread limit (not the live thread count).
int ThreadPoolImpl::GetBackgroundThreads() {
  return impl_->GetBackgroundThreads();
}

// Approximate pending-job count, for stats.
unsigned int ThreadPoolImpl::GetQueueLen() const {
  return impl_->GetQueueLen();
}

// Drain all queued jobs, then join all workers.
void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
  impl_->JoinThreads(true);
}

// Workers will drop to idle I/O priority (Linux only).
void ThreadPoolImpl::LowerIOPriority() {
  impl_->LowerIOPriority();
}

// Workers will drop to lowest CPU priority (Linux only).
void ThreadPoolImpl::LowerCPUPriority() {
  impl_->LowerCPUPriority();
}

// Grow-only variant of SetBackgroundThreads: never shrinks the pool.
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
  impl_->SetBackgroundThreadsInternal(num, false);
}
462 | ||
// Copy the job, then move the copy into the queue (lvalue overload).
void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
  auto copy(job);
  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}


// Move the job directly into the queue (rvalue overload; no copy).
void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}
472 | ||
473 | void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg, | |
474 | void* tag, void(*unschedFunction)(void* arg)) { | |
11fdf7f2 TL |
475 | if (unschedFunction == nullptr) { |
476 | impl_->Submit(std::bind(function, arg), std::function<void()>(), tag); | |
477 | } else { | |
478 | impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), | |
479 | tag); | |
7c673cae | 480 | } |
7c673cae FG |
481 | } |
482 | ||
// Cancel all queued jobs with this tag; returns how many were removed.
int ThreadPoolImpl::UnSchedule(void* arg) {
  return impl_->UnSchedule(arg);
}

void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }

Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }

// Return the thread priority.
// This would allow its member-thread to know its priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
  return impl_->GetThreadPriority();
}

// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
  impl_->SetThreadPriority(priority);
}
501 | ||
502 | ThreadPool* NewThreadPool(int num_threads) { | |
503 | ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); | |
504 | thread_pool->SetBackgroundThreads(num_threads); | |
505 | return thread_pool; | |
506 | } | |
507 | ||
508 | } // namespace rocksdb |