git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/Stack.cc
commit: import quincy beta 17.1.0

diff --git a/ceph/src/msg/async/Stack.cc b/ceph/src/msg/async/Stack.cc
index 37e15634d79c02f9724dad7dcf22a2914b540af0..94a1bba53637009a284fc508aa9f7d206a54fdf0 100644
--- a/ceph/src/msg/async/Stack.cc
+++ b/ceph/src/msg/async/Stack.cc
 #undef dout_prefix
 #define dout_prefix *_dout << "stack "
 
-std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
+std::function<void ()> NetworkStack::add_thread(Worker* w)
 {
-  Worker *w = workers[worker_id];
   return [this, w]() {
-      char tp_name[16];
-      sprintf(tp_name, "msgr-worker-%u", w->id);
-      ceph_pthread_setname(pthread_self(), tp_name);
+      rename_thread(w->id);
       const unsigned EventMaxWaitUs = 30000000;
       w->center.set_owner();
       ldout(cct, 10) << __func__ << " starting" << dendl;
@@ -86,8 +83,17 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
     return nullptr;
   }
   
+  unsigned num_workers = c->_conf->ms_async_op_threads;
+  ceph_assert(num_workers > 0);
+  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
+    ldout(c, 0) << __func__ << " max thread limit is "
+                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
+                  << "Higher thread values are unnecessary and currently unsupported."
+                  << dendl;
+    num_workers = EventCenter::MAX_EVENTCENTER;
+  }
   const int InitEventNumber = 5000;
-  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
+  for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
     Worker *w = stack->create_worker(c, worker_id);
     int ret = w->center.init(InitEventNumber, worker_id, t);
     if (ret)
@@ -100,18 +106,7 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
 
 NetworkStack::NetworkStack(CephContext *c)
   : cct(c)
-{
-  ceph_assert(cct->_conf->ms_async_op_threads > 0);
-
-  num_workers = cct->_conf->ms_async_op_threads;
-  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
-    ldout(cct, 0) << __func__ << " max thread limit is "
-                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
-                  << "Higher thread values are unnecessary and currently unsupported."
-                  << dendl;
-    num_workers = EventCenter::MAX_EVENTCENTER;
-  }
-}
+{}
 
 void NetworkStack::start()
 {
@@ -121,17 +116,17 @@ void NetworkStack::start()
     return ;
   }
 
-  for (unsigned i = 0; i < num_workers; ++i) {
-    if (workers[i]->is_init())
+  for (Worker* worker : workers) {
+    if (worker->is_init())
       continue;
-    std::function<void ()> thread = add_thread(i);
-    spawn_worker(i, std::move(thread));
+    spawn_worker(add_thread(worker));
   }
   started = true;
   lk.unlock();
 
-  for (unsigned i = 0; i < num_workers; ++i)
-    workers[i]->wait_for_init();
+  for (Worker* worker : workers) {
+    worker->wait_for_init();
+  }
 }
 
 Worker* NetworkStack::get_worker()
@@ -146,10 +141,10 @@ Worker* NetworkStack::get_worker()
   // find worker with least references
   // tempting case is returning on references == 0, but in reality
   // this will happen so rarely that there's no need for special case.
-  for (unsigned i = 0; i < num_workers; ++i) {
-    unsigned worker_load = workers[i]->references.load();
+  for (Worker* worker : workers) {
+    unsigned worker_load = worker->references.load();
     if (worker_load < min_load) {
-      current_best = workers[i];
+      current_best = worker;
       min_load = worker_load;
     }
   }
@@ -163,10 +158,11 @@ Worker* NetworkStack::get_worker()
 void NetworkStack::stop()
 {
   std::lock_guard lk(pool_spin);
-  for (unsigned i = 0; i < num_workers; ++i) {
-    workers[i]->done = true;
-    workers[i]->center.wakeup();
-    join_worker(i);
+  unsigned i = 0;
+  for (Worker* worker : workers) {
+    worker->done = true;
+    worker->center.wakeup();
+    join_worker(i++);
   }
   started = false;
 }
@@ -195,10 +191,10 @@ void NetworkStack::drain()
   ldout(cct, 30) << __func__ << " started." << dendl;
   pthread_t cur = pthread_self();
   pool_spin.lock();
-  C_drain drain(num_workers);
-  for (unsigned i = 0; i < num_workers; ++i) {
-    ceph_assert(cur != workers[i]->center.get_owner());
-    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
+  C_drain drain(get_num_worker());
+  for (Worker* worker : workers) {
+    ceph_assert(cur != worker->center.get_owner());
+    worker->center.dispatch_event_external(EventCallbackRef(&drain));
   }
   pool_spin.unlock();
   drain.wait();
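
Taken together, these hunks move the worker-count clamping out of the NetworkStack constructor and into NetworkStack::create(), hand add_thread() the Worker pointer instead of an index, and switch the start()/stop()/get_worker()/drain() loops to range-based iteration over the workers vector (stop() keeps a running index only because join_worker() is still addressed by position). A minimal sketch of a concrete stack written against those call sites follows; the virtual signatures, the ExampleNetworkStack name, and the threads vector are assumptions made for illustration, since Stack.h is not part of this diff.

// Hypothetical sketch only -- not part of this change.
// Assumes the interface implied by the call sites above:
//   spawn_worker(std::function<void ()> &&) and join_worker(unsigned).
#include <functional>
#include <thread>
#include <vector>

class ExampleNetworkStack /* : public NetworkStack */ {
  std::vector<std::thread> threads;   // one entry per spawned worker

  void spawn_worker(std::function<void ()> &&func) /* override */ {
    // add_thread(Worker*) already built the closure; just run it on a thread.
    threads.emplace_back(std::move(func));
  }

  void join_worker(unsigned i) /* override */ {
    // stop() joins by index, hence the counter kept alongside the range-for.
    if (i < threads.size() && threads[i].joinable())
      threads[i].join();
  }
};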