]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/alienstore/thread_pool.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / os / alienstore / thread_pool.cc
1 #include "thread_pool.h"
2
3 #include <chrono>
4 #include <pthread.h>
5
6 #include "include/ceph_assert.h"
7 #include "crimson/common/config_proxy.h"
8
9 using crimson::common::local_conf;
10
11 namespace crimson::os {
12
13 ThreadPool::ThreadPool(size_t n_threads,
14 size_t queue_sz,
15 long cpu_id)
16 : queue_size{round_up_to(queue_sz, seastar::smp::count)},
17 pending{queue_size}
18 {
19 auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
20 for (size_t i = 0; i < n_threads; i++) {
21 threads.emplace_back([this, cpu_id, queue_max_wait] {
22 if (cpu_id >= 0) {
23 pin(cpu_id);
24 }
25 loop(queue_max_wait);
26 });
27 }
28 }
29
30 ThreadPool::~ThreadPool()
31 {
32 for (auto& thread : threads) {
33 thread.join();
34 }
35 }
36
37 void ThreadPool::pin(unsigned cpu_id)
38 {
39 cpu_set_t cs;
40 CPU_ZERO(&cs);
41 CPU_SET(cpu_id, &cs);
42 [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
43 sizeof(cs), &cs);
44 ceph_assert(r == 0);
45 }
46
47 void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
48 {
49 for (;;) {
50 WorkItem* work_item = nullptr;
51 {
52 std::unique_lock lock{mutex};
53 cond.wait_for(lock, queue_max_wait,
54 [this, &work_item] {
55 return pending.pop(work_item) || is_stopping();
56 });
57 }
58 if (work_item) {
59 work_item->process();
60 } else if (is_stopping()) {
61 break;
62 }
63 }
64 }
65
66 seastar::future<> ThreadPool::start()
67 {
68 auto slots_per_shard = queue_size / seastar::smp::count;
69 return submit_queue.start(slots_per_shard);
70 }
71
72 seastar::future<> ThreadPool::stop()
73 {
74 return submit_queue.stop().then([this] {
75 stopping = true;
76 cond.notify_all();
77 });
78 }
79
80 } // namespace crimson::os