]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/dpdk/net.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / dpdk / net.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22 /*
23 * Ceph - scalable distributed file system
24 *
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
26 *
27 * Author: Haomai Wang <haomaiwang@gmail.com>
28 *
29 */
30
#include "net.h"

#include <atomic>

#include "DPDK.h"
#include "DPDKStack.h"

#include "common/dout.h"
#include "include/ceph_assert.h"
37
38 #define dout_subsys ceph_subsys_dpdk
39 #undef dout_prefix
40 #define dout_prefix *_dout << "net "
41
42 interface::interface(CephContext *cct, std::shared_ptr<DPDKDevice> dev, EventCenter *center)
43 : cct(cct), _dev(dev),
44 _rx(_dev->receive(
45 center->get_id(),
46 [center, this] (Packet p) {
47 return dispatch_packet(center, std::move(p));
48 }
49 )),
50 _hw_address(_dev->hw_address()),
51 _hw_features(_dev->get_hw_features()) {
52 auto idx = 0u;
53 unsigned qid = center->get_id();
54 dev->queue_for_cpu(center->get_id()).register_packet_provider([this, idx, qid] () mutable {
55 Tub<Packet> p;
56 for (size_t i = 0; i < _pkt_providers.size(); i++) {
57 auto l3p = _pkt_providers[idx++]();
58 if (idx == _pkt_providers.size())
59 idx = 0;
60 if (l3p) {
61 auto l3pv = std::move(*l3p);
62 auto eh = l3pv.p.prepend_header<eth_hdr>();
63 eh->dst_mac = l3pv.to;
64 eh->src_mac = _hw_address;
65 eh->eth_proto = uint16_t(l3pv.proto_num);
66 *eh = eh->hton();
67 ldout(this->cct, 10) << "=== tx === proto " << std::hex << uint16_t(l3pv.proto_num)
68 << " " << _hw_address << " -> " << l3pv.to
69 << " length " << std::dec << l3pv.p.len() << dendl;
70 p = std::move(l3pv.p);
71 return p;
72 }
73 }
74 return p;
75 });
76 }
77
78 subscription<Packet, ethernet_address> interface::register_l3(
79 eth_protocol_num proto_num,
80 std::function<int (Packet p, ethernet_address from)> next,
81 std::function<bool (forward_hash&, Packet& p, size_t)> forward)
82 {
83 auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
84 ceph_assert(i.second);
85 l3_rx_stream& l3_rx = i.first->second;
86 return l3_rx.packet_stream.listen(std::move(next));
87 }
88
89 unsigned interface::hash2cpu(uint32_t hash) {
90 return _dev->hash2cpu(hash);
91 }
92
93 const rss_key_type& interface::rss_key() const {
94 return _dev->rss_key();
95 }
96
97 uint16_t interface::hw_queues_count() const {
98 return _dev->hw_queues_count();
99 }
100
101 class C_handle_l2forward : public EventCallback {
102 std::shared_ptr<DPDKDevice> sdev;
103 unsigned &queue_depth;
104 Packet p;
105 unsigned dst;
106
107 public:
108 C_handle_l2forward(std::shared_ptr<DPDKDevice> &p, unsigned &qd, Packet pkt, unsigned target)
109 : sdev(p), queue_depth(qd), p(std::move(pkt)), dst(target) {}
110 void do_request(uint64_t fd) {
111 sdev->l2receive(dst, std::move(p));
112 queue_depth--;
113 delete this;
114 }
115 };
116
117 void interface::forward(EventCenter *source, unsigned target, Packet p) {
118 static __thread unsigned queue_depth;
119
120 if (queue_depth < 1000) {
121 queue_depth++;
122 // FIXME: need ensure this event not be called after EventCenter destruct
123 _dev->workers[target]->center.dispatch_event_external(
124 new C_handle_l2forward(_dev, queue_depth, std::move(p.free_on_cpu(source)), target));
125 }
126 }
127
128 int interface::dispatch_packet(EventCenter *center, Packet p) {
129 auto eh = p.get_header<eth_hdr>();
130 if (eh) {
131 auto i = _proto_map.find(ntoh(eh->eth_proto));
132 auto hwrss = p.rss_hash();
133 if (hwrss) {
134 ldout(cct, 10) << __func__ << " === rx === proto " << std::hex << ::ntoh(eh->eth_proto)
135 << " "<< eh->src_mac.ntoh() << " -> " << eh->dst_mac.ntoh()
136 << " length " << std::dec << p.len() << " rss_hash " << *p.rss_hash() << dendl;
137 } else {
138 ldout(cct, 10) << __func__ << " === rx === proto " << std::hex << ::ntoh(eh->eth_proto)
139 << " "<< eh->src_mac.ntoh() << " -> " << eh->dst_mac.ntoh()
140 << " length " << std::dec << p.len() << dendl;
141 }
142 if (i != _proto_map.end()) {
143 l3_rx_stream& l3 = i->second;
144 auto fw = _dev->forward_dst(center->get_id(), [&p, &l3, this] () {
145 auto hwrss = p.rss_hash();
146 if (hwrss) {
147 return *hwrss;
148 } else {
149 forward_hash data;
150 if (l3.forward(data, p, sizeof(eth_hdr))) {
151 return toeplitz_hash(rss_key(), data);
152 }
153 return 0u;
154 }
155 });
156 if (fw != center->get_id()) {
157 ldout(cct, 1) << __func__ << " forward to " << fw << dendl;
158 forward(center, fw, std::move(p));
159 } else {
160 auto h = eh->ntoh();
161 auto from = h.src_mac;
162 p.trim_front(sizeof(*eh));
163 // avoid chaining, since queue length is unlimited
164 // drop instead.
165 if (l3.ready()) {
166 return l3.packet_stream.produce(std::move(p), from);
167 }
168 }
169 }
170 }
171 return 0;
172 }
173
174 class C_arp_learn : public EventCallback {
175 DPDKWorker *worker;
176 ethernet_address l2_addr;
177 ipv4_address l3_addr;
178
179 public:
180 C_arp_learn(DPDKWorker *w, ethernet_address l2, ipv4_address l3)
181 : worker(w), l2_addr(l2), l3_addr(l3) {}
182 void do_request(uint64_t id) {
183 worker->arp_learn(l2_addr, l3_addr);
184 delete this;
185 }
186 };
187
188 void interface::arp_learn(ethernet_address l2, ipv4_address l3)
189 {
190 for (auto &&w : _dev->workers) {
191 w->center.dispatch_event_external(
192 new C_arp_learn(w, l2, l3));
193 }
194 }
195
196 l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func)
197 : _netif(netif), _proto_num(proto_num) {
198 _netif->register_packet_provider(std::move(func));
199 }
200
201 subscription<Packet, ethernet_address> l3_protocol::receive(
202 std::function<int (Packet, ethernet_address)> rx_fn,
203 std::function<bool (forward_hash &h, Packet &p, size_t s)> forward) {
204 return _netif->register_l3(_proto_num, std::move(rx_fn), std::move(forward));
205 };