#include <algorithm>
#include <atomic>
#include <condition_variable>
+#include <deque>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
void ThreadPoolImpl::PthreadCall(const char* label, int result) {
if (result != 0) {
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
private:
-
- static void* BGThreadWrapper(void* arg);
-
- bool low_io_priority_;
- bool low_cpu_priority_;
- Env::Priority priority_;
- Env* env_;
-
- int total_threads_limit_;
- std::atomic_uint queue_len_; // Queue length. Used for stats reporting
- bool exit_all_threads_;
- bool wait_for_jobs_to_complete_;
-
- // Entry per Schedule()/Submit() call
- struct BGItem {
- void* tag = nullptr;
- std::function<void()> function;
- std::function<void()> unschedFunction;
+ static void BGThreadWrapper(void* arg);
+
+ bool low_io_priority_;
+ bool low_cpu_priority_;
+ Env::Priority priority_;
+ Env* env_;
+
+ int total_threads_limit_;
+ std::atomic_uint queue_len_; // Queue length. Used for stats reporting
+ bool exit_all_threads_;
+ bool wait_for_jobs_to_complete_;
+
+ // Entry per Schedule()/Submit() call
+ struct BGItem {
+ void* tag = nullptr;
+ std::function<void()> function;
+ std::function<void()> unschedFunction;
};
using BGQueue = std::deque<BGItem>;
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};
-void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
+void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPoolImpl::Impl* tp = meta->thread_pool_;
break;
case Env::Priority::TOTAL:
assert(false);
- return nullptr;
+ return;
}
assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
#ifdef ROCKSDB_USING_THREAD_STATUS
ThreadStatusUtil::UnregisterThread();
#endif
- return nullptr;
+ return;
}
void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
bool allow_reduce) {
- std::unique_lock<std::mutex> lock(mu_);
+ std::lock_guard<std::mutex> lock(mu_);
if (exit_all_threads_) {
- lock.unlock();
return;
}
if (num > total_threads_limit_ ||
return thread_pool;
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE