]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/DPDKStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 * Ceph - scalable distributed file system
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
27 * Author: Haomai Wang <haomaiwang@gmail.com>
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
37 #include <sys/types.h>
43 #include "common/ceph_argparse.h"
45 #include "DPDKStack.h"
48 #include "TCP-Stack.h"
50 #include "common/dout.h"
51 #include "include/ceph_assert.h"
52 #include "common/Cond.h"
54 #define dout_subsys ceph_subsys_dpdk
56 #define dout_prefix *_dout << "dpdkstack "
// Adaptor with a C-style `int (void*)` signature: `f` is cast back to a
// `std::function<void ()>*` and the callable it points to is invoked.
// DPDKStack::spawn_worker passes this function, together with a pointer to a
// stored std::function, to rte_eal_remote_launch().
// NOTE(review): this excerpt jumps from original line 58 to 60 to 64, so the
// braces and the return statement of this function are not visible here.
58 static int dpdk_thread_adaptor(void* f
)
60 (*static_cast<std::function
<void ()>*>(f
))();
// Per-worker DPDK setup. State that is shared between calls lives in
// function-local statics: `create_stage`, `lock`, `queue_init_done`, `cores`
// and `sdev` (the shared DPDKDevice).
// NOTE(review): this excerpt was extracted from a web view and is missing many
// original lines (the numeric prefixes skip), including the enum declaration
// for `create_stage`, the branch conditions selecting which worker runs each
// section, the bodies of the `while` loops and most closing braces. The
// comments below describe only what the visible lines show.
64 void DPDKWorker::initialize()
// Stage variable, initially WAIT_DEVICE_STAGE (its enum declaration is not visible).
70 } create_stage
= WAIT_DEVICE_STAGE
;
// Mutex taken (via Mutex::Locker) before each visible access to the stage/counter statics.
71 static Mutex
lock("DPDKStack::lock");
73 static unsigned queue_init_done
= 0;
74 static unsigned cores
= 0;
75 static std::shared_ptr
<DPDKDevice
> sdev
;
// `i` is the id reported by this worker's event center.
77 unsigned i
= center
.get_id();
79 // Hardcoded port index 0.
80 // TODO: Inherit it from the opts
// Number of cores comes from the ms_async_op_threads config value.
81 cores
= cct
->_conf
->ms_async_op_threads
;
// Create the device from config (port id, LRO, hardware flow control) ...
82 std::unique_ptr
<DPDKDevice
> dev
= create_dpdk_net_device(
83 cct
, cores
, cct
->_conf
->ms_dpdk_port_id
,
84 cct
->_conf
->ms_dpdk_lro
,
85 cct
->_conf
->ms_dpdk_hw_flow_control
);
// ... and hand ownership over to the static shared_ptr `sdev`.
86 sdev
= std::shared_ptr
<DPDKDevice
>(dev
.release());
// One `workers` slot per core.
87 sdev
->workers
.resize(cores
);
88 ldout(cct
, 1) << __func__
<< " using " << cores
<< " cores " << dendl
;
// Under the lock, advance the stage to WAIT_PORT_FIN_STAGE.
90 Mutex::Locker
l(lock
);
91 create_stage
= WAIT_PORT_FIN_STAGE
;
// Under the lock, loop while the stage is still <= WAIT_DEVICE_STAGE
// (the loop body is not visible in this excerpt).
94 Mutex::Locker
l(lock
);
95 while (create_stage
<= WAIT_DEVICE_STAGE
)
// Only workers whose id is below the device's hardware queue count set up a
// local queue here.
99 if (i
< sdev
->hw_queues_count()) {
// NOTE(review): "¢er" below looks like a mis-decoded "&center" (HTML entity
// damage from extraction) - confirm against the real source file.
100 auto qp
= sdev
->init_local_queue(cct
, ¢er
, cct
->_conf
->ms_dpdk_hugepages
, i
);
101 std::map
<unsigned, float> cpu_weights
;
// Loop over j = hw_queues_count + i % hw_queues_count, stepping by
// hw_queues_count while j < cores (the loop body, original line 104, is not
// visible).
102 for (unsigned j
= sdev
->hw_queues_count() + i
% sdev
->hw_queues_count();
103 j
< cores
; j
+= sdev
->hw_queues_count())
// Weight for this worker's own queue comes from ms_dpdk_hw_queue_weight.
105 cpu_weights
[i
] = cct
->_conf
->ms_dpdk_hw_queue_weight
;
106 qp
->configure_proxies(cpu_weights
);
// Transfer the configured queue to the device under index i.
107 sdev
->set_local_queue(i
, std::move(qp
));
108 Mutex::Locker
l(lock
);
112 // auto master = qid % sdev->hw_queues_count();
113 // sdev->set_local_queue(create_proxy_net_device(master, sdev.get()));
// Under the lock, loop while queue_init_done < cores (loop body not visible).
118 Mutex::Locker
l(lock
);
119 while (queue_init_done
< cores
)
// A negative return from init_port_fini() is logged as an error; what follows
// the log line is not visible in this excerpt.
123 if (sdev
->init_port_fini() < 0) {
124 lderr(cct
) << __func__
<< " init_port_fini failed " << dendl
;
127 Mutex::Locker
l(lock
);
// Under the lock, loop while the stage is still <= WAIT_PORT_FIN_STAGE
// (loop body not visible).
131 Mutex::Locker
l(lock
);
132 while (create_stage
<= WAIT_PORT_FIN_STAGE
)
// Register this worker in the device's worker table and build its Impl.
136 sdev
->workers
[i
] = this;
137 _impl
= std::unique_ptr
<DPDKWorker::Impl
>(
// NOTE(review): "¢er" below again looks like a mis-decoded "&center".
138 new DPDKWorker::Impl(cct
, i
, ¢er
, sdev
));
// Under the lock, decrement queue_init_done; when it reaches zero the stage is
// reset to WAIT_DEVICE_STAGE (any further statements in this branch are not
// visible).
140 Mutex::Locker
l(lock
);
141 if (!--queue_init_done
) {
142 create_stage
= WAIT_DEVICE_STAGE
;
// Triple of strings describing one configured address. parse_available_address
// fills it in the order (ip, gateway, netmask).
148 using AvailableIPAddress
= std::tuple
<string
, string
, string
>;
// Splits the `ips`, `gates` and `masks` strings into lists with
// string_to_vec() and appends one AvailableIPAddress{ip, gateway, mask} to
// `res` per index.
// NOTE(review): the statement guarded by the size check (original line 157)
// and the return statements are not visible in this excerpt; presumably the
// function returns false on a mismatch and true otherwise - confirm.
149 static bool parse_available_address(
150 const string
&ips
, const string
&gates
, const string
&masks
, vector
<AvailableIPAddress
> &res
)
152 vector
<string
> ip_vec
, gate_vec
, mask_vec
;
153 string_to_vec(ip_vec
, ips
);
154 string_to_vec(gate_vec
, gates
);
155 string_to_vec(mask_vec
, masks
);
// Condition is true when there is no ip at all or the three lists differ in length.
156 if (ip_vec
.empty() || ip_vec
.size() != gate_vec
.size() || ip_vec
.size() != mask_vec
.size())
// Entries are paired up by position.
159 for (size_t i
= 0; i
< ip_vec
.size(); ++i
) {
160 res
.push_back(AvailableIPAddress
{ip_vec
[i
], gate_vec
[i
], mask_vec
[i
]});
// Walks `avails` in order and tests an address `addr` against `ip` with
// is_same_host().
// NOTE(review): the lines that declare and populate `addr` (presumably from
// the C string `a`), the body of the match branch (presumably storing the
// index in `res`) and the return statements are not visible in this excerpt -
// confirm against the real source.
165 static bool match_available_address(const vector
<AvailableIPAddress
> &avails
,
166 const entity_addr_t
&ip
, int &res
)
168 for (size_t i
= 0; i
< avails
.size(); ++i
) {
// `a` is the C string of the first tuple element of the i-th entry.
170 auto a
= std::get
<0>(avails
[i
]).c_str();
173 if (addr
.is_same_host(ip
)) {
// Constructs the per-worker implementation object.
//   cct - context used for configuration lookups and logging
//   i   - stored in `id`
//   c   - event center, passed on to `_netif` and `_inet`
//   dev - shared device, stored in `_dev` and passed to `_netif`
// After member initialisation, the ms_dpdk_host_ipv4_addr,
// ms_dpdk_gateway_ipv4_addr and ms_dpdk_netmask_ipv4_addr config values are
// parsed and the first resulting tuple is applied to `_inet`.
// NOTE(review): the braces and the condition guarding the error log (and
// whatever follows it) are not visible in this excerpt.
181 DPDKWorker::Impl::Impl(CephContext
*cct
, unsigned i
, EventCenter
*c
, std::shared_ptr
<DPDKDevice
> dev
)
182 : id(i
), _netif(cct
, dev
, c
), _dev(dev
), _inet(cct
, c
, &_netif
)
184 vector
<AvailableIPAddress
> tuples
;
// Parse the three configured address lists into `tuples`.
185 bool parsed
= parse_available_address(cct
->_conf
.get_val
<std::string
>("ms_dpdk_host_ipv4_addr"),
186 cct
->_conf
.get_val
<std::string
>("ms_dpdk_gateway_ipv4_addr"),
187 cct
->_conf
.get_val
<std::string
>("ms_dpdk_netmask_ipv4_addr"), tuples
);
// Error log that echoes the three raw config values.
189 lderr(cct
) << __func__
<< " no available address "
190 << cct
->_conf
.get_val
<std::string
>("ms_dpdk_host_ipv4_addr") << ", "
191 << cct
->_conf
.get_val
<std::string
>("ms_dpdk_gateway_ipv4_addr") << ", "
192 << cct
->_conf
.get_val
<std::string
>("ms_dpdk_netmask_ipv4_addr") << ", "
// Only the first parsed tuple (index 0) is used: host, gateway, netmask.
196 _inet
.set_host_address(ipv4_address(std::get
<0>(tuples
[0])));
197 _inet
.set_gw_address(ipv4_address(std::get
<1>(tuples
[0])));
198 _inet
.set_netmask_address(ipv4_address(std::get
<2>(tuples
[0])));
// Destructor: asks the device to unset the local queue registered under this
// object's `id`.
// NOTE(review): the braces are not visible in this excerpt.
201 DPDKWorker::Impl::~Impl()
203 _dev
->unset_local_queue(id
);
// Starts listening on the port of `sa` by delegating to tcpv4_listen() with
// this worker's TCP instance, the port and type of `sa`, and `opt`.
// `sa` must be an AF_INET address (asserted).
// NOTE(review): the rest of the parameter list (original line 207) and the
// trailing arguments of the tcpv4_listen() call are cut off in this excerpt.
206 int DPDKWorker::listen(entity_addr_t
&sa
, const SocketOptions
&opt
,
209 ceph_assert(sa
.get_family() == AF_INET
);
212 ldout(cct
, 10) << __func__
<< " addr " << sa
<< dendl
;
// The commented-out code below parsed the configured addresses and matched
// them against `sa`; it is disabled here.
213 // vector<AvailableIPAddress> tuples;
214 // bool parsed = parse_available_address(cct->_conf->ms_dpdk_host_ipv4_addr,
215 // cct->_conf->ms_dpdk_gateway_ipv4_addr,
216 // cct->_conf->ms_dpdk_netmask_ipv4_addr, tuples);
218 // lderr(cct) << __func__ << " no available address "
219 // << cct->_conf->ms_dpdk_host_ipv4_addr << ", "
220 // << cct->_conf->ms_dpdk_gateway_ipv4_addr << ", "
221 // << cct->_conf->ms_dpdk_netmask_ipv4_addr << ", "
226 // parsed = match_available_address(tuples, sa, idx);
228 // lderr(cct) << __func__ << " no matched address for " << sa << dendl;
231 // _inet.set_host_address(ipv4_address(std::get<0>(tuples[idx])));
232 // _inet.set_gw_address(ipv4_address(std::get<1>(tuples[idx])));
233 // _inet.set_netmask_address(ipv4_address(std::get<2>(tuples[idx])));
234 return tcpv4_listen(_impl
->_inet
.get_tcp(), sa
.get_port(), opt
, sa
.get_type(),
// Opens a connection to `addr` by calling tcpv4_connect() with this worker's
// TCP instance and the caller-supplied `socket`, then logs the address at
// debug level 10. `opts` is not used by any visible line.
// NOTE(review): the braces and the return statement (presumably `return r;`)
// are not visible in this excerpt.
238 int DPDKWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
)
// The address-family assertion is disabled here.
240 // ceph_assert(addr.get_family() == AF_INET);
241 int r
= tcpv4_connect(_impl
->_inet
.get_tcp(), addr
, socket
);
242 ldout(cct
, 10) << __func__
<< " addr " << addr
<< dendl
;
// Stores `func` in `funcs[i]`, initialises the DPDK EAL via dpdk::eal::init()
// and launches a stored function on an lcore with rte_eal_remote_launch(),
// using dpdk_thread_adaptor as the entry point. The launch itself is issued
// through dpdk::eal::execute_on_master().
// NOTE(review): the declarations of `r`, `j` and `core_id`, the conditions
// guarding both error logs, the logic that picks which lcore to use and all
// closing braces are not visible in this excerpt.
246 void DPDKStack::spawn_worker(unsigned i
, std::function
<void ()> &&func
)
248 // create a extra master thread
// Keep the callable in `funcs` so a pointer to it can be handed to the launch call below.
250 funcs
[i
] = std::move(func
);
252 r
= dpdk::eal::init(cct
);
254 lderr(cct
) << __func__
<< " init dpdk rte failed, r=" << r
<< dendl
;
257 // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
// There must be at least i + 1 lcores available.
259 ceph_assert(rte_lcore_count() >= i
+ 1);
262 RTE_LCORE_FOREACH_SLAVE(core_id
) {
// The lambda captures by reference and writes the launch result into `r`.
267 dpdk::eal::execute_on_master([&]() {
268 r
= rte_eal_remote_launch(dpdk_thread_adaptor
, static_cast<void*>(&funcs
[j
]), core_id
);
270 lderr(cct
) << __func__
<< " remote launch failed, r=" << r
<< dendl
;
276 void DPDKStack::join_worker(unsigned i
)
278 dpdk::eal::execute_on_master([&]() {
279 rte_eal_wait_lcore(i
+1);