]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/os/alienstore/thread_pool.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / os / alienstore / thread_pool.cc
index e127d87d524fc9cca0ee290335372a18de52a780..747d6714e3662e4772701f83a7c304bab2f00467 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
 #include "thread_pool.h"
 
 #include <chrono>
@@ -12,17 +15,20 @@ namespace crimson::os {
 
 ThreadPool::ThreadPool(size_t n_threads,
                        size_t queue_sz,
-                       long cpu_id)
-  : queue_size{round_up_to(queue_sz, seastar::smp::count)},
-    pending{queue_size}
+                       std::vector<uint64_t> cpus)
+  : n_threads(n_threads),
+    queue_size{round_up_to(queue_sz, seastar::smp::count)},
+    pending_queues(n_threads)
 {
   auto queue_max_wait = std::chrono::seconds(local_conf()->threadpool_empty_queue_max_wait);
   for (size_t i = 0; i < n_threads; i++) {
-    threads.emplace_back([this, cpu_id, queue_max_wait] {
-      if (cpu_id >= 0) {
-        pin(cpu_id);
+    threads.emplace_back([this, cpus, queue_max_wait, i] {
+      if (!cpus.empty()) {
+        pin(cpus);
       }
-      loop(queue_max_wait);
+      block_sighup();
+      (void) pthread_setname_np(pthread_self(), "alien-store-tp");
+      loop(queue_max_wait, i);
     });
   }
 }
@@ -34,27 +40,37 @@ ThreadPool::~ThreadPool()
   }
 }
 
-void ThreadPool::pin(unsigned cpu_id)
+void ThreadPool::pin(const std::vector<uint64_t>& cpus)
 {
   cpu_set_t cs;
   CPU_ZERO(&cs);
-  CPU_SET(cpu_id, &cs);
+  for (auto cpu : cpus) {
+    CPU_SET(cpu, &cs);
+  }
   [[maybe_unused]] auto r = pthread_setaffinity_np(pthread_self(),
                                                    sizeof(cs), &cs);
   ceph_assert(r == 0);
 }
 
-void ThreadPool::loop(std::chrono::milliseconds queue_max_wait)
+void ThreadPool::block_sighup()
 {
+  sigset_t sigs;
+  sigemptyset(&sigs);
+  // alien threads must ignore the SIGHUP. It's necessary as in
+  // `crimson/osd/main.cc` we set a handler using the Seastar's
+  // signal handling infrastrucute which assumes the `_backend`
+  // of `seastar::engine()` is not null. Grep `reactor.cc` for
+  // `sigaction` or just visit `reactor::signals::handle_signal()`.
+  sigaddset(&sigs, SIGHUP);
+  pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
+}
+
+void ThreadPool::loop(std::chrono::milliseconds queue_max_wait, size_t shard)
+{
+  auto& pending = pending_queues[shard];
   for (;;) {
     WorkItem* work_item = nullptr;
-    {
-      std::unique_lock lock{mutex};
-      cond.wait_for(lock, queue_max_wait,
-                    [this, &work_item] {
-        return pending.pop(work_item) || is_stopping();
-      });
-    }
+    work_item = pending.pop_front(queue_max_wait);
     if (work_item) {
       work_item->process();
     } else if (is_stopping()) {
@@ -73,7 +89,9 @@ seastar::future<> ThreadPool::stop()
 {
   return submit_queue.stop().then([this] {
     stopping = true;
-    cond.notify_all();
+    for (auto& q : pending_queues) {
+      q.stop();
+    }
   });
 }