// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/assert.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "stack "

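// Build the entry function for worker thread i: name the thread, pin the
// worker's event center to it, run the event loop until done is set, then
// tear the worker down.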
std::function<void ()> NetworkStack::add_thread(unsigned i)
{
  Worker *w = workers[i];
  return [this, w]() {
    char tp_name[16];
    // use snprintf so a large worker id cannot overflow tp_name
    snprintf(tp_name, sizeof(tp_name), "msgr-worker-%d", w->id);
    ceph_pthread_setname(pthread_self(), tp_name);
    const uint64_t EventMaxWaitUs = 30000000;
    w->center.set_owner();
    ldout(cct, 10) << __func__ << " starting" << dendl;
    w->initialize();
    w->init_done();
    while (!w->done) {
      ldout(cct, 30) << __func__ << " calling event process" << dendl;

      int r = w->center.process_events(EventMaxWaitUs);
      if (r < 0) {
        ldout(cct, 20) << __func__ << " process events failed: "
                       << cpp_strerror(errno) << dendl;
        // TODO do something?
      }
    }
    w->reset();
    w->destroy();
  };
}

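// Factory for the configured transport: returns the matching NetworkStack
// implementation, or aborts if the requested type was not compiled in or
// is unknown.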
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
{
  if (t == "posix")
    return std::make_shared<PosixNetworkStack>(c, t);
#ifdef HAVE_RDMA
  else if (t == "rdma")
    return std::make_shared<RDMAStack>(c, t);
#endif
#ifdef HAVE_DPDK
  else if (t == "dpdk")
    return std::make_shared<DPDKStack>(c, t);
#endif

  lderr(c) << __func__ << " ms_async_transport_type " << t
           << " is not supported!" << dendl;
  ceph_abort();
  return nullptr;
}

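// Per-thread companion to create(): builds a single Worker of the given
// transport type; aborts on an unknown type.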
Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
{
  if (type == "posix")
    return new PosixWorker(c, i);
#ifdef HAVE_RDMA
  else if (type == "rdma")
    return new RDMAWorker(c, i);
#endif
#ifdef HAVE_DPDK
  else if (type == "dpdk")
    return new DPDKWorker(c, i);
#endif

  lderr(c) << __func__ << " ms_async_transport_type " << type
           << " is not supported!" << dendl;
  ceph_abort();
  return nullptr;
}

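// Size the worker pool from ms_async_op_threads (clamped to
// EventCenter::MAX_EVENTCENTER) and pre-create one Worker plus event
// center per thread; threads are not spawned until start().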
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
  const uint64_t InitEventNumber = 5000;
  num_workers = cct->_conf->ms_async_op_threads;
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    ldout(cct, 0) << __func__ << " max thread limit is "
                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
                  << "Higher thread values are unnecessary and currently unsupported."
                  << dendl;
    num_workers = EventCenter::MAX_EVENTCENTER;
  }

  for (unsigned i = 0; i < num_workers; ++i) {
    Worker *w = create_worker(cct, type, i);
    w->center.init(InitEventNumber, i, type);
    workers.push_back(w);
  }
  cct->register_fork_watcher(this);
}

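// Spawn one thread per worker under pool_spin, then block until every
// worker has finished initializing. Safe to call more than once.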
void NetworkStack::start()
{
  pool_spin.lock();
  if (started) {
    pool_spin.unlock();
    return;
  }

  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
  started = true;
  pool_spin.unlock();

  for (unsigned i = 0; i < num_workers; ++i)
    workers[i]->wait_for_init();
}

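// Least-loaded worker selection: scan all workers for the smallest
// reference count and hand out that worker with its count bumped.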
Worker* NetworkStack::get_worker()
{
  ldout(cct, 30) << __func__ << dendl;

  unsigned min_load = std::numeric_limits<unsigned>::max();
  Worker* current_best = nullptr;

  pool_spin.lock();
  // Pick the worker with the fewest references. Returning early on
  // references == 0 is tempting, but in practice that happens so rarely
  // that the special case is not worth it.
  for (unsigned i = 0; i < num_workers; ++i) {
    unsigned worker_load = workers[i]->references.load();
    if (worker_load < min_load) {
      current_best = workers[i];
      min_load = worker_load;
    }
  }

  pool_spin.unlock();
  assert(current_best);
  ++current_best->references;
  return current_best;
}

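// Ask every worker loop to exit, wake its event center so it notices,
// and join the threads.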
void NetworkStack::stop()
{
  Spinlock::Locker l(pool_spin);
  for (unsigned i = 0; i < num_workers; ++i) {
    workers[i]->done = true;
    workers[i]->center.wakeup();
    join_worker(i);
  }
  started = false;
}

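// Countdown callback used by drain(): each worker decrements the count
// from its own thread, and wait() blocks until all of them have run.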
class C_drain : public EventCallback {
  Mutex drain_lock;
  Cond drain_cond;
  unsigned drain_count;

 public:
  explicit C_drain(size_t c)
    : drain_lock("C_drain::drain_lock"),
      drain_count(c) {}
  void do_request(int id) override {
    Mutex::Locker l(drain_lock);
    drain_count--;
    if (drain_count == 0) drain_cond.Signal();
  }
  void wait() {
    Mutex::Locker l(drain_lock);
    while (drain_count)
      drain_cond.Wait(drain_lock);
  }
};

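// Barrier: queue a C_drain event on every worker and wait until each one
// has processed it, so all previously queued external events have been
// handled. Must not be called from a worker thread (asserted below).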
void NetworkStack::drain()
{
  ldout(cct, 30) << __func__ << " started." << dendl;
  pthread_t cur = pthread_self();
  pool_spin.lock();
  C_drain drain(num_workers);
  for (unsigned i = 0; i < num_workers; ++i) {
    assert(cur != workers[i]->center.get_owner());
    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
  }
  pool_spin.unlock();
  drain.wait();
  ldout(cct, 30) << __func__ << " end." << dendl;
}