]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "util/threadpool_imp.h" | |
11 | ||
7c673cae | 12 | #ifndef OS_WIN |
1e59de90 | 13 | #include <unistd.h> |
7c673cae FG |
14 | #endif |
15 | ||
16 | #ifdef OS_LINUX | |
1e59de90 TL |
17 | #include <sys/resource.h> |
18 | #include <sys/syscall.h> | |
7c673cae FG |
19 | #endif |
20 | ||
11fdf7f2 | 21 | #include <stdlib.h> |
1e59de90 | 22 | |
7c673cae FG |
23 | #include <algorithm> |
24 | #include <atomic> | |
25 | #include <condition_variable> | |
f67539c2 | 26 | #include <deque> |
7c673cae | 27 | #include <mutex> |
11fdf7f2 | 28 | #include <sstream> |
7c673cae FG |
29 | #include <thread> |
30 | #include <vector> | |
31 | ||
20effc67 TL |
32 | #include "monitoring/thread_status_util.h" |
33 | #include "port/port.h" | |
34 | #include "test_util/sync_point.h" | |
1e59de90 | 35 | #include "util/string_util.h" |
20effc67 | 36 | |
f67539c2 | 37 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
38 | |
39 | void ThreadPoolImpl::PthreadCall(const char* label, int result) { | |
40 | if (result != 0) { | |
1e59de90 | 41 | fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str()); |
7c673cae FG |
42 | abort(); |
43 | } | |
44 | } | |
45 | ||
46 | struct ThreadPoolImpl::Impl { | |
7c673cae FG |
47 | Impl(); |
48 | ~Impl(); | |
49 | ||
50 | void JoinThreads(bool wait_for_jobs_to_complete); | |
51 | ||
52 | void SetBackgroundThreadsInternal(int num, bool allow_reduce); | |
11fdf7f2 | 53 | int GetBackgroundThreads(); |
7c673cae FG |
54 | |
55 | unsigned int GetQueueLen() const { | |
56 | return queue_len_.load(std::memory_order_relaxed); | |
57 | } | |
58 | ||
59 | void LowerIOPriority(); | |
60 | ||
20effc67 | 61 | void LowerCPUPriority(CpuPriority pri); |
11fdf7f2 | 62 | |
1e59de90 | 63 | void WakeUpAllThreads() { bgsignal_.notify_all(); } |
7c673cae FG |
64 | |
65 | void BGThread(size_t thread_id); | |
66 | ||
67 | void StartBGThreads(); | |
68 | ||
69 | void Submit(std::function<void()>&& schedule, | |
1e59de90 | 70 | std::function<void()>&& unschedule, void* tag); |
7c673cae FG |
71 | |
72 | int UnSchedule(void* arg); | |
73 | ||
74 | void SetHostEnv(Env* env) { env_ = env; } | |
75 | ||
76 | Env* GetHostEnv() const { return env_; } | |
77 | ||
78 | bool HasExcessiveThread() const { | |
79 | return static_cast<int>(bgthreads_.size()) > total_threads_limit_; | |
80 | } | |
81 | ||
82 | // Return true iff the current thread is the excessive thread to terminate. | |
83 | // Always terminate the running thread that is added last, even if there are | |
84 | // more than one thread to terminate. | |
85 | bool IsLastExcessiveThread(size_t thread_id) const { | |
86 | return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; | |
87 | } | |
88 | ||
89 | bool IsExcessiveThread(size_t thread_id) const { | |
90 | return static_cast<int>(thread_id) >= total_threads_limit_; | |
91 | } | |
92 | ||
93 | // Return the thread priority. | |
94 | // This would allow its member-thread to know its priority. | |
95 | Env::Priority GetThreadPriority() const { return priority_; } | |
96 | ||
97 | // Set the thread priority. | |
98 | void SetThreadPriority(Env::Priority priority) { priority_ = priority; } | |
99 | ||
1e59de90 TL |
100 | int ReserveThreads(int threads_to_be_reserved) { |
101 | std::unique_lock<std::mutex> lock(mu_); | |
102 | // We can reserve at most num_waiting_threads_ in total so the number of | |
103 | // threads that can be reserved might be fewer than the desired one. In | |
104 | // rare cases, num_waiting_threads_ could be less than reserved_threads | |
105 | // due to SetBackgroundThreadInternal or last excessive threads. If that | |
106 | // happens, we cannot reserve any other threads. | |
107 | int reserved_threads_in_success = | |
108 | std::min(std::max(num_waiting_threads_ - reserved_threads_, 0), | |
109 | threads_to_be_reserved); | |
110 | reserved_threads_ += reserved_threads_in_success; | |
111 | return reserved_threads_in_success; | |
112 | } | |
f67539c2 | 113 | |
1e59de90 TL |
114 | int ReleaseThreads(int threads_to_be_released) { |
115 | std::unique_lock<std::mutex> lock(mu_); | |
116 | // We cannot release more than reserved_threads_ | |
117 | int released_threads_in_success = | |
118 | std::min(reserved_threads_, threads_to_be_released); | |
119 | reserved_threads_ -= released_threads_in_success; | |
120 | WakeUpAllThreads(); | |
121 | return released_threads_in_success; | |
122 | } | |
f67539c2 | 123 | |
1e59de90 TL |
124 | private: |
125 | static void BGThreadWrapper(void* arg); | |
126 | ||
127 | bool low_io_priority_; | |
128 | CpuPriority cpu_priority_; | |
129 | Env::Priority priority_; | |
130 | Env* env_; | |
131 | ||
132 | int total_threads_limit_; | |
133 | std::atomic_uint queue_len_; // Queue length. Used for stats reporting | |
134 | // Number of reserved threads, managed by ReserveThreads(..) and | |
135 | // ReleaseThreads(..), if num_waiting_threads_ is no larger than | |
136 | // reserved_threads_, its thread will be blocked to ensure the reservation | |
137 | // mechanism | |
138 | int reserved_threads_; | |
139 | // Number of waiting threads (Maximum number of threads that can be | |
140 | // reserved), in rare cases, num_waiting_threads_ could be less than | |
141 | // reserved_threads due to SetBackgroundThreadInternal or last | |
142 | // excessive threads. | |
143 | int num_waiting_threads_; | |
144 | bool exit_all_threads_; | |
145 | bool wait_for_jobs_to_complete_; | |
146 | ||
147 | // Entry per Schedule()/Submit() call | |
148 | struct BGItem { | |
149 | void* tag = nullptr; | |
150 | std::function<void()> function; | |
151 | std::function<void()> unschedFunction; | |
7c673cae FG |
152 | }; |
153 | ||
154 | using BGQueue = std::deque<BGItem>; | |
1e59de90 | 155 | BGQueue queue_; |
7c673cae | 156 | |
1e59de90 TL |
157 | std::mutex mu_; |
158 | std::condition_variable bgsignal_; | |
7c673cae FG |
159 | std::vector<port::Thread> bgthreads_; |
160 | }; | |
161 | ||
20effc67 TL |
162 | inline ThreadPoolImpl::Impl::Impl() |
163 | : low_io_priority_(false), | |
164 | cpu_priority_(CpuPriority::kNormal), | |
7c673cae FG |
165 | priority_(Env::LOW), |
166 | env_(nullptr), | |
11fdf7f2 | 167 | total_threads_limit_(0), |
7c673cae | 168 | queue_len_(), |
1e59de90 TL |
169 | reserved_threads_(0), |
170 | num_waiting_threads_(0), | |
7c673cae FG |
171 | exit_all_threads_(false), |
172 | wait_for_jobs_to_complete_(false), | |
173 | queue_(), | |
174 | mu_(), | |
175 | bgsignal_(), | |
20effc67 | 176 | bgthreads_() {} |
7c673cae | 177 | |
1e59de90 | 178 | inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } |
7c673cae FG |
179 | |
180 | void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { | |
7c673cae FG |
181 | std::unique_lock<std::mutex> lock(mu_); |
182 | assert(!exit_all_threads_); | |
183 | ||
184 | wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; | |
185 | exit_all_threads_ = true; | |
11fdf7f2 TL |
186 | // prevent threads from being recreated right after they're joined, in case |
187 | // the user is concurrently submitting jobs. | |
188 | total_threads_limit_ = 0; | |
1e59de90 TL |
189 | reserved_threads_ = 0; |
190 | num_waiting_threads_ = 0; | |
7c673cae FG |
191 | |
192 | lock.unlock(); | |
193 | ||
194 | bgsignal_.notify_all(); | |
195 | ||
196 | for (auto& th : bgthreads_) { | |
197 | th.join(); | |
198 | } | |
199 | ||
200 | bgthreads_.clear(); | |
201 | ||
202 | exit_all_threads_ = false; | |
203 | wait_for_jobs_to_complete_ = false; | |
204 | } | |
205 | ||
1e59de90 | 206 | inline void ThreadPoolImpl::Impl::LowerIOPriority() { |
7c673cae FG |
207 | std::lock_guard<std::mutex> lock(mu_); |
208 | low_io_priority_ = true; | |
209 | } | |
210 | ||
20effc67 | 211 | inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) { |
11fdf7f2 | 212 | std::lock_guard<std::mutex> lock(mu_); |
20effc67 | 213 | cpu_priority_ = pri; |
11fdf7f2 | 214 | } |
7c673cae FG |
215 | |
216 | void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { | |
217 | bool low_io_priority = false; | |
20effc67 | 218 | CpuPriority current_cpu_priority = CpuPriority::kNormal; |
11fdf7f2 | 219 | |
7c673cae | 220 | while (true) { |
494da23a | 221 | // Wait until there is an item that is ready to run |
7c673cae FG |
222 | std::unique_lock<std::mutex> lock(mu_); |
223 | // Stop waiting if the thread needs to do work or needs to terminate. | |
1e59de90 TL |
224 | // Increase num_waiting_threads_ once this task has started waiting |
225 | num_waiting_threads_++; | |
226 | ||
227 | TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc"); | |
228 | TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id); | |
229 | // When not exist_all_threads and the current thread id is not the last | |
230 | // excessive thread, it may be blocked due to 3 reasons: 1) queue is empty | |
231 | // 2) it is the excessive thread (not the last one) | |
232 | // 3) the number of waiting threads is not greater than reserved threads | |
233 | // (i.e, no available threads due to full reservation") | |
7c673cae | 234 | while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && |
1e59de90 TL |
235 | (queue_.empty() || IsExcessiveThread(thread_id) || |
236 | num_waiting_threads_ <= reserved_threads_)) { | |
7c673cae FG |
237 | bgsignal_.wait(lock); |
238 | } | |
1e59de90 TL |
239 | // Decrease num_waiting_threads_ once the thread is not waiting |
240 | num_waiting_threads_--; | |
7c673cae FG |
241 | |
242 | if (exit_all_threads_) { // mechanism to let BG threads exit safely | |
243 | ||
1e59de90 | 244 | if (!wait_for_jobs_to_complete_ || queue_.empty()) { |
7c673cae | 245 | break; |
1e59de90 TL |
246 | } |
247 | } else if (IsLastExcessiveThread(thread_id)) { | |
7c673cae FG |
248 | // Current thread is the last generated one and is excessive. |
249 | // We always terminate excessive thread in the reverse order of | |
1e59de90 TL |
250 | // generation time. But not when `exit_all_threads_ == true`, |
251 | // otherwise `JoinThreads()` could try to `join()` a `detach()`ed | |
252 | // thread. | |
7c673cae FG |
253 | auto& terminating_thread = bgthreads_.back(); |
254 | terminating_thread.detach(); | |
255 | bgthreads_.pop_back(); | |
7c673cae FG |
256 | if (HasExcessiveThread()) { |
257 | // There is still at least more excessive thread to terminate. | |
258 | WakeUpAllThreads(); | |
259 | } | |
1e59de90 TL |
260 | TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th", |
261 | thread_id); | |
262 | TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination"); | |
7c673cae FG |
263 | break; |
264 | } | |
265 | ||
266 | auto func = std::move(queue_.front().function); | |
267 | queue_.pop_front(); | |
268 | ||
269 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
270 | std::memory_order_relaxed); | |
271 | ||
272 | bool decrease_io_priority = (low_io_priority != low_io_priority_); | |
20effc67 | 273 | CpuPriority cpu_priority = cpu_priority_; |
7c673cae FG |
274 | lock.unlock(); |
275 | ||
20effc67 TL |
276 | if (cpu_priority < current_cpu_priority) { |
277 | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority", | |
278 | ¤t_cpu_priority); | |
279 | // 0 means current thread. | |
280 | port::SetCpuPriority(0, cpu_priority); | |
281 | current_cpu_priority = cpu_priority; | |
282 | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority", | |
283 | ¤t_cpu_priority); | |
11fdf7f2 TL |
284 | } |
285 | ||
20effc67 | 286 | #ifdef OS_LINUX |
7c673cae FG |
287 | if (decrease_io_priority) { |
288 | #define IOPRIO_CLASS_SHIFT (13) | |
289 | #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) | |
290 | // Put schedule into IOPRIO_CLASS_IDLE class (lowest) | |
291 | // These system calls only have an effect when used in conjunction | |
292 | // with an I/O scheduler that supports I/O priorities. As at | |
293 | // kernel 2.6.17 the only such scheduler is the Completely | |
294 | // Fair Queuing (CFQ) I/O scheduler. | |
295 | // To change scheduler: | |
296 | // echo cfq > /sys/block/<device_name>/queue/schedule | |
297 | // Tunables to consider: | |
298 | // /sys/block/<device_name>/queue/slice_idle | |
299 | // /sys/block/<device_name>/queue/slice_sync | |
300 | syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS | |
301 | 0, // current thread | |
302 | IOPRIO_PRIO_VALUE(3, 0)); | |
303 | low_io_priority = true; | |
304 | } | |
305 | #else | |
306 | (void)decrease_io_priority; // avoid 'unused variable' error | |
307 | #endif | |
20effc67 TL |
308 | |
309 | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun", | |
310 | &priority_); | |
311 | ||
7c673cae FG |
312 | func(); |
313 | } | |
314 | } | |
315 | ||
316 | // Helper struct for passing arguments when creating threads. | |
317 | struct BGThreadMetadata { | |
318 | ThreadPoolImpl::Impl* thread_pool_; | |
319 | size_t thread_id_; // Thread count in the thread. | |
320 | BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id) | |
321 | : thread_pool_(thread_pool), thread_id_(thread_id) {} | |
322 | }; | |
323 | ||
f67539c2 | 324 | void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { |
7c673cae FG |
325 | BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); |
326 | size_t thread_id = meta->thread_id_; | |
327 | ThreadPoolImpl::Impl* tp = meta->thread_pool_; | |
328 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
11fdf7f2 TL |
329 | // initialize it because compiler isn't good enough to see we don't use it |
330 | // uninitialized | |
331 | ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES; | |
332 | switch (tp->GetThreadPriority()) { | |
333 | case Env::Priority::HIGH: | |
334 | thread_type = ThreadStatus::HIGH_PRIORITY; | |
335 | break; | |
336 | case Env::Priority::LOW: | |
337 | thread_type = ThreadStatus::LOW_PRIORITY; | |
338 | break; | |
339 | case Env::Priority::BOTTOM: | |
340 | thread_type = ThreadStatus::BOTTOM_PRIORITY; | |
341 | break; | |
494da23a TL |
342 | case Env::Priority::USER: |
343 | thread_type = ThreadStatus::USER; | |
344 | break; | |
11fdf7f2 TL |
345 | case Env::Priority::TOTAL: |
346 | assert(false); | |
f67539c2 | 347 | return; |
11fdf7f2 TL |
348 | } |
349 | assert(thread_type != ThreadStatus::NUM_THREAD_TYPES); | |
350 | ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type); | |
7c673cae FG |
351 | #endif |
352 | delete meta; | |
353 | tp->BGThread(thread_id); | |
354 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
355 | ThreadStatusUtil::UnregisterThread(); | |
356 | #endif | |
f67539c2 | 357 | return; |
7c673cae FG |
358 | } |
359 | ||
360 | void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, | |
1e59de90 | 361 | bool allow_reduce) { |
f67539c2 | 362 | std::lock_guard<std::mutex> lock(mu_); |
7c673cae | 363 | if (exit_all_threads_) { |
7c673cae FG |
364 | return; |
365 | } | |
366 | if (num > total_threads_limit_ || | |
367 | (num < total_threads_limit_ && allow_reduce)) { | |
11fdf7f2 | 368 | total_threads_limit_ = std::max(0, num); |
7c673cae FG |
369 | WakeUpAllThreads(); |
370 | StartBGThreads(); | |
371 | } | |
372 | } | |
373 | ||
11fdf7f2 TL |
374 | int ThreadPoolImpl::Impl::GetBackgroundThreads() { |
375 | std::unique_lock<std::mutex> lock(mu_); | |
376 | return total_threads_limit_; | |
377 | } | |
378 | ||
7c673cae FG |
379 | void ThreadPoolImpl::Impl::StartBGThreads() { |
380 | // Start background thread if necessary | |
381 | while ((int)bgthreads_.size() < total_threads_limit_) { | |
7c673cae | 382 | port::Thread p_t(&BGThreadWrapper, |
1e59de90 | 383 | new BGThreadMetadata(this, bgthreads_.size())); |
7c673cae FG |
384 | |
385 | // Set the thread name to aid debugging | |
386 | #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) | |
387 | #if __GLIBC_PREREQ(2, 12) | |
388 | auto th_handle = p_t.native_handle(); | |
11fdf7f2 TL |
389 | std::string thread_priority = Env::PriorityToString(GetThreadPriority()); |
390 | std::ostringstream thread_name_stream; | |
391 | thread_name_stream << "rocksdb:"; | |
392 | for (char c : thread_priority) { | |
393 | thread_name_stream << static_cast<char>(tolower(c)); | |
394 | } | |
11fdf7f2 | 395 | pthread_setname_np(th_handle, thread_name_stream.str().c_str()); |
7c673cae FG |
396 | #endif |
397 | #endif | |
398 | bgthreads_.push_back(std::move(p_t)); | |
399 | } | |
400 | } | |
401 | ||
402 | void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule, | |
1e59de90 TL |
403 | std::function<void()>&& unschedule, |
404 | void* tag) { | |
7c673cae FG |
405 | std::lock_guard<std::mutex> lock(mu_); |
406 | ||
407 | if (exit_all_threads_) { | |
408 | return; | |
409 | } | |
410 | ||
411 | StartBGThreads(); | |
412 | ||
413 | // Add to priority queue | |
414 | queue_.push_back(BGItem()); | |
1e59de90 | 415 | TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue"); |
7c673cae FG |
416 | auto& item = queue_.back(); |
417 | item.tag = tag; | |
418 | item.function = std::move(schedule); | |
419 | item.unschedFunction = std::move(unschedule); | |
420 | ||
421 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
1e59de90 | 422 | std::memory_order_relaxed); |
7c673cae FG |
423 | |
424 | if (!HasExcessiveThread()) { | |
425 | // Wake up at least one waiting thread. | |
426 | bgsignal_.notify_one(); | |
427 | } else { | |
428 | // Need to wake up all threads to make sure the one woken | |
429 | // up is not the one to terminate. | |
430 | WakeUpAllThreads(); | |
431 | } | |
432 | } | |
433 | ||
434 | int ThreadPoolImpl::Impl::UnSchedule(void* arg) { | |
435 | int count = 0; | |
436 | ||
437 | std::vector<std::function<void()>> candidates; | |
438 | { | |
439 | std::lock_guard<std::mutex> lock(mu_); | |
440 | ||
441 | // Remove from priority queue | |
442 | BGQueue::iterator it = queue_.begin(); | |
443 | while (it != queue_.end()) { | |
444 | if (arg == (*it).tag) { | |
445 | if (it->unschedFunction) { | |
446 | candidates.push_back(std::move(it->unschedFunction)); | |
447 | } | |
448 | it = queue_.erase(it); | |
449 | count++; | |
450 | } else { | |
451 | ++it; | |
452 | } | |
453 | } | |
454 | queue_len_.store(static_cast<unsigned int>(queue_.size()), | |
1e59de90 | 455 | std::memory_order_relaxed); |
7c673cae FG |
456 | } |
457 | ||
1e59de90 | 458 | // Run unschedule functions outside the mutex |
7c673cae FG |
459 | for (auto& f : candidates) { |
460 | f(); | |
461 | } | |
462 | ||
463 | return count; | |
464 | } | |
465 | ||
1e59de90 | 466 | ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {} |
7c673cae | 467 | |
1e59de90 | 468 | ThreadPoolImpl::~ThreadPoolImpl() {} |
7c673cae | 469 | |
1e59de90 | 470 | void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); } |
7c673cae FG |
471 | |
472 | void ThreadPoolImpl::SetBackgroundThreads(int num) { | |
473 | impl_->SetBackgroundThreadsInternal(num, true); | |
474 | } | |
475 | ||
11fdf7f2 TL |
476 | int ThreadPoolImpl::GetBackgroundThreads() { |
477 | return impl_->GetBackgroundThreads(); | |
478 | } | |
479 | ||
7c673cae FG |
480 | unsigned int ThreadPoolImpl::GetQueueLen() const { |
481 | return impl_->GetQueueLen(); | |
482 | } | |
483 | ||
484 | void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() { | |
485 | impl_->JoinThreads(true); | |
486 | } | |
487 | ||
1e59de90 | 488 | void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); } |
7c673cae | 489 | |
20effc67 TL |
490 | void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) { |
491 | impl_->LowerCPUPriority(pri); | |
11fdf7f2 TL |
492 | } |
493 | ||
7c673cae FG |
494 | void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { |
495 | impl_->SetBackgroundThreadsInternal(num, false); | |
496 | } | |
497 | ||
498 | void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) { | |
499 | auto copy(job); | |
500 | impl_->Submit(std::move(copy), std::function<void()>(), nullptr); | |
501 | } | |
502 | ||
7c673cae FG |
503 | void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) { |
504 | impl_->Submit(std::move(job), std::function<void()>(), nullptr); | |
505 | } | |
506 | ||
1e59de90 TL |
507 | void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, |
508 | void* tag, void (*unschedFunction)(void* arg)) { | |
11fdf7f2 TL |
509 | if (unschedFunction == nullptr) { |
510 | impl_->Submit(std::bind(function, arg), std::function<void()>(), tag); | |
511 | } else { | |
512 | impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), | |
513 | tag); | |
7c673cae | 514 | } |
7c673cae FG |
515 | } |
516 | ||
1e59de90 | 517 | int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); } |
7c673cae FG |
518 | |
519 | void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); } | |
520 | ||
521 | Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } | |
522 | ||
523 | // Return the thread priority. | |
524 | // This would allow its member-thread to know its priority. | |
525 | Env::Priority ThreadPoolImpl::GetThreadPriority() const { | |
526 | return impl_->GetThreadPriority(); | |
527 | } | |
528 | ||
529 | // Set the thread priority. | |
530 | void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { | |
531 | impl_->SetThreadPriority(priority); | |
532 | } | |
533 | ||
1e59de90 TL |
534 | // Reserve a specific number of threads, prevent them from running other |
535 | // functions The number of reserved threads could be fewer than the desired one | |
536 | int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) { | |
537 | return impl_->ReserveThreads(threads_to_be_reserved); | |
538 | } | |
539 | ||
540 | // Release a specific number of threads | |
541 | int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) { | |
542 | return impl_->ReleaseThreads(threads_to_be_released); | |
543 | } | |
544 | ||
7c673cae FG |
545 | ThreadPool* NewThreadPool(int num_threads) { |
546 | ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); | |
547 | thread_pool->SetBackgroundThreads(num_threads); | |
548 | return thread_pool; | |
549 | } | |
550 | ||
f67539c2 | 551 | } // namespace ROCKSDB_NAMESPACE |