1 #include "thread_pool.h"
6 #include "include/ceph_assert.h"
7 #include "crimson/common/config_proxy.h"
9 using crimson::common::local_conf
;
11 namespace crimson::os
{
13 ThreadPool::ThreadPool(size_t n_threads
,
16 : queue_size
{round_up_to(queue_sz
, seastar::smp::count
)},
19 auto queue_max_wait
= std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait
);
20 for (size_t i
= 0; i
< n_threads
; i
++) {
21 threads
.emplace_back([this, cpu_id
, queue_max_wait
] {
30 ThreadPool::~ThreadPool()
32 for (auto& thread
: threads
) {
37 void ThreadPool::pin(unsigned cpu_id
)
42 [[maybe_unused
]] auto r
= pthread_setaffinity_np(pthread_self(),
47 void ThreadPool::loop(std::chrono::milliseconds queue_max_wait
)
50 WorkItem
* work_item
= nullptr;
52 std::unique_lock lock
{mutex
};
53 cond
.wait_for(lock
, queue_max_wait
,
55 return pending
.pop(work_item
) || is_stopping();
60 } else if (is_stopping()) {
66 seastar::future
<> ThreadPool::start()
68 auto slots_per_shard
= queue_size
/ seastar::smp::count
;
69 return submit_queue
.start(slots_per_shard
);
72 seastar::future
<> ThreadPool::stop()
74 return submit_queue
.stop().then([this] {
80 } // namespace crimson::os