#undef dout_prefix
#define dout_prefix *_dout << "stack "
-std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
+std::function<void ()> NetworkStack::add_thread(Worker* w)
{
- Worker *w = workers[worker_id];
+ // Taking the Worker* directly removes the workers[worker_id] lookup
+ // (and the dependency on index-based access) from this factory.
return [this, w]() {
- char tp_name[16];
- sprintf(tp_name, "msgr-worker-%u", w->id);
- ceph_pthread_setname(pthread_self(), tp_name);
+ // rename_thread() centralizes the "msgr-worker-%u" thread naming that
+ // was previously hand-rolled here via sprintf + ceph_pthread_setname.
+ rename_thread(w->id);
const unsigned EventMaxWaitUs = 30000000;
w->center.set_owner();
ldout(cct, 10) << __func__ << " starting" << dendl;
return nullptr;
}
+ // Worker-count policy (read ms_async_op_threads, clamp to the
+ // EventCenter hard limit) now runs where the workers are created,
+ // instead of in the NetworkStack constructor.
+ unsigned num_workers = c->_conf->ms_async_op_threads;
+ ceph_assert(num_workers > 0);
+ // Clamp rather than assert: extra threads beyond MAX_EVENTCENTER are
+ // unsupported, so warn at level 0 and cap the count.
+ if (num_workers >= EventCenter::MAX_EVENTCENTER) {
+ ldout(c, 0) << __func__ << " max thread limit is "
+ << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
+ << "Higher thread values are unnecessary and currently unsupported."
+ << dendl;
+ num_workers = EventCenter::MAX_EVENTCENTER;
+ }
const int InitEventNumber = 5000;
- for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
+ // Loop bound is the locally computed count; the stack no longer keeps
+ // a num_workers member.
+ for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
Worker *w = stack->create_worker(c, worker_id);
int ret = w->center.init(InitEventNumber, worker_id, t);
if (ret)
NetworkStack::NetworkStack(CephContext *c)
: cct(c)
-{
- ceph_assert(cct->_conf->ms_async_op_threads > 0);
-
- num_workers = cct->_conf->ms_async_op_threads;
- if (num_workers >= EventCenter::MAX_EVENTCENTER) {
- ldout(cct, 0) << __func__ << " max thread limit is "
- << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
- << "Higher thread values are unnecessary and currently unsupported."
- << dendl;
- num_workers = EventCenter::MAX_EVENTCENTER;
- }
-}
+// The ms_async_op_threads validation/clamping that used to live here is
+// moved to the worker-creation path, so the constructor only records the
+// CephContext.
+{}
void NetworkStack::start()
{
return ;
}
+ // Range-for over the workers container replaces index-based access.
+ // add_thread() now receives the Worker* itself, and spawn_worker()
+ // no longer takes an index.
- for (unsigned i = 0; i < num_workers; ++i) {
- if (workers[i]->is_init())
+ for (Worker* worker : workers) {
+ if (worker->is_init())
continue;
- std::function<void ()> thread = add_thread(i);
- spawn_worker(i, std::move(thread));
+ spawn_worker(add_thread(worker));
}
started = true;
lk.unlock();
- for (unsigned i = 0; i < num_workers; ++i)
- workers[i]->wait_for_init();
+ // Same index-to-range-for conversion for the init barrier below.
+ for (Worker* worker : workers) {
+ worker->wait_for_init();
+ }
}
Worker* NetworkStack::get_worker()
// find worker with least references
// tempting case is returning on references == 0, but in reality
// this will happen so rarely that there's no need for special case.
- for (unsigned i = 0; i < num_workers; ++i) {
- unsigned worker_load = workers[i]->references.load();
+ // Linear scan via range-for; each worker's refcount is loaded once
+ // per iteration, same as the index-based original.
+ for (Worker* worker : workers) {
+ unsigned worker_load = worker->references.load();
if (worker_load < min_load) {
- current_best = workers[i];
+ current_best = worker;
min_load = worker_load;
}
}
void NetworkStack::stop()
{
std::lock_guard lk(pool_spin);
- for (unsigned i = 0; i < num_workers; ++i) {
- workers[i]->done = true;
- workers[i]->center.wakeup();
- join_worker(i);
+ // join_worker() is still index-based, so a manual counter runs
+ // alongside the range-for. NOTE(review): a plain index loop (or a
+ // join_worker(Worker*) overload) would avoid mixing both styles here.
+ unsigned i = 0;
+ for (Worker* worker : workers) {
+ worker->done = true;
+ worker->center.wakeup();
+ join_worker(i++);
}
started = false;
}
ldout(cct, 30) << __func__ << " started." << dendl;
pthread_t cur = pthread_self();
pool_spin.lock();
- C_drain drain(num_workers);
- for (unsigned i = 0; i < num_workers; ++i) {
- ceph_assert(cur != workers[i]->center.get_owner());
- workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
+ // Worker count now comes from get_num_worker() instead of the removed
+ // num_workers member. C_drain is presumably waiting for one completion
+ // per dispatched event — verify get_num_worker() == workers.size() so
+ // the counts stay in sync.
+ C_drain drain(get_num_worker());
+ for (Worker* worker : workers) {
+ ceph_assert(cur != worker->center.get_owner());
+ worker->center.dispatch_event_external(EventCallbackRef(&drain));
}
pool_spin.unlock();
drain.wait();