// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <mutex>

#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "stack "

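// add_thread() builds the body of a worker thread: it names the thread,
// binds the worker's EventCenter to it, runs Worker::initialize()/init_done(),
// then loops on process_events() (waiting up to EventMaxWaitUs, i.e. 30s, per
// iteration) until the worker is flagged done, accounting the elapsed time in
// the l_msgr_running_total_time perf counter before tearing the worker down.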
std::function<void ()> NetworkStack::add_thread(Worker* w)
{
  return [this, w]() {
    rename_thread(w->id);
    const unsigned EventMaxWaitUs = 30000000;
    w->center.set_owner();
    ldout(cct, 10) << __func__ << " starting" << dendl;
    w->initialize();
    w->init_done();
    while (!w->done) {
      ldout(cct, 30) << __func__ << " calling event process" << dendl;

      ceph::timespan dur;
      int r = w->center.process_events(EventMaxWaitUs, &dur);
      if (r < 0) {
        ldout(cct, 20) << __func__ << " process events failed: "
                       << cpp_strerror(errno) << dendl;
        // TODO do something?
      }
      w->perf_logger->tinc(l_msgr_running_total_time, dur);
    }
    w->reset();
    w->destroy();
  };
}

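// Factory: pick the concrete stack from ms_async_transport_type ("posix"
// always; "rdma"/"dpdk" only when built with HAVE_RDMA/HAVE_DPDK), abort on
// an unknown type, then create ms_async_op_threads workers (capped at
// EventCenter::MAX_EVENTCENTER) and size each worker's EventCenter for
// InitEventNumber events.
//
// Rough usage sketch (illustrative only; the real call sites live in the
// AsyncMessenger code):
//   auto stack = NetworkStack::create(cct, "posix");
//   stack->start();
//   Worker *w = stack->get_worker();
//   ...
//   stack->stop();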
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
                                                   const std::string &t)
{
  std::shared_ptr<NetworkStack> stack = nullptr;

  if (t == "posix")
    stack.reset(new PosixNetworkStack(c));
#ifdef HAVE_RDMA
  else if (t == "rdma")
    stack.reset(new RDMAStack(c));
#endif
#ifdef HAVE_DPDK
  else if (t == "dpdk")
    stack.reset(new DPDKStack(c));
#endif

  if (stack == nullptr) {
    lderr(c) << __func__ << " ms_async_transport_type " << t <<
      " is not supported! " << dendl;
    ceph_abort();
    return nullptr;
  }

  unsigned num_workers = c->_conf->ms_async_op_threads;
  ceph_assert(num_workers > 0);
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    ldout(c, 0) << __func__ << " max thread limit is "
                << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
                << "Higher thread values are unnecessary and currently unsupported."
                << dendl;
    num_workers = EventCenter::MAX_EVENTCENTER;
  }
  const int InitEventNumber = 5000;
  for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
    Worker *w = stack->create_worker(c, worker_id);
    int ret = w->center.init(InitEventNumber, worker_id, t);
    if (ret)
      throw std::system_error(-ret, std::generic_category());
    stack->workers.push_back(w);
  }

  return stack;
}

NetworkStack::NetworkStack(CephContext *c)
  : cct(c)
{}

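// Spawn a thread for every worker that has not been initialized yet, then
// block until all workers report init complete.  Idempotent: if the stack is
// already started this is a no-op.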
void NetworkStack::start()
{
  std::unique_lock<decltype(pool_spin)> lk(pool_spin);

  if (started) {
    return;
  }

  for (Worker* worker : workers) {
    if (worker->is_init())
      continue;
    spawn_worker(add_thread(worker));
  }
  started = true;
  lk.unlock();

  for (Worker* worker : workers) {
    worker->wait_for_init();
  }
}

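// Return the least-loaded worker, using the per-worker reference count as the
// load metric, and bump its reference count so subsequent calls spread new
// connections across workers.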
Worker* NetworkStack::get_worker()
{
  ldout(cct, 30) << __func__ << dendl;

  // start with some reasonably large number
  unsigned min_load = std::numeric_limits<int>::max();
  Worker* current_best = nullptr;

  pool_spin.lock();
  // find the worker with the fewest references.  It is tempting to return
  // early when references == 0, but in practice that happens so rarely that
  // the special case isn't worth it.
  for (Worker* worker : workers) {
    unsigned worker_load = worker->references.load();
    if (worker_load < min_load) {
      current_best = worker;
      min_load = worker_load;
    }
  }

  pool_spin.unlock();
  ceph_assert(current_best);
  ++current_best->references;
  return current_best;
}

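// Flag every worker as done, wake its event loop so it notices, and join the
// corresponding thread.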
void NetworkStack::stop()
{
  std::lock_guard lk(pool_spin);
  unsigned i = 0;
  for (Worker* worker : workers) {
    worker->done = true;
    worker->center.wakeup();
    join_worker(i++);
  }
  started = false;
}

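// Event callback used by drain(): each worker decrements the shared counter
// once it has processed the callback, and wait() blocks until the counter
// reaches zero.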
class C_drain : public EventCallback {
  ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
  ceph::condition_variable drain_cond;
  unsigned drain_count;

 public:
  explicit C_drain(size_t c)
    : drain_count(c) {}
  void do_request(uint64_t id) override {
    std::lock_guard l{drain_lock};
    drain_count--;
    if (drain_count == 0) drain_cond.notify_all();
  }
  void wait() {
    std::unique_lock l{drain_lock};
    drain_cond.wait(l, [this] { return drain_count == 0; });
  }
};

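// Dispatch a C_drain event to every worker and block until all of them have
// run it, guaranteeing that externally queued events submitted before the
// call have been consumed.  Must not be called from a worker thread (hence
// the owner assert).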
void NetworkStack::drain()
{
  ldout(cct, 30) << __func__ << " started." << dendl;
  pthread_t cur = pthread_self();
  pool_spin.lock();
  C_drain drain(get_num_worker());
  for (Worker* worker : workers) {
    ceph_assert(cur != worker->center.get_owner());
    worker->center.dispatch_event_external(EventCallbackRef(&drain));
  }
  pool_spin.unlock();
  drain.wait();
  ldout(cct, 30) << __func__ << " end." << dendl;
}