]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/dpdk/IP.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / dpdk / IP.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2/*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19/*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 *
22 */
23/*
24 * Ceph - scalable distributed file system
25 *
26 * Copyright (C) 2015 XSky <haomai@xsky.com>
27 *
28 * Author: Haomai Wang <haomaiwang@gmail.com>
29 *
30 * This is free software; you can redistribute it and/or
31 * modify it under the terms of the GNU Lesser General Public
32 * License version 2.1, as published by the Free Software
33 * Foundation. See file COPYING.
34 *
35 */
36
37#include "common/perf_counters.h"
38
39#include "capture.h"
40#include "IP.h"
7c673cae
FG
41#include "toeplitz.h"
42
43#include "common/dout.h"
11fdf7f2 44#include "include/ceph_assert.h"
7c673cae
FG
45
46#define dout_subsys ceph_subsys_dpdk
47#undef dout_prefix
48#define dout_prefix *_dout << "dpdk "
49
31f18b77 50std::ostream& operator<<(std::ostream& os, const ipv4_address& a) {
7c673cae
FG
51 auto ip = a.ip;
52 return os << ((ip >> 24) & 0xff) << "." << ((ip >> 16) & 0xff)
53 << "." << ((ip >> 8) & 0xff) << "." << ((ip >> 0) & 0xff);
54}
55
56utime_t ipv4::_frag_timeout = utime_t(30, 0);
57constexpr uint32_t ipv4::_frag_low_thresh;
58constexpr uint32_t ipv4::_frag_high_thresh;
59
60class C_handle_frag_timeout : public EventCallback {
61 ipv4 *_ipv4;
62
63 public:
64 C_handle_frag_timeout(ipv4 *i): _ipv4(i) {}
11fdf7f2 65 void do_request(uint64_t fd_or_id) {
7c673cae
FG
66 _ipv4->frag_timeout();
67 }
68};
69
// Perf-counter indices for the "ipv4" PerfCounters set built in ipv4::ipv4();
// the first and last entries are the range bounds passed to the builder.
enum {
  l_dpdk_qp_first = 99000,
  l_dpdk_total_linearize_operations = l_dpdk_qp_first + 1,  // merger linearizations
  l_dpdk_qp_last,
};
75
9f95a23c
TL
// Fixed-size ICMP header laid out exactly as on the wire; `packed` keeps the
// compiler from inserting any padding between the fields.
struct __attribute__((packed)) icmp_hdr {
  // ICMP message types understood by icmp::received().
  enum class msg_type : uint8_t {
    echo_reply = 0,
    echo_request = 8,
  };
  msg_type type;   // message type
  uint8_t code;    // set to 0 when building the echo reply
  uint16_t csum;   // checksum, recomputed over the whole message on reply
  uint32_t rest;   // remaining header bytes; not interpreted here
};
86
7c673cae
FG
87ipv4::ipv4(CephContext *c, EventCenter *cen, interface* netif)
88 : cct(c), center(cen), _netif(netif), _global_arp(netif),
89 _arp(c, _global_arp, cen),
90 _host_address(0), _gw_address(0), _netmask(0),
91 _l3(netif, eth_protocol_num::ipv4, [this] { return get_packet(); }),
92 _rx_packets(
93 _l3.receive(
94 [this] (Packet p, ethernet_address ea) {
95 return handle_received_packet(std::move(p), ea);
96 },
97 [this] (forward_hash& out_hash_data, Packet& p, size_t off) {
98 return forward(out_hash_data, p, off);
99 }
100 )
101 ),
102 _tcp(*this, cen), _icmp(c, *this),
103 _l4({{ uint8_t(ip_protocol_num::tcp), &_tcp },
104 { uint8_t(ip_protocol_num::icmp), &_icmp }}),
105 _packet_filter(nullptr)
106{
107 PerfCountersBuilder plb(cct, "ipv4", l_dpdk_qp_first, l_dpdk_qp_last);
108 plb.add_u64_counter(l_dpdk_total_linearize_operations, "dpdk_ip_linearize_operations", "DPDK IP Packet linearization operations");
109 perf_logger = plb.create_perf_counters();
110 cct->get_perfcounters_collection()->add(perf_logger);
111 frag_handler = new C_handle_frag_timeout(this);
112}
113
114bool ipv4::forward(forward_hash& out_hash_data, Packet& p, size_t off)
115{
116 auto iph = p.get_header<ip_hdr>(off);
117
118 out_hash_data.push_back(iph->src_ip.ip);
119 out_hash_data.push_back(iph->dst_ip.ip);
120
121 auto h = iph->ntoh();
122 auto l4 = _l4[h.ip_proto];
123 if (l4) {
124 if (h.mf() == false && h.offset() == 0) {
125 // This IP datagram is atomic, forward according to tcp connection hash
126 l4->forward(out_hash_data, p, off + sizeof(ip_hdr));
127 }
128 // else forward according to ip fields only
129 }
130 return true;
131}
132
133int ipv4::handle_received_packet(Packet p, ethernet_address from)
134{
135 auto iph = p.get_header<ip_hdr>(0);
136 if (!iph) {
137 return 0;
138 }
139
140 // Skip checking csum of reassembled IP datagram
141 if (!get_hw_features().rx_csum_offload && !p.offload_info_ref().reassembled) {
142 checksummer csum;
143 csum.sum(reinterpret_cast<char*>(iph), sizeof(*iph));
144 if (csum.get() != 0) {
145 return 0;
146 }
147 }
148
149 auto h = iph->ntoh();
150 unsigned ip_len = h.len;
151 unsigned ip_hdr_len = h.ihl * 4;
152 unsigned pkt_len = p.len();
153 auto offset = h.offset();
154
155 ldout(cct, 10) << __func__ << " get " << std::hex << int(h.ip_proto)
156 << std::dec << " packet from "
157 << h.src_ip << " -> " << h.dst_ip << " id=" << h.id
158 << " ip_len=" << ip_len << " ip_hdr_len=" << ip_hdr_len
159 << " pkt_len=" << pkt_len << " offset=" << offset << dendl;
160
161 if (pkt_len > ip_len) {
162 // Trim extra data in the packet beyond IP total length
163 p.trim_back(pkt_len - ip_len);
164 } else if (pkt_len < ip_len) {
165 // Drop if it contains less than IP total length
166 return 0;
167 }
168 // Drop if the reassembled datagram will be larger than maximum IP size
169 if (offset + p.len() > ip_packet_len_max) {
170 return 0;
171 }
172
173 // FIXME: process options
174 if (in_my_netmask(h.src_ip) && h.src_ip != _host_address) {
175 ldout(cct, 20) << __func__ << " learn mac " << from << " with " << h.src_ip << dendl;
176 _arp.learn(from, h.src_ip);
177 }
178
179 if (_packet_filter) {
180 bool handled = false;
181 _packet_filter->handle(p, &h, from, handled);
182 if (handled) {
183 return 0;
184 }
185 }
186
187 if (h.dst_ip != _host_address) {
188 // FIXME: forward
189 return 0;
190 }
191
192 // Does this IP datagram need reassembly
193 auto mf = h.mf();
194 if (mf == true || offset != 0) {
195 frag_limit_mem();
196 auto frag_id = ipv4_frag_id{h.src_ip, h.dst_ip, h.id, h.ip_proto};
197 auto& frag = _frags[frag_id];
198 if (mf == false) {
199 frag.last_frag_received = true;
200 }
201 // This is a newly created frag_id
202 if (frag.mem_size == 0) {
203 _frags_age.push_back(frag_id);
204 frag.rx_time = ceph_clock_now();
205 }
206 auto added_size = frag.merge(h, offset, std::move(p));
207 _frag_mem += added_size;
208 if (frag.is_complete()) {
209 // All the fragments are received
210 auto dropped_size = frag.mem_size;
211 auto& ip_data = frag.data.map.begin()->second;
212 // Choose a cpu to forward this packet
213 auto cpu_id = center->get_id();
214 auto l4 = _l4[h.ip_proto];
215 if (l4) {
216 size_t l4_offset = 0;
217 forward_hash hash_data;
218 hash_data.push_back(hton(h.src_ip.ip));
219 hash_data.push_back(hton(h.dst_ip.ip));
220 l4->forward(hash_data, ip_data, l4_offset);
221 cpu_id = _netif->hash2cpu(toeplitz_hash(_netif->rss_key(), hash_data));
222 }
223
224 // No need to forward if the dst cpu is the current cpu
225 if (cpu_id == center->get_id()) {
226 l4->received(std::move(ip_data), h.src_ip, h.dst_ip);
227 } else {
228 auto to = _netif->hw_address();
229 auto pkt = frag.get_assembled_packet(from, to);
230 _netif->forward(center, cpu_id, std::move(pkt));
231 }
232
233 // Delete this frag from _frags and _frags_age
234 frag_drop(frag_id, dropped_size);
235 _frags_age.remove(frag_id);
236 perf_logger->set(l_dpdk_total_linearize_operations,
237 ipv4_packet_merger::linearizations());
238 } else {
239 // Some of the fragments are missing
240 if (frag_timefd) {
241 frag_arm();
242 }
243 }
244 return 0;
245 }
246
247 auto l4 = _l4[h.ip_proto];
248 if (l4) {
249 // Trim IP header and pass to upper layer
250 p.trim_front(ip_hdr_len);
251 l4->received(std::move(p), h.src_ip, h.dst_ip);
252 }
253 return 0;
254}
255
256void ipv4::wait_l2_dst_address(ipv4_address to, Packet p, resolution_cb cb) {
257 // Figure out where to send the packet to. If it is a directly connected
258 // host, send to it directly, otherwise send to the default gateway.
259 ipv4_address dst;
260 if (in_my_netmask(to)) {
261 dst = to;
262 } else {
263 dst = _gw_address;
264 }
265
266 _arp.wait(std::move(dst), std::move(p), std::move(cb));
267}
268
269const hw_features& ipv4::get_hw_features() const
270{
271 return _netif->get_hw_features();
272}
273
274void ipv4::send(ipv4_address to, ip_protocol_num proto_num,
275 Packet p, ethernet_address e_dst) {
276 auto needs_frag = this->needs_frag(p, proto_num, get_hw_features());
277
278 auto send_pkt = [this, to, proto_num, needs_frag, e_dst] (Packet& pkt, uint16_t remaining, uint16_t offset) mutable {
279 static uint16_t id = 0;
280 auto iph = pkt.prepend_header<ip_hdr>();
281 iph->ihl = sizeof(*iph) / 4;
282 iph->ver = 4;
283 iph->dscp = 0;
284 iph->ecn = 0;
285 iph->len = pkt.len();
286 // FIXME: a proper id
287 iph->id = id++;
288 if (needs_frag) {
289 uint16_t mf = remaining > 0;
290 // The fragment offset is measured in units of 8 octets (64 bits)
291 auto off = offset / 8;
292 iph->frag = (mf << uint8_t(ip_hdr::frag_bits::mf)) | off;
293 } else {
294 iph->frag = 0;
295 }
296 iph->ttl = 64;
297 iph->ip_proto = (uint8_t)proto_num;
298 iph->csum = 0;
299 iph->src_ip = _host_address;
300 iph->dst_ip = to;
301 ldout(cct, 20) << " ipv4::send " << " id=" << iph->id << " " << _host_address << " -> " << to
302 << " len " << pkt.len() << dendl;
303 *iph = iph->hton();
304
305 if (get_hw_features().tx_csum_ip_offload) {
306 iph->csum = 0;
307 pkt.offload_info_ref().needs_ip_csum = true;
308 } else {
309 checksummer csum;
310 csum.sum(reinterpret_cast<char*>(iph), sizeof(*iph));
311 iph->csum = csum.get();
312 }
313
314 _packetq.push_back(
315 l3_protocol::l3packet{eth_protocol_num::ipv4, e_dst, std::move(pkt)});
316 };
317
318 if (needs_frag) {
319 uint16_t offset = 0;
320 uint16_t remaining = p.len();
321 auto mtu = get_hw_features().mtu;
322
323 while (remaining) {
324 auto can_send = std::min(uint16_t(mtu - ipv4_hdr_len_min), remaining);
325 remaining -= can_send;
326 auto pkt = p.share(offset, can_send);
327 send_pkt(pkt, remaining, offset);
328 offset += can_send;
329 }
330 } else {
331 // The whole packet can be send in one shot
332 send_pkt(p, 0, 0);
333 }
334}
335
20effc67 336std::optional<l3_protocol::l3packet> ipv4::get_packet() {
7c673cae
FG
337 // _packetq will be mostly empty here unless it hold remnants of previously
338 // fragmented packet
339 if (_packetq.empty()) {
340 for (size_t i = 0; i < _pkt_providers.size(); i++) {
341 auto l4p = _pkt_providers[_pkt_provider_idx++]();
342 if (_pkt_provider_idx == _pkt_providers.size()) {
343 _pkt_provider_idx = 0;
344 }
345 if (l4p) {
346 ldout(cct, 20) << " ipv4::get_packet len " << l4p->p.len() << dendl;
347 send(l4p->to, l4p->proto_num, std::move(l4p->p), l4p->e_dst);
348 break;
349 }
350 }
351 }
352
20effc67 353 std::optional<l3_protocol::l3packet> p;
7c673cae
FG
354 if (!_packetq.empty()) {
355 p = std::move(_packetq.front());
356 _packetq.pop_front();
357 }
358 return p;
359}
360
361void ipv4::frag_limit_mem() {
362 if (_frag_mem <= _frag_high_thresh) {
363 return;
364 }
365 auto drop = _frag_mem - _frag_low_thresh;
366 while (drop) {
367 if (_frags_age.empty()) {
368 return;
369 }
370 // Drop the oldest frag (first element) from _frags_age
371 auto frag_id = _frags_age.front();
372 _frags_age.pop_front();
373
374 // Drop from _frags as well
375 auto& frag = _frags[frag_id];
376 auto dropped_size = frag.mem_size;
377 frag_drop(frag_id, dropped_size);
378
379 drop -= std::min(drop, dropped_size);
380 }
381}
382
383void ipv4::frag_timeout() {
384 if (_frags.empty()) {
385 return;
386 }
387 auto now = ceph_clock_now();
388 for (auto it = _frags_age.begin(); it != _frags_age.end();) {
389 auto frag_id = *it;
390 auto& frag = _frags[frag_id];
391 if (now > frag.rx_time + _frag_timeout) {
392 auto dropped_size = frag.mem_size;
393 // Drop from _frags
394 frag_drop(frag_id, dropped_size);
395 // Drop from _frags_age
396 it = _frags_age.erase(it);
397 } else {
398 // The further items can only be younger
399 break;
400 }
401 }
402 if (_frags.size() != 0) {
403 frag_arm(now);
404 } else {
405 _frag_mem = 0;
406 }
407}
408
409int32_t ipv4::frag::merge(ip_hdr &h, uint16_t offset, Packet p) {
410 uint32_t old = mem_size;
411 unsigned ip_hdr_len = h.ihl * 4;
412 // Store IP header
413 if (offset == 0) {
414 header = p.share(0, ip_hdr_len);
415 }
416 // Sotre IP payload
417 p.trim_front(ip_hdr_len);
418 data.merge(offset, std::move(p));
419 // Update mem size
420 mem_size = header.memory();
421 for (const auto& x : data.map) {
422 mem_size += x.second.memory();
423 }
424 auto added_size = mem_size - old;
425 return added_size;
426}
427
428bool ipv4::frag::is_complete() {
429 // If all the fragments are received, ipv4::frag::merge() should merge all
430 // the fragments into a single packet
431 auto offset = data.map.begin()->first;
432 auto nr_packet = data.map.size();
433 return last_frag_received && nr_packet == 1 && offset == 0;
434}
435
436Packet ipv4::frag::get_assembled_packet(ethernet_address from, ethernet_address to) {
437 auto& ip_header = header;
438 auto& ip_data = data.map.begin()->second;
439 // Append a ethernet header, needed for forwarding
440 auto eh = ip_header.prepend_header<eth_hdr>();
441 eh->src_mac = from;
442 eh->dst_mac = to;
443 eh->eth_proto = uint16_t(eth_protocol_num::ipv4);
444 *eh = eh->hton();
445 // Prepare a packet contains both ethernet header, ip header and ip data
446 ip_header.append(std::move(ip_data));
447 auto pkt = std::move(ip_header);
448 auto iph = pkt.get_header<ip_hdr>(sizeof(eth_hdr));
449 // len is the sum of each fragment
450 iph->len = hton(uint16_t(pkt.len() - sizeof(eth_hdr)));
451 // No fragmentation for the assembled datagram
452 iph->frag = 0;
453 // Since each fragment's csum is checked, no need to csum
454 // again for the assembled datagram
455 offload_info oi;
456 oi.reassembled = true;
457 pkt.set_offload_info(oi);
458 return pkt;
459}
460
461void icmp::received(Packet p, ipaddr from, ipaddr to) {
462 auto hdr = p.get_header<icmp_hdr>(0);
463 if (!hdr || hdr->type != icmp_hdr::msg_type::echo_request) {
464 return;
465 }
466 hdr->type = icmp_hdr::msg_type::echo_reply;
467 hdr->code = 0;
468 hdr->csum = 0;
469 checksummer csum;
470 csum.sum(reinterpret_cast<char*>(hdr), p.len());
471 hdr->csum = csum.get();
472
473 if (_queue_space.get_or_fail(p.len())) { // drop packets that do not fit the queue
474 auto cb = [this, from] (const ethernet_address e_dst, Packet p, int r) mutable {
475 if (r == 0) {
476 _packetq.emplace_back(ipv4_traits::l4packet{from, std::move(p), e_dst, ip_protocol_num::icmp});
477 }
478 };
479 _inet.wait_l2_dst_address(from, std::move(p), cb);
480 }
481}