ceph/src/msg/async/Stack.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <mutex>

#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "stack "

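// Build the body of worker thread `worker_id`: the returned lambda names the
// thread ("msgr-worker-<id>"), binds the worker's EventCenter to it, runs the
// event loop until `done` is set, then resets and destroys the worker.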
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
  Worker *w = workers[worker_id];
  return [this, w]() {
      char tp_name[16];
      sprintf(tp_name, "msgr-worker-%u", w->id);
      ceph_pthread_setname(pthread_self(), tp_name);
      const unsigned EventMaxWaitUs = 30000000;
      w->center.set_owner();
      ldout(cct, 10) << __func__ << " starting" << dendl;
      w->initialize();
      w->init_done();
      while (!w->done) {
        ldout(cct, 30) << __func__ << " calling event process" << dendl;

        ceph::timespan dur;
        int r = w->center.process_events(EventMaxWaitUs, &dur);
        if (r < 0) {
          ldout(cct, 20) << __func__ << " process events failed: "
                         << cpp_strerror(errno) << dendl;
          // TODO do something?
        }
        w->perf_logger->tinc(l_msgr_running_total_time, dur);
      }
      w->reset();
      w->destroy();
  };
}

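// Factory for the configured transport stack ("posix", plus "rdma"/"dpdk"
// when compiled in). An unknown ms_async_transport_type is fatal. Sketch of
// the typical call sequence (as driven by the async messenger code):
//
//   std::shared_ptr<NetworkStack> stack = NetworkStack::create(cct, "posix");
//   stack->start();                  // spawn the worker threads
//   Worker *w = stack->get_worker(); // least-loaded worker for a new connection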
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
{
  if (t == "posix")
    return std::make_shared<PosixNetworkStack>(c, t);
#ifdef HAVE_RDMA
  else if (t == "rdma")
    return std::make_shared<RDMAStack>(c, t);
#endif
#ifdef HAVE_DPDK
  else if (t == "dpdk")
    return std::make_shared<DPDKStack>(c, t);
#endif

  lderr(c) << __func__ << " ms_async_transport_type " << t <<
    " is not supported! " << dendl;
  ceph_abort();
  return nullptr;
}

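// Instantiate one Worker of the given transport type, with the same type
// checks (and the same abort on an unsupported type) as create() above.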
Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id)
{
  if (type == "posix")
    return new PosixWorker(c, worker_id);
#ifdef HAVE_RDMA
  else if (type == "rdma")
    return new RDMAWorker(c, worker_id);
#endif
#ifdef HAVE_DPDK
  else if (type == "dpdk")
    return new DPDKWorker(c, worker_id);
#endif

  lderr(c) << __func__ << " ms_async_transport_type " << type <<
    " is not supported! " << dendl;
  ceph_abort();
  return nullptr;
}

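// Create ms_async_op_threads workers, capped at EventCenter::MAX_EVENTCENTER,
// and initialize each worker's EventCenter. Threads are not spawned here;
// start() does that.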
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
  ceph_assert(cct->_conf->ms_async_op_threads > 0);

  const int InitEventNumber = 5000;
  num_workers = cct->_conf->ms_async_op_threads;
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    ldout(cct, 0) << __func__ << " max thread limit is "
                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
                  << "Higher thread values are unnecessary and currently unsupported."
                  << dendl;
    num_workers = EventCenter::MAX_EVENTCENTER;
  }

  for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
    Worker *w = create_worker(cct, type, worker_id);
    w->center.init(InitEventNumber, worker_id, type);
    workers.push_back(w);
  }
}

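// Spawn one thread per worker (returns early if already started) and block
// until every worker has completed its initialization.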
void NetworkStack::start()
{
  std::unique_lock<decltype(pool_spin)> lk(pool_spin);

  if (started) {
    return;
  }

  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
  started = true;
  lk.unlock();

  for (unsigned i = 0; i < num_workers; ++i)
    workers[i]->wait_for_init();
}

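// Return the worker with the fewest references (roughly, the least-loaded
// one) and bump its reference count.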
Worker* NetworkStack::get_worker()
{
  ldout(cct, 30) << __func__ << dendl;

  // start with some reasonably large number
  unsigned min_load = std::numeric_limits<int>::max();
  Worker* current_best = nullptr;

  pool_spin.lock();
  // find the worker with the fewest references
  // it is tempting to return early when references == 0, but in practice
  // that happens so rarely that a special case is not worth it.
  for (unsigned i = 0; i < num_workers; ++i) {
    unsigned worker_load = workers[i]->references.load();
    if (worker_load < min_load) {
      current_best = workers[i];
      min_load = worker_load;
    }
  }

  pool_spin.unlock();
  ceph_assert(current_best);
  ++current_best->references;
  return current_best;
}

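// Mark every worker as done, wake its event loop so it notices the flag, and
// join the worker threads.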
void NetworkStack::stop()
{
  std::lock_guard lk(pool_spin);
  for (unsigned i = 0; i < num_workers; ++i) {
    workers[i]->done = true;
    workers[i]->center.wakeup();
    join_worker(i);
  }
  started = false;
}

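// Completion callback used by drain(): every worker decrements the shared
// counter when the callback runs on it; wait() blocks until the counter
// reaches zero.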
class C_drain : public EventCallback {
  ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
  ceph::condition_variable drain_cond;
  unsigned drain_count;

 public:
  explicit C_drain(size_t c)
      : drain_count(c) {}
  void do_request(uint64_t id) override {
    std::lock_guard l{drain_lock};
    drain_count--;
    if (drain_count == 0) drain_cond.notify_all();
  }
  void wait() {
    std::unique_lock l{drain_lock};
    drain_cond.wait(l, [this] { return drain_count == 0; });
  }
};

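// Block until each worker's event center has processed the events queued
// ahead of this call. Must not be called from a worker thread: the assert
// below guards against the resulting self-deadlock.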
void NetworkStack::drain()
{
  ldout(cct, 30) << __func__ << " started." << dendl;
  pthread_t cur = pthread_self();
  pool_spin.lock();
  C_drain drain(num_workers);
  for (unsigned i = 0; i < num_workers; ++i) {
    ceph_assert(cur != workers[i]->center.get_owner());
    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
  }
  pool_spin.unlock();
  drain.wait();
  ldout(cct, 30) << __func__ << " end." << dendl;
}