+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
#include "thread_pool.h"
#include <chrono>
ThreadPool::ThreadPool(size_t n_threads,
size_t queue_sz,
- long cpu_id)
- : queue_size{round_up_to(queue_sz, seastar::smp::count)},
- pending{queue_size}
+ std::vector<uint64_t> cpus)
+ : n_threads(n_threads),
+ queue_size{round_up_to(queue_sz, seastar::smp::count)},
+ pending_queues(n_threads)
{
auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
for (size_t i = 0; i < n_threads; i++) {
- threads.emplace_back([this, cpu_id, queue_max_wait] {
- if (cpu_id >= 0) {
- pin(cpu_id);
+ threads.emplace_back([this, cpus, queue_max_wait, i] {
+ if (!cpus.empty()) {
+ pin(cpus);
}
- loop(queue_max_wait);
+ block_sighup();
+ (void) pthread_setname_np(pthread_self(), "alien-store-tp");
+ loop(queue_max_wait, i);
});
}
}
}
}
-void ThreadPool::pin(unsigned cpu_id)
+void ThreadPool::pin(const std::vector<uint64_t>& cpus)
{
cpu_set_t cs;
CPU_ZERO(&cs);
- CPU_SET(cpu_id, &cs);
+ for (auto cpu : cpus) {
+ CPU_SET(cpu, &cs);
+ }
[[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
sizeof(cs), &cs);
ceph_assert(r == 0);
}
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+void ThreadPool::block_sighup()
{
+ sigset_t sigs;
+ sigemptyset(&sigs);
+ // alien threads must ignore the SIGHUP. It's necessary as in
+ // `crimson/osd/main.cc` we set a handler using the Seastar's
+ // signal handling infrastrucute which assumes the `_backend`
+ // of `seastar::engine()` is not null. Grep `reactor.cc` for
+ // `sigaction` or just visit `reactor::signals::handle_signal()`.
+ sigaddset(&sigs, SIGHUP);
+ pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait, size_t shard)
+{
+ auto& pending = pending_queues[shard];
for (;;) {
WorkItem* work_item = nullptr;
- {
- std::unique_lock lock{mutex};
- cond.wait_for(lock, queue_max_wait,
- [this, &work_item] {
- return pending.pop(work_item) || is_stopping();
- });
- }
+ work_item = pending.pop_front(queue_max_wait);
if (work_item) {
work_item->process();
} else if (is_stopping()) {
{
return submit_queue.stop().then([this] {
stopping = true;
- cond.notify_all();
+ for (auto& q : pending_queues) {
+ q.stop();
+ }
});
}