1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 * Ceph - scalable distributed file system
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
27 * Author: Haomai Wang <haomaiwang@gmail.com>
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
38 #include "DPDKStack.h"
40 #include "common/dout.h"
41 #include "include/assert.h"
43 #define dout_subsys ceph_subsys_dpdk
45 #define dout_prefix *_dout << "net "
// Construct the network interface that serves DPDK device `dev` for the
// worker whose event center is `center`.
// - A [center, this] lambda forwards each received Packet to
//   dispatch_packet(center, p). The call that registers this lambda is not
//   visible in this extract.
// - The device's hardware MAC address and hardware feature flags are cached
//   in _hw_address / _hw_features.
// - A packet provider is registered on dev->queue_for_cpu(center->get_id()).
//   When polled, it tries the registered L3 providers (_pkt_providers) in turn,
//   indexed by `idx`. For the first one that yields a packet, it prepends an
//   eth_hdr (dst = l3pv.to, src = _hw_address, proto = l3pv.proto_num),
//   logs it at level 10 and moves the framed packet into `p`.
// NOTE(review): this extract omits several constructor lines (member
// initializers, the declarations of `idx` and `p`, the index wrap-around and
// the provider's return statements). Confirm details against the full source.
// `qid` is captured by the provider but not used in the visible lines.
47 interface::interface(CephContext
*c
, std::shared_ptr
<DPDKDevice
> dev
, EventCenter
*center
)
51 [center
, this] (Packet p
) {
52 return dispatch_packet(center
, std::move(p
));
55 _hw_address(_dev
->hw_address()),
56 _hw_features(_dev
->get_hw_features()) {
58 unsigned qid
= center
->get_id();
59 dev
->queue_for_cpu(center
->get_id()).register_packet_provider([this, idx
, qid
] () mutable {
// Visit each registered L3 provider at most once per poll, continuing
// from wherever `idx` stopped last time (the lambda is `mutable`).
61 for (size_t i
= 0; i
< _pkt_providers
.size(); i
++) {
62 auto l3p
= _pkt_providers
[idx
++]();
63 if (idx
== _pkt_providers
.size())
66 auto l3pv
= std::move(*l3p
);
// Frame the L3 payload with an Ethernet header addressed from this NIC.
67 auto eh
= l3pv
.p
.prepend_header
<eth_hdr
>();
68 eh
->dst_mac
= l3pv
.to
;
69 eh
->src_mac
= _hw_address
;
70 eh
->eth_proto
= uint16_t(l3pv
.proto_num
);
72 ldout(cct
, 10) << "=== tx === proto " << std::hex
<< uint16_t(l3pv
.proto_num
)
73 << " " << _hw_address
<< " -> " << l3pv
.to
74 << " length " << std::dec
<< l3pv
.p
.len() << dendl
;
75 p
= std::move(l3pv
.p
);
// Register an L3 protocol handler for EtherType `proto_num`.
// @param proto_num EtherType key. An l3_rx_stream constructed from `forward`
//                  is emplaced into _proto_map under uint16_t(proto_num).
// @param next      Callback attached to that stream's packet_stream through
//                  listen(). It receives the packet and the source MAC.
// @param forward   Hook stored in the stream. dispatch_packet() calls it as
//                  forward(data, p, sizeof(eth_hdr)) and, when it returns
//                  true, hashes `data` with toeplitz_hash().
// @return The subscription returned by packet_stream.listen().
// NOTE(review): if _proto_map is a standard map type (presumably), emplace()
// does not overwrite an existing key. A second registration for the same
// proto_num would then keep the first `forward` hook and attach `next` to the
// existing stream. The line after emplace is not visible in this extract;
// confirm that it checks i.second.
83 subscription
<Packet
, ethernet_address
> interface::register_l3(
84 eth_protocol_num proto_num
,
85 std::function
<int (Packet p
, ethernet_address from
)> next
,
86 std::function
<bool (forward_hash
&, Packet
& p
, size_t)> forward
)
88 auto i
= _proto_map
.emplace(std::piecewise_construct
, std::make_tuple(uint16_t(proto_num
)), std::forward_as_tuple(std::move(forward
)));
90 l3_rx_stream
& l3_rx
= i
.first
->second
;
91 return l3_rx
.packet_stream
.listen(std::move(next
));
94 unsigned interface::hash2cpu(uint32_t hash
) {
95 return _dev
->hash2cpu(hash
);
98 const rss_key_type
& interface::rss_key() const {
99 return _dev
->rss_key();
102 uint16_t interface::hw_queues_count() const {
103 return _dev
->hw_queues_count();
// Event callback that hands a packet to the DPDK device for target `dst`.
// do_request() calls sdev->l2receive(dst, std::move(p)).
// Instances are created by interface::forward() and posted with
// dispatch_event_external() to the target worker's EventCenter.
// NOTE(review): the declarations of `p` and `dst`, the access specifier and
// the tail of do_request() are not visible in this extract.
// NOTE(review): `queue_depth` refers to the creating thread's __thread counter
// (see interface::forward). If do_request() modifies it, that happens on the
// target center's thread, so the access is cross-thread and non-atomic.
// Confirm that this is safe.
106 class C_handle_l2forward
: public EventCallback
{
107 std::shared_ptr
<DPDKDevice
> sdev
;
// Held by reference: the counter is owned by the code that created this event.
108 unsigned &queue_depth
;
113 C_handle_l2forward(std::shared_ptr
<DPDKDevice
> &p
, unsigned &qd
, Packet pkt
, unsigned target
)
114 : sdev(p
), queue_depth(qd
), p(std::move(pkt
)), dst(target
) {}
115 void do_request(int fd
) {
116 sdev
->l2receive(dst
, std::move(p
));
// Hand packet `p`, received on center `source`, to worker `target`.
// It posts a C_handle_l2forward event to _dev->workers[target]->center.
// A per-thread counter (`queue_depth`, static __thread, zero-initialised) is
// passed to the event by reference. The packet is dispatched only while that
// counter is below 1000.
// p.free_on_cpu(source) is applied before the packet is moved into the event.
// Presumably this makes its buffers be released on the source center; confirm.
// NOTE(review): the lines that update `queue_depth` and the handling of the
// over-limit case are not visible in this extract.
122 void interface::forward(EventCenter
*source
, unsigned target
, Packet p
) {
123 static __thread
unsigned queue_depth
;
125 if (queue_depth
< 1000) {
127 // FIXME: ensure this event is not executed after the target EventCenter has been destroyed
128 _dev
->workers
[target
]->center
.dispatch_event_external(
129 new C_handle_l2forward(_dev
, queue_depth
, std::move(p
.free_on_cpu(source
)), target
));
// Demultiplex one received Ethernet frame `p` on event center `center`.
// 1. Looks up the handler registered for the frame's EtherType
//    (ntoh(eh->eth_proto)) in _proto_map and logs the frame at level 10.
// 2. If a handler exists, asks _dev->forward_dst() which center should own
//    the flow. The lambda supplies a hash: either p.rss_hash(), or a
//    toeplitz_hash over the forward_hash filled by the handler's `forward`
//    hook, called with the Ethernet header size as offset.
// 3. If that center is not this one, the packet is passed to forward().
//    Otherwise the Ethernet header is trimmed and the payload is produced on
//    the handler's packet_stream together with the source MAC.
// @return The value of packet_stream.produce() on the local-delivery path.
// NOTE(review): this extract omits several lines: the declarations of `data`
// and `h`, the branches around the two ldout calls, and the returns of the
// forwarded and unknown-protocol paths. The first ldout dereferences
// *p.rss_hash(). Presumably it is guarded by a check on `hwrss`; confirm
// against the full source.
133 int interface::dispatch_packet(EventCenter
*center
, Packet p
) {
134 auto eh
= p
.get_header
<eth_hdr
>();
136 auto i
= _proto_map
.find(ntoh(eh
->eth_proto
));
137 auto hwrss
= p
.rss_hash();
139 ldout(cct
, 10) << __func__
<< " === rx === proto " << std::hex
<< ::ntoh(eh
->eth_proto
)
140 << " "<< eh
->src_mac
.ntoh() << " -> " << eh
->dst_mac
.ntoh()
141 << " length " << std::dec
<< p
.len() << " rss_hash " << *p
.rss_hash() << dendl
;
143 ldout(cct
, 10) << __func__
<< " === rx === proto " << std::hex
<< ::ntoh(eh
->eth_proto
)
144 << " "<< eh
->src_mac
.ntoh() << " -> " << eh
->dst_mac
.ntoh()
145 << " length " << std::dec
<< p
.len() << dendl
;
147 if (i
!= _proto_map
.end()) {
148 l3_rx_stream
& l3
= i
->second
;
// Choose the owning center from a flow hash (hardware RSS or software Toeplitz).
149 auto fw
= _dev
->forward_dst(center
->get_id(), [&p
, &l3
, this] () {
150 auto hwrss
= p
.rss_hash();
155 if (l3
.forward(data
, p
, sizeof(eth_hdr
))) {
156 return toeplitz_hash(rss_key(), data
);
161 if (fw
!= center
->get_id()) {
162 ldout(cct
, 1) << __func__
<< " forward to " << fw
<< dendl
;
163 forward(center
, fw
, std::move(p
));
// Local delivery: strip the Ethernet header before handing the payload to L3.
166 auto from
= h
.src_mac
;
167 p
.trim_front(sizeof(*eh
));
168 // avoid chaining, since queue length is unlimited
171 return l3
.packet_stream
.produce(std::move(p
), from
);
// Event callback that passes a MAC (l2) / IPv4 (l3) address pair to one
// DPDKWorker. do_request() calls worker->arp_learn(l2_addr, l3_addr).
// interface::arp_learn() posts one instance to every worker's EventCenter.
// NOTE(review): the `worker` member declaration, the access specifier and the
// tail of do_request() are not visible in this extract.
179 class C_arp_learn
: public EventCallback
{
181 ethernet_address l2_addr
;
182 ipv4_address l3_addr
;
185 C_arp_learn(DPDKWorker
*w
, ethernet_address l2
, ipv4_address l3
)
186 : worker(w
), l2_addr(l2
), l3_addr(l3
) {}
187 void do_request(int id
) {
188 worker
->arp_learn(l2_addr
, l3_addr
);
193 void interface::arp_learn(ethernet_address l2
, ipv4_address l3
)
195 for (auto &&w
: _dev
->workers
) {
196 w
->center
.dispatch_event_external(
197 new C_arp_learn(w
, l2
, l3
));
201 l3_protocol::l3_protocol(interface
* netif
, eth_protocol_num proto_num
, packet_provider_type func
)
202 : _netif(netif
), _proto_num(proto_num
) {
203 _netif
->register_packet_provider(std::move(func
));
206 subscription
<Packet
, ethernet_address
> l3_protocol::receive(
207 std::function
<int (Packet
, ethernet_address
)> rx_fn
,
208 std::function
<bool (forward_hash
&h
, Packet
&p
, size_t s
)> forward
) {
209 return _netif
->register_l3(_proto_num
, std::move(rx_fn
), std::move(forward
));