#include "util/threadpool_imp.h"
-#include "monitoring/thread_status_util.h"
-#include "port/port.h"
-
#ifndef OS_WIN
# include <unistd.h>
#endif
#include <thread>
#include <vector>
+#include "monitoring/thread_status_util.h"
+#include "port/port.h"
+#include "test_util/sync_point.h"
+
namespace ROCKSDB_NAMESPACE {
void ThreadPoolImpl::PthreadCall(const char* label, int result) {
void LowerIOPriority();
- void LowerCPUPriority();
+ void LowerCPUPriority(CpuPriority pri);
void WakeUpAllThreads() {
bgsignal_.notify_all();
static void BGThreadWrapper(void* arg);
bool low_io_priority_;
- bool low_cpu_priority_;
+ CpuPriority cpu_priority_;
Env::Priority priority_;
Env* env_;
std::vector<port::Thread> bgthreads_;
};
-
-inline
-ThreadPoolImpl::Impl::Impl()
- :
- low_io_priority_(false),
- low_cpu_priority_(false),
+inline ThreadPoolImpl::Impl::Impl()
+ : low_io_priority_(false),
+ cpu_priority_(CpuPriority::kNormal),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(0),
queue_(),
mu_(),
bgsignal_(),
- bgthreads_() {
-}
+ bgthreads_() {}
inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
low_io_priority_ = true;
}
-inline
-void ThreadPoolImpl::Impl::LowerCPUPriority() {
+inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
std::lock_guard<std::mutex> lock(mu_);
- low_cpu_priority_ = true;
+ cpu_priority_ = pri;
}
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false;
- bool low_cpu_priority = false;
+ CpuPriority current_cpu_priority = CpuPriority::kNormal;
while (true) {
// Wait until there is an item that is ready to run
std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_);
- bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
+ CpuPriority cpu_priority = cpu_priority_;
lock.unlock();
-#ifdef OS_LINUX
- if (decrease_cpu_priority) {
- setpriority(
- PRIO_PROCESS,
- // Current thread.
- 0,
- // Lowest priority possible.
- 19);
- low_cpu_priority = true;
+ if (cpu_priority < current_cpu_priority) {
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
+ ¤t_cpu_priority);
+ // 0 means current thread.
+ port::SetCpuPriority(0, cpu_priority);
+ current_cpu_priority = cpu_priority;
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
+ ¤t_cpu_priority);
}
+#ifdef OS_LINUX
if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
}
#else
(void)decrease_io_priority; // avoid 'unused variable' error
- (void)decrease_cpu_priority;
#endif
+
+ TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
+ &priority_);
+
func();
}
}
impl_->LowerIOPriority();
}
-void ThreadPoolImpl::LowerCPUPriority() {
- impl_->LowerCPUPriority();
+void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
+ impl_->LowerCPUPriority(pri);
}
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {