1 #include "ThreadPool.h"
4 #include "crimson/net/Config.h"
5 #include "include/intarith.h"
7 #include "include/ceph_assert.h"
9 namespace crimson::thread
{
11 ThreadPool::ThreadPool(size_t n_threads
,
14 : queue_size
{round_up_to(queue_sz
, seastar::smp::count
)},
17 for (size_t i
= 0; i
< n_threads
; i
++) {
18 threads
.emplace_back([this, cpu_id
] {
25 ThreadPool::~ThreadPool()
27 for (auto& thread
: threads
) {
32 void ThreadPool::pin(unsigned cpu_id
)
37 [[maybe_unused
]] auto r
= pthread_setaffinity_np(pthread_self(),
42 void ThreadPool::loop()
45 WorkItem
* work_item
= nullptr;
47 std::unique_lock lock
{mutex
};
49 crimson::net::conf
.threadpool_empty_queue_max_wait
,
51 return pending
.pop(work_item
) || is_stopping();
56 } else if (is_stopping()) {
62 seastar::future
<> ThreadPool::start()
64 auto slots_per_shard
= queue_size
/ seastar::smp::count
;
65 return submit_queue
.start(slots_per_shard
);
68 seastar::future
<> ThreadPool::stop()
70 return submit_queue
.stop().then([this] {
76 } // namespace crimson::thread