]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/DPDKStack.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / dpdk / DPDKStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22 /*
23 * Ceph - scalable distributed file system
24 *
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
26 *
27 * Author: Haomai Wang <haomaiwang@gmail.com>
28 *
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
33 *
34 */
35
36 #include <memory>
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <unistd.h>
40
41 #include <tuple>
42
43 #include "common/ceph_argparse.h"
44 #include "dpdk_rte.h"
45 #include "DPDKStack.h"
46 #include "DPDK.h"
47 #include "IP.h"
48 #include "TCP-Stack.h"
49
50 #include "common/dout.h"
51 #include "include/ceph_assert.h"
52 #include "common/Cond.h"
53
54 #define dout_subsys ceph_subsys_dpdk
55 #undef dout_prefix
56 #define dout_prefix *_dout << "dpdkstack "
57
/*
 * Trampoline with the int(void*) shape expected by rte_eal_remote_launch().
 *
 * @param f  pointer to a std::function<void ()>; it is invoked once and is
 *           not owned or freed here.
 * @return   always 0.
 */
static int dpdk_thread_adaptor(void* f)
{
  auto* entry = static_cast<std::function<void ()>*>(f);
  (*entry)();
  return 0;
}
63
64 void DPDKWorker::initialize()
65 {
66 static enum {
67 WAIT_DEVICE_STAGE,
68 WAIT_PORT_FIN_STAGE,
69 DONE
70 } create_stage = WAIT_DEVICE_STAGE;
71 static Mutex lock("DPDKStack::lock");
72 static Cond cond;
73 static unsigned queue_init_done = 0;
74 static unsigned cores = 0;
75 static std::shared_ptr<DPDKDevice> sdev;
76
77 unsigned i = center.get_id();
78 if (i == 0) {
79 // Hardcoded port index 0.
80 // TODO: Inherit it from the opts
81 cores = cct->_conf->ms_async_op_threads;
82 std::unique_ptr<DPDKDevice> dev = create_dpdk_net_device(
83 cct, cores, cct->_conf->ms_dpdk_port_id,
84 cct->_conf->ms_dpdk_lro,
85 cct->_conf->ms_dpdk_hw_flow_control);
86 sdev = std::shared_ptr<DPDKDevice>(dev.release());
87 sdev->workers.resize(cores);
88 ldout(cct, 1) << __func__ << " using " << cores << " cores " << dendl;
89
90 Mutex::Locker l(lock);
91 create_stage = WAIT_PORT_FIN_STAGE;
92 cond.Signal();
93 } else {
94 Mutex::Locker l(lock);
95 while (create_stage <= WAIT_DEVICE_STAGE)
96 cond.Wait(lock);
97 }
98 ceph_assert(sdev);
99 if (i < sdev->hw_queues_count()) {
100 auto qp = sdev->init_local_queue(cct, &center, cct->_conf->ms_dpdk_hugepages, i);
101 std::map<unsigned, float> cpu_weights;
102 for (unsigned j = sdev->hw_queues_count() + i % sdev->hw_queues_count();
103 j < cores; j+= sdev->hw_queues_count())
104 cpu_weights[i] = 1;
105 cpu_weights[i] = cct->_conf->ms_dpdk_hw_queue_weight;
106 qp->configure_proxies(cpu_weights);
107 sdev->set_local_queue(i, std::move(qp));
108 Mutex::Locker l(lock);
109 ++queue_init_done;
110 cond.Signal();
111 } else {
112 // auto master = qid % sdev->hw_queues_count();
113 // sdev->set_local_queue(create_proxy_net_device(master, sdev.get()));
114 ceph_abort();
115 }
116 if (i == 0) {
117 {
118 Mutex::Locker l(lock);
119 while (queue_init_done < cores)
120 cond.Wait(lock);
121 }
122
123 if (sdev->init_port_fini() < 0) {
124 lderr(cct) << __func__ << " init_port_fini failed " << dendl;
125 ceph_abort();
126 }
127 Mutex::Locker l(lock);
128 create_stage = DONE;
129 cond.Signal();
130 } else {
131 Mutex::Locker l(lock);
132 while (create_stage <= WAIT_PORT_FIN_STAGE)
133 cond.Wait(lock);
134 }
135
136 sdev->workers[i] = this;
137 _impl = std::unique_ptr<DPDKWorker::Impl>(
138 new DPDKWorker::Impl(cct, i, &center, sdev));
139 {
140 Mutex::Locker l(lock);
141 if (!--queue_init_done) {
142 create_stage = WAIT_DEVICE_STAGE;
143 sdev.reset();
144 }
145 }
146 }
147
148 using AvailableIPAddress = std::tuple<string, string, string>;
149 static bool parse_available_address(
150 const string &ips, const string &gates, const string &masks, vector<AvailableIPAddress> &res)
151 {
152 vector<string> ip_vec, gate_vec, mask_vec;
153 string_to_vec(ip_vec, ips);
154 string_to_vec(gate_vec, gates);
155 string_to_vec(mask_vec, masks);
156 if (ip_vec.empty() || ip_vec.size() != gate_vec.size() || ip_vec.size() != mask_vec.size())
157 return false;
158
159 for (size_t i = 0; i < ip_vec.size(); ++i) {
160 res.push_back(AvailableIPAddress{ip_vec[i], gate_vec[i], mask_vec[i]});
161 }
162 return true;
163 }
164
165 static bool match_available_address(const vector<AvailableIPAddress> &avails,
166 const entity_addr_t &ip, int &res)
167 {
168 for (size_t i = 0; i < avails.size(); ++i) {
169 entity_addr_t addr;
170 auto a = std::get<0>(avails[i]).c_str();
171 if (!addr.parse(a))
172 continue;
173 if (addr.is_same_host(ip)) {
174 res = i;
175 return true;
176 }
177 }
178 return false;
179 }
180
181 DPDKWorker::Impl::Impl(CephContext *cct, unsigned i, EventCenter *c, std::shared_ptr<DPDKDevice> dev)
182 : id(i), _netif(cct, dev, c), _dev(dev), _inet(cct, c, &_netif)
183 {
184 vector<AvailableIPAddress> tuples;
185 bool parsed = parse_available_address(cct->_conf.get_val<std::string>("ms_dpdk_host_ipv4_addr"),
186 cct->_conf.get_val<std::string>("ms_dpdk_gateway_ipv4_addr"),
187 cct->_conf.get_val<std::string>("ms_dpdk_netmask_ipv4_addr"), tuples);
188 if (!parsed) {
189 lderr(cct) << __func__ << " no available address "
190 << cct->_conf.get_val<std::string>("ms_dpdk_host_ipv4_addr") << ", "
191 << cct->_conf.get_val<std::string>("ms_dpdk_gateway_ipv4_addr") << ", "
192 << cct->_conf.get_val<std::string>("ms_dpdk_netmask_ipv4_addr") << ", "
193 << dendl;
194 ceph_abort();
195 }
196 _inet.set_host_address(ipv4_address(std::get<0>(tuples[0])));
197 _inet.set_gw_address(ipv4_address(std::get<1>(tuples[0])));
198 _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[0])));
199 }
200
201 DPDKWorker::Impl::~Impl()
202 {
203 _dev->unset_local_queue(id);
204 }
205
206 int DPDKWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
207 ServerSocket *sock)
208 {
209 ceph_assert(sa.get_family() == AF_INET);
210 ceph_assert(sock);
211
212 ldout(cct, 10) << __func__ << " addr " << sa << dendl;
213 // vector<AvailableIPAddress> tuples;
214 // bool parsed = parse_available_address(cct->_conf->ms_dpdk_host_ipv4_addr,
215 // cct->_conf->ms_dpdk_gateway_ipv4_addr,
216 // cct->_conf->ms_dpdk_netmask_ipv4_addr, tuples);
217 // if (!parsed) {
218 // lderr(cct) << __func__ << " no available address "
219 // << cct->_conf->ms_dpdk_host_ipv4_addr << ", "
220 // << cct->_conf->ms_dpdk_gateway_ipv4_addr << ", "
221 // << cct->_conf->ms_dpdk_netmask_ipv4_addr << ", "
222 // << dendl;
223 // return -EINVAL;
224 // }
225 // int idx;
226 // parsed = match_available_address(tuples, sa, idx);
227 // if (!parsed) {
228 // lderr(cct) << __func__ << " no matched address for " << sa << dendl;
229 // return -EINVAL;
230 // }
231 // _inet.set_host_address(ipv4_address(std::get<0>(tuples[idx])));
232 // _inet.set_gw_address(ipv4_address(std::get<1>(tuples[idx])));
233 // _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[idx])));
234 return tcpv4_listen(_impl->_inet.get_tcp(), sa.get_port(), opt, sa.get_type(),
235 sock);
236 }
237
238 int DPDKWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
239 {
240 // ceph_assert(addr.get_family() == AF_INET);
241 int r = tcpv4_connect(_impl->_inet.get_tcp(), addr, socket);
242 ldout(cct, 10) << __func__ << " addr " << addr << dendl;
243 return r;
244 }
245
246 void DPDKStack::spawn_worker(unsigned i, std::function<void ()> &&func)
247 {
248 // create a extra master thread
249 //
250 funcs[i] = std::move(func);
251 int r = 0;
252 r = dpdk::eal::init(cct);
253 if (r < 0) {
254 lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
255 ceph_abort();
256 }
257 // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
258 // cores
259 ceph_assert(rte_lcore_count() >= i + 1);
260 unsigned core_id;
261 int j = i;
262 RTE_LCORE_FOREACH_SLAVE(core_id) {
263 if (i-- == 0) {
264 break;
265 }
266 }
267 dpdk::eal::execute_on_master([&]() {
268 r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&funcs[j]), core_id);
269 if (r < 0) {
270 lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
271 ceph_abort();
272 }
273 });
274 }
275
276 void DPDKStack::join_worker(unsigned i)
277 {
278 dpdk::eal::execute_on_master([&]() {
279 rte_eal_wait_lcore(i+1);
280 });
281 }